Persist "pieces" of a tree (large) in parallel

I find myself with the following problem at hand:

Goal: Parallelize an ETL process that:

  • Reads a tree with an undetermined number of elements from an external interface.
  • Transforms the representation.
  • Writes these values into a local database.

The current implementation

It starts by reading all the data from the external interface into an in-memory structure:

Map<String, Map<String, Object>> arvore = leArvore("v=raiz");

The map key is the path to a given tree element. The map value, in turn, is a second map linking attribute names to their respective values:

Example:

{
    'v=raiz' : {
        'nome_atributo_1' : 'valor_1',
        'nome_atributo_2' : 'valor_2'
    },
    'v=raiz,v=elemento_1' : {
        'nome_atributo_3' : 'valor_3'
    },
    'v=raiz,v=elemento_2' : {
        'nome_atributo_4' : 'valor_4'
    },
    'v=raiz,v=elemento_1,v=elemento_3' : {
        'nome_atributo_5' : 'valor_5',
        'nome_atributo_6' : 'valor_6',
        'nome_atributo_7' : 'valor_7'
    }
}

This structure then undergoes a translation process (mapping attribute values to known types).

Finally, a recursive algorithm writes the tree to the database:

void escreveArvore(String caminhoNo) {
    // Persiste o elemento e seus atributos
    persisteElemento(caminhoNo, arvore.get(caminhoNo));
    arvore.remove(caminhoNo);
    for (String caminhoFilho : filhosDiretos(caminhoNo)) {
        escreveArvore(caminhoFilho);    
    }
} 

escreveArvore("v=raiz");

Why are we changing the implementation?

These trees can be gigantic and deep. Our flow keeps several of these trees in memory, which every now and then results in OutOfMemoryErrors. In addition, the external interface responds much better to small queries that return a block of elements than to a single query that returns the entire tree.

Read and transform into blocks:

int tamanhoBloco = 100;
int quantidadeFilhos = contaFilhos("v=raiz");
int quantidadeBlocos = (quantidadeFilhos + tamanhoBloco - 1) / tamanhoBloco;
// Esse laço é mais rápido do que leArvore("v=raiz"), mesmo sem paralelismo
for (int bloco = 0; bloco < quantidadeBlocos; bloco++) {
    Map<String, Map<String, Object>> blocoAtual = leFilhos("raiz", bloco, tamanhoBloco);
}

This part can easily be parallelized by creating a pool of threads and submitting each block to an ExecutorService.

The transformation can also be rewritten with similar logic. An ArrayBlockingQueue can be used as a broker between the reader and the transformer, so that the transformer can receive and process each block as soon as the reader finishes reading it.
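
A minimal sketch of that pipeline, assuming the contaFilhos and leFilhos methods shown in this question; transforma is a hypothetical stand-in for the translation step, and termination signaling for the consumers is omitted for brevity:

final ArrayBlockingQueue<Map<String, Map<String, Object>>> broker = new ArrayBlockingQueue<>(16);
final ExecutorService leitores = Executors.newFixedThreadPool(4);
final ExecutorService transformadores = Executors.newFixedThreadPool(4);

final int tamanhoBloco = 100;
final int quantidadeBlocos = (contaFilhos("v=raiz") + tamanhoBloco - 1) / tamanhoBloco;

// Produtores: cada bloco é lido de forma independente e entregue ao broker
for (int bloco = 0; bloco < quantidadeBlocos; bloco++) {
    final int b = bloco;
    leitores.execute(() -> {
        try {
            broker.put(leFilhos("raiz", b, tamanhoBloco)); // bloqueia enquanto a fila estiver cheia
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// Consumidores: transformam cada bloco assim que um leitor o entrega
for (int i = 0; i < 4; i++) {
    transformadores.execute(() -> {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                transforma(broker.take()); // passo de tradução hipotético
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}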

The problem

I have no idea how to parallelize writing the tree to the database.

I can't just persist one block at a time, because I must have persisted all the ancestors of a given element before persisting the element itself. For example, in the tree above I can only persist elemento_3 after having persisted raiz and elemento_1.

Write operations on the database are also quite costly, and the times add up quickly.

My first attempt was to use an auxiliary structure:

// Caminho do elemento ligado a uma flag dizendo se ele já foi persistido
Map<String, Boolean> elementosPersistidos = new ConcurrentHashMap<>();

In this version I replaced the recursive algorithm with logic that tries to persist each block of elements in parallel. Upon finding a path with unpersisted ancestors, it inserts dummy ancestors into the database (which are later replaced by the real elements). The performance was abysmal due to the larger number of writes.

On the other hand, keeping the recursive method means that eventually the entire tree ends up in memory. With faster reads, writing becomes the bottleneck and the chances of an OutOfMemoryError increase.

Is there a known algorithm for writing "pieces" of a tree in parallel? And among such algorithms, is there one that adapts well to my situation, i.e., where elements of the tree arrive out of order in blocks, and where it is desirable to simultaneously minimize memory usage and the number of writes?


Updates

Methods using the external interface

Map<String, Map<String, Object>> leArvore(String raiz);
Map<String, Map<String, Object>> leFilhos(String no, int bloco, int tamanhoBloco);

The only method that uses the database is:

void persisteElemento(String caminhoNo, Map<String, Object> atributos);

Example of out-of-order writing

Bloco 1:
    v=raiz
    v=raiz,v=elemento_1
Bloco 2: 
    v=raiz,v=elemento_1,v=elemento_3

If Bloco 2 is read / processed before Bloco 1, the next step would be to call persisteElemento for elemento_3; in that situation we would be trying to persist elemento_3 before raiz and elemento_1 (exactly what we are trying to avoid).


P.S.: The language in question is Java and the database is Versant, but at the moment I am more concerned with the algorithm itself than with the implementation. Since I did not want to restrict the question, answers in pseudo-code are welcome.

  • Look, the tip I'm about to give has nothing to do with your question, but it's something a friend did for a doctoral problem, and he told me the speedup he achieved was orders of magnitude. What he did was use a ramdisk (a drive mapped to RAM; he validated this on Linux). If you have enough RAM, you can run your algorithm against an intermediate database that lives entirely in RAM and, at the end, record the data in the official database. Obviously you need enough RAM; he had 64 GB and used 40 GB. Basically, he put a whole SQLite database in RAM.

  • Obviously, my comment above only helps with the database-persistence part, since that part will get faster. It does not help with the OutOfMemoryError that happens while processing the tree.

1 answer



Parallelism challenge

The first thing to keep in mind when thinking about parallelism is the dependency between the tasks that could potentially be performed in parallel.

Of course, not everything is black and white when we talk about parallelism and concurrency, but in this particular case I think it makes a crucial difference, since a child node's dependency on its parent being persisted first is a big part of the problem.

The technique of recording dummy records was very interesting, but it does not solve the problem, because it essentially prolongs the task while the dependency remains unsatisfied. It could work in other scenarios, though, perhaps ones where the cost of database access were lower than the cost of processing.

Possible solution: Producer/Consumer

Theoretical ramblings aside, one method I could think of that satisfies the dependencies while maintaining a reasonable degree of parallelization is a specific application of the Producer/Consumer pattern.

In this implementation, the same component plays both the consumer and the producer role:

  • First, it consumes an element E from the queue.
  • It processes E, in this case by inserting it into the database, obtaining its ID and thus resolving the dependencies of its children.
  • Finally, playing the producer role, it places the children of E in the queue to be processed in parallel.

This means that the root of the tree is processed without parallelism, but from there on all of its children can potentially be processed in parallel, as can the children of those children, until virtually all the "leaf" elements of the tree are processed in parallel.

An interesting detail is that, in the consumer role, this implementation only needs to know the tree partially, that is, the node being processed, while the producer only needs to know that node's children. I can't really say what impact this has on a large-scale execution, but in theory it is not necessary to keep in memory whatever is not being processed.

Example implementation

I put together a basic example implementation using the available information; I believe it can be adapted to the needs of the project.

See below:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class TreeConsumerExample {

    //cria mapa de atributos a partir dos parâmetros: chave, valor, chave, valor...
    Map<String, Object> mapOf(Object... args) {
        Map<String, Object> m = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            m.put((String) args[i], args[i+1]);
        }
        return m;
    }

    final Map<String, Map<String, Object>> arvore = new HashMap<>();
    final ArrayBlockingQueue<ElementoArvore> fila = new ArrayBlockingQueue<>(64);
    final AtomicLong ultimoIdInseridoNoBanco = new AtomicLong();
    final AtomicLong quantidadeElementosProcessados = new AtomicLong();

    //inicia arvore
    TreeConsumerExample() {
        arvore.put("v=raiz", mapOf("nome_atributo_1", "valor_1", "nome_atributo_2", "valor_2"));
        arvore.put("v=raiz,v=elemento_1", mapOf("nome_atributo_3", "valor_3"));
        arvore.put("v=raiz,v=elemento_2", mapOf("nome_atributo_4", "valor_4"));
        arvore.put("v=raiz,v=elemento_1,v=elemento_3", mapOf("nome_atributo_5", "valor_5", "nome_atributo_6", "valor_6", "nome_atributo_7", "valor_7"));
    }

    //armazena informacoes de um elemento da árvore na fila
    static class ElementoArvore {
        final String caminhoNo;
        final Map<String, Object> valores;
        final Long idPai;
        public ElementoArvore(String caminhoNo, Map<String, Object> valores, Long idPai) {
            this.caminhoNo = caminhoNo;
            this.valores = valores;
            this.idPai = idPai;
        }
        public Map<String, Object> getValores() {
            return valores;
        }
        public Long getIdPai() {
            return idPai;
        }
        public String getCaminhoNo() {
            return caminhoNo;
        }
    }

    //persiste um elemento e retorna o novo ID
    Long persisteElemento(ElementoArvore elemento) {
        Long novoId = ultimoIdInseridoNoBanco.incrementAndGet();
        System.out.printf("Persistindo %s | valores = %s | id pai = %d, novo id = %d%n", elemento.getCaminhoNo(),
                elemento.getValores(), elemento.getIdPai(), novoId);
        return novoId;
    }

    //recupera os filhos de um elemento a partir do caminho
    Collection<String> filhosDiretos(String caminhoNo) {
        final String prefixo = caminhoNo + ",";
        return arvore.keySet().stream()
                .filter(k -> k.startsWith(prefixo) && k.indexOf(',', prefixo.length()) < 0)
                .collect(Collectors.toList());
    }

    //thread que retira um elemento da fila, processa e adiciona os filhos
    class ConsumidorProdutor extends Thread {
        final Integer threadNumber;

        public ConsumidorProdutor(Integer threadNumber) {
            this.threadNumber = threadNumber;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    //recupera elemento da fila para processar
                    final ElementoArvore e = fila.take();

                    //verifica poison object
                    if (e.getCaminhoNo() == null) {
                        //se null, fim da fila, recoloca o elemento para acordar outras threads e finaliza a thread atual
                        fila.put(e);
                        System.out.printf("Fim da fila sinalizado. Finalizando thread %d%n", threadNumber);
                        break;
                    }

                    //processa elemento
                    System.out.printf("Processando elemento %s na thread %d%n", e.getCaminhoNo(), threadNumber);
                    final Long novoId = persisteElemento(e);

                    //insere filhos para processamento usando o ID inserido
                    for (String caminhoFilho : filhosDiretos(e.getCaminhoNo())) {
                        fila.put(new ElementoArvore(caminhoFilho, arvore.get(caminhoFilho), novoId));
                    }

                    //verifica final do processamento
                    if (quantidadeElementosProcessados.incrementAndGet() >= arvore.size()) {
                        //elemento demarcando fim do processamento
                        //isso faz com que as threads sejam "acordadas" e terminem
                        fila.put(new ElementoArvore(null, null, null));
                        System.out.printf("Fim da fila encontrado. Finalizando thread %d%n", threadNumber);
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    void escreveArvore(String caminhoNo) {
        //cria pool de threads
        final ExecutorService threadPool = Executors.newFixedThreadPool(8);

        for (int i = 0; i < 8; i++) {
            threadPool.execute(new ConsumidorProdutor(i+1));
        }

        //coloca o elemento raiz na fila para iniciar o processamento
        try {
            fila.put(new ElementoArvore(caminhoNo, arvore.get(caminhoNo), null));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        //solicita o encerramento e aguarda o término das threads
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        new TreeConsumerExample().escreveArvore("v=raiz");
    }
}

To detect the end of processing, I compare the counter of processed items with the tree size. This should be improved, mainly because it only works if the whole tree is traversed.

In addition, the implementation uses a "poison pill" object to mark the end of the execution. This is necessary because the various threads will be blocked waiting for new elements in the queue. Valueless objects are therefore put in the queue so that the threads are "woken up"; each thread then verifies that it actually received an empty object and finishes its execution.

Another implementation detail is that the ArrayBlockingQueue will not hold more elements than the capacity passed to the constructor. This means a producer may block when trying to insert an item into a full queue, until a consumer removes elements from it. The advantage is that memory usage remains relatively constant throughout the run.

However, depending on how balanced the tree is, many nodes may be inserted while few are removed, and the execution can end in a deadlock: for example, every worker blocked on fila.put against a full queue, with no one left to take elements out. If that is the case, replacing the queue with a LinkedBlockingDeque solves the issue, although memory usage will be higher.

Version with LinkedBlockingDeque

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class TreeConsumerExample {

    //cria mapa de atributos a partir dos parâmetros: chave, valor, chave, valor...
    Map<String, Object> mapOf(Object... args) {
        Map<String, Object> m = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            m.put((String) args[i], args[i+1]);
        }
        return m;
    }

    final Map<String, Map<String, Object>> arvore = new HashMap<>();
    final LinkedBlockingDeque<ElementoArvore> fila = new LinkedBlockingDeque<>();
    final AtomicLong ultimoIdInseridoNoBanco = new AtomicLong();
    final AtomicLong quantidadeElementosProcessados = new AtomicLong();

    //inicia arvore
    TreeConsumerExample() {
        arvore.put("v=raiz", mapOf("nome_atributo_1", "valor_1", "nome_atributo_2", "valor_2"));
        arvore.put("v=raiz,v=elemento_1", mapOf("nome_atributo_3", "valor_3"));
        arvore.put("v=raiz,v=elemento_2", mapOf("nome_atributo_4", "valor_4"));
        arvore.put("v=raiz,v=elemento_1,v=elemento_3", mapOf("nome_atributo_5", "valor_5", "nome_atributo_6", "valor_6", "nome_atributo_7", "valor_7"));
    }

    //armazena informacoes de um elemento da árvore na fila
    static class ElementoArvore {
        final String caminhoNo;
        final Map<String, Object> valores;
        final Long idPai;
        public ElementoArvore(String caminhoNo, Map<String, Object> valores, Long idPai) {
            this.caminhoNo = caminhoNo;
            this.valores = valores;
            this.idPai = idPai;
        }
        public Map<String, Object> getValores() {
            return valores;
        }
        public Long getIdPai() {
            return idPai;
        }
        public String getCaminhoNo() {
            return caminhoNo;
        }
    }

    //persiste um elemento e retorna o novo ID
    Long persisteElemento(ElementoArvore elemento) {
        Long novoId = ultimoIdInseridoNoBanco.incrementAndGet();
        System.out.printf("Persistindo %s | valores = %s | id pai = %d, novo id = %d%n", elemento.getCaminhoNo(),
                elemento.getValores(), elemento.getIdPai(), novoId);
        return novoId;
    }

    //recupera os filhos de um elemento a partir do caminho
    Collection<String> filhosDiretos(String caminhoNo) {
        final String prefixo = caminhoNo + ",";
        return arvore.keySet().stream()
                .filter(k -> k.startsWith(prefixo) && k.indexOf(',', prefixo.length()) < 0)
                .collect(Collectors.toList());
    }

    //thread que retira um elemento da fila, processa e adiciona os filhos
    class ConsumidorProdutor extends Thread {
        final Integer threadNumber;

        public ConsumidorProdutor(Integer threadNumber) {
            this.threadNumber = threadNumber;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    //recupera elemento da fila para processar
                    final ElementoArvore e = fila.take();

                    //verifica poison object
                    if (e.getCaminhoNo() == null) {
                        //se null, fim da fila, recoloca o elemento para acordar outras threads e finaliza a thread atual
                        fila.add(e);
                        System.out.printf("Fim da fila sinalizado. Finalizando thread %d%n", threadNumber);
                        break;
                    }

                    //processa elemento
                    System.out.printf("Processando elemento %s na thread %d%n", e.getCaminhoNo(), threadNumber);
                    final Long novoId = persisteElemento(e);

                    //insere filhos para processamento usando o ID inserido
                    fila.addAll(filhosDiretos(e.getCaminhoNo()).stream()
                            .map(caminhoFilho -> new ElementoArvore(caminhoFilho, arvore.get(caminhoFilho), novoId))
                            .collect(Collectors.toList()));

                    //verifica final do processamento
                    if (quantidadeElementosProcessados.incrementAndGet() >= arvore.size()) {
                        //elemento demarcando fim do processamento
                        //isso faz com que as threads sejam "acordadas" e terminem
                        fila.add(new ElementoArvore(null, null, null));
                        System.out.printf("Fim da fila encontrado. Finalizando thread %d%n", threadNumber);
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    void escreveArvore(String caminhoNo) {
        //cria pool de threads
        final ExecutorService threadPool = Executors.newFixedThreadPool(8);

        for (int i = 0; i < 8; i++) {
            threadPool.execute(new ConsumidorProdutor(i+1));
        }

        //coloca o elemento raiz na fila para iniciar o processamento
        try {
            fila.put(new ElementoArvore(caminhoNo, arvore.get(caminhoNo), null));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        //solicita o encerramento e aguarda o término das threads
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        new TreeConsumerExample().escreveArvore("v=raiz");
    }
}

Considerations

This solution depends on being able to quickly read the direct children of a node; in exchange, it makes it impossible for a child to be found before its parent.

How much of a gain this implementation can produce (or whether it would actually end up worse) is impossible to say in the abstract.

What I can theorize is that the gain will be proportional to the number of children the nodes of the tree typically have. If nodes have, on average, fewer than two children, the execution will be practically serial. And even without the overhead of a very large execution stack, there will still be time spent synchronizing the queue.

Another possible solution: changing the data-reading structure

In fact, a whole set of solutions opens up if it is possible to change the read model. This vaguely resembles an old question about modeling trees in databases, but in a different sense.

In this scenario, each node would carry a depth field p so that the tree could be read ordered by level (level-order), where p('v=raiz')=1, p('v=raiz,v=elemento_1')=2, and so on. The number of commas in the path (plus one) equals the depth.
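
For illustration, the depth can be derived directly from the path (a trivial sketch):

//profundidade p de um nó = número de vírgulas no caminho, mais um
static int profundidade(String caminhoNo) {
    int virgulas = 0;
    for (char c : caminhoNo.toCharArray()) {
        if (c == ',') virgulas++;
    }
    return virgulas + 1; //p("v=raiz") = 1, p("v=raiz,v=elemento_1") = 2
}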

One possible algorithm is the following.

Let:

  • p be the depth of a node in the tree
  • n a node
  • q the number of nodes at a level p
  • b a block of nodes at a level p, an even division of the quantity q
  • l(p,b) the list of nodes of a block b at depth p
  • k the key of a node
  • m(p,n)=k a map that stores a key k given a depth p and a node n
  • n.p the parent node of the current node

An algorithm could be:

  1. Set p=1 (root)
  2. Count the nodes n where p(n)=p; finish if q=0
  3. Split the quantity q of nodes into blocks and, for each block b, process in parallel as follows:
    1. Get the list l(p,b) of nodes at depth p belonging to block b
    2. Process each node n in the list:
    3. Retrieve the parent's key by accessing m(p-1,n.p); if there is none, the node is the root
    4. Write the node to the database
    5. Retrieve its primary key k
    6. Save the key by setting m(p,n)=k
  4. Wait for the parallel processing of the blocks
  5. Remove the parent nodes from the map, m(p-1,*), since they will no longer be needed
  6. Increase the depth: p++
  7. Go back to step 2

The basic idea is to process the tree levels in sequence, since each level depends on the previous one, while parallelizing the processing of the nodes within each level.

The levels already processed can be discarded, except the previous one (p-1), which contains the keys of the parents of the current level.

To be able to fetch the nodes by level, the method in the external interface would be something like this:

Map<String, Map<String, Object>> leNos(int nivelProfundidade, int bloco, int tamanhoBloco);
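
A minimal sketch of the level-by-level loop, assuming the hypothetical leNos above, a hypothetical contaNosPorNivel(p) that counts the nodes at depth p, a getPai helper like the one in the waiting-list example further below, and a variant of persisteElemento that returns the generated key:

//processa a árvore nível a nível; cada nível é paralelizado em blocos
void escreveArvorePorNivel() throws InterruptedException {
    final int tamanhoBloco = 100;
    final ExecutorService pool = Executors.newFixedThreadPool(8);
    Map<String, Long> chavesNivelAnterior = new ConcurrentHashMap<>(); //m(p-1, *)
    for (int p = 1; ; p++) {
        final int q = contaNosPorNivel(p);      //hipotético: conta os nós na profundidade p
        if (q == 0) break;                      //nenhum nó neste nível: fim
        final int blocos = (q + tamanhoBloco - 1) / tamanhoBloco;
        final Map<String, Long> chavesNivelAtual = new ConcurrentHashMap<>(); //m(p, *)
        final Map<String, Long> chavesPai = chavesNivelAnterior;
        final CountDownLatch fimDoNivel = new CountDownLatch(blocos);
        final int nivel = p;
        for (int b = 0; b < blocos; b++) {
            final int bloco = b;
            pool.execute(() -> {
                try {
                    //l(p, b): os nós do bloco b na profundidade p
                    for (Map.Entry<String, Map<String, Object>> e
                            : leNos(nivel, bloco, tamanhoBloco).entrySet()) {
                        final Long idPai = chavesPai.get(getPai(e.getKey())); //null para a raiz
                        final Long id = persisteElemento(e.getKey(), e.getValue(), idPai);
                        chavesNivelAtual.put(e.getKey(), id); //m(p, n) = k
                    }
                } finally {
                    fimDoNivel.countDown();
                }
            });
        }
        fimDoNivel.await();                     //passo 4: aguarda o nível inteiro
        chavesNivelAnterior = chavesNivelAtual; //passo 5: descarta m(p-1, *)
    }
    pool.shutdown();
}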

Considerations

The above algorithm is much more complex and only works well if there are enough nodes per tree level.

It only works if it is possible to change the interface and the reading structure.

It would also be possible to modify and optimize the algorithm further, depending on what kinds of modifications are possible in the database.

One more possible solution: predetermined relationship

The basic idea is to be able to reference a node in the target database without having to record its data in the database first.

This seems to me the simplest solution from the algorithmic point of view, although its viability depends on a few factors.

I imagine that in the current implementation each node has a foreign key to its parent, whose value is the parent's primary key, which in turn is generated by inserting the record.

On the other hand, if we changed the database structure so that the record's foreign key were something like the node's path in the map (or perhaps a hash of it), we could then insert a child before its parent, because the foreign key could easily be determined independently.

Of course, constraints such as foreign-key referential integrity would need to be relaxed so as not to force the referenced record to exist.

The method persisteElemento would record this information derived from caminhoNo, for example:

  • For the root 'v=raiz', the parent value would be null and the node identifier would be the path 'v=raiz'.
  • For the element 'v=raiz,v=elemento_1', the parent value would be 'v=raiz', i.e., the path of this node minus the part from the last comma onward. The identifier of this node would be 'v=raiz,v=elemento_1'.
  • All children of the node 'v=raiz,v=elemento_1' would have 'v=raiz,v=elemento_1' as their parent value.

As I mentioned, the values above could perhaps be replaced by a hash to save space, since taking up more space is one of the biggest impacts, if not the biggest, of this approach.
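
A sketch of what persisteElemento could do under this scheme, deriving the parent reference from caminhoNo alone; gravaRegistro is a hypothetical database call, and the record layout (key, parent key, attributes) is made up for illustration:

//persiste um elemento usando o próprio caminho como chave; dispensa ID gerado e consulta ao pai
void persisteElemento(String caminhoNo, Map<String, Object> atributos) {
    final int v = caminhoNo.lastIndexOf(',');
    //referência ao pai derivada do caminho: tudo antes da última vírgula, ou null para a raiz
    final String chavePai = v > 0 ? caminhoNo.substring(0, v) : null;
    //a chave também poderia ser um hash do caminho, para economizar espaço
    gravaRegistro(caminhoNo, chavePai, atributos); //gravação hipotética: (chave, chave do pai, atributos)
}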

One last possible solution: waiting list

Another approach would be to use a waiting list to hold temporarily orphaned nodes.

Well, actually, not a single list, but one list per missing parent node.

Whenever a node's parent is missing, the node is added to the waiting list associated with that parent.

Whenever a node is inserted into the database, we then check whether there is a waiting list for it. If there is, the queued nodes are added back to the processing queue, or processed immediately, as preferred.

A simplified implementation would be:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

public class WaitProducerExample {

    //exemplo de sequência de leitura
    final List<String> leituraFake = Arrays.asList("v=raiz,v=elemento_1,v=elemento_3", "v=raiz", "v=raiz,v=elemento_1");

    //fila de processamento
    final LinkedBlockingDeque<String> fila = new LinkedBlockingDeque<>();

    //lista de nós pendentes, key -> nó pai, value -> nós aguardando leitura
    final ConcurrentHashMap<String, List<String>> listaDeEspera = new ConcurrentHashMap<>();

    //chaves dos nós inseridos
    final ConcurrentHashMap<String, Long> inseridos = new ConcurrentHashMap<>();

    //sequência de ids fake
    final AtomicLong ultimoIdInseridoNoBanco = new AtomicLong();

    //persiste um elemento e retorna o novo ID
    Long persisteElemento(String caminhoNo, Long idPai) {
        final Long novoId = ultimoIdInseridoNoBanco.incrementAndGet();
        System.out.printf("Persistindo %s | id pai = %d | novo id = %d%n", caminhoNo, idPai, novoId);
        return novoId;
    }

    //recupera o caminho do pai, dado o caminho do filho
    String getPai(String caminhoNo) {
        final int v = caminhoNo.lastIndexOf(',');
        return v > 0 ? caminhoNo.substring(0, v) : "";
    }

    void escreveArvore() {
        //faz uma leitura fake de um bloco desordenado de nós
        fila.addAll(leituraFake);

        //percorre fila até acabarem os elementos
        while (!fila.isEmpty()) {
            try {
                //caminho do nó
                final String no = fila.take();
                //caminho do pai
                final String pai = getPai(no);

                System.out.printf("Processando nó %s, pai = %s%n", no, pai);

                //verifica se o pai já foi inserido
                //sincronizações separadas são necessárias porque em uma thread o caminho do nó pai é usado para guardar
                //o nó atual na lista de espera, enquanto em outra thread o nó atual funciona como pai, consumindo a lista de espera
                final Long idPai = inseridos.get(pai);
                if (pai.isEmpty() || idPai != null) {
                    //se for raiz (caminho do pai vazio) ou se o pai foi encontrado, persiste o elemento e salva o ID inserido
                    inseridos.put(no, persisteElemento(no, idPai));
                    //sincroniza no caminho do nó (interned, para que Strings iguais compartilhem o mesmo monitor)
                    //evitando que outra thread adicione elementos na espera enquanto ela é consumida
                    synchronized (no.intern()) {
                        //verifica se tinha alguém esperando
                        List<String> filhosEsperando = listaDeEspera.remove(no);
                        if (filhosEsperando == null) {
                            System.out.printf("Nenhum filho aguardando...%n");
                        } else {
                            //coloca de volta na fila principal (poderia processar imediatamente, mas acabaria em um algoritmo recursivo)
                            System.out.printf("Encontrado(s) %d filho(s) aguardando que já podem voltar para a fila: %s%n", filhosEsperando.size(), filhosEsperando);
                            fila.addAll(filhosEsperando);
                        }
                    }
                } else {
                    //senão coloca o nó na lista de espera associada com o caminho do pai
                    //sincroniza no pai para evitar que elementos sejam adicionados na lista de espera enquanto ela está sendo consumida
                    //se o pai for adicionado em algum momento após a verificação, o elemento estaria pronto para ser inserido, portanto deve voltar para a fila
                    synchronized (pai.intern()) { //interned: mesmo monitor para Strings iguais
                        //verifica novamente se não foi inserido, pois em caso de concorrência o estado pode mudar de uma linha para outra
                        if (inseridos.get(pai) == null) {
                            System.out.printf("Pai não encontrado, aguardando na lista de espera...%n");
                            listaDeEspera.compute(pai, (k, v) -> {
                                if (v == null) v = new ArrayList<>();
                                v.add(no);
                                return v;
                            });
                        } else {
                            //caso o pai tenha sido inserido entre a verificação acima e o bloco sincronizado, recoloca na fila normal ao invés da lista de espera
                            fila.add(no);
                        }
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        //verifica se nada ficou esquecido na espera
        if (!listaDeEspera.isEmpty()) {
            System.out.printf("Ops... você esqueceu de alguma coisa, não?!%n");
        }
    }

    public static void main(String[] args) {
        new WaitProducerExample().escreveArvore();
    }
}

Remarks

The above implementation does not use threads, but it simulates child nodes being inserted before the parent.

Even without using threads, I added synchronization control that must account for concurrency issues, such as items being added to and removed from the waiting list at the same time. This is because one thread may find an orphaned node and try to insert it into the waiting list while another thread finds the parent and tries to drain that same list. The object to synchronize on must be the parent's path in the first case and the node's own path in the second, because in the first case we use the absent parent as the reference, and in the second the node that was found is itself the parent of the orphaned nodes. Complicated? Tell me about it.

If anyone makes use of this code, I strongly suggest reviewing the synchronization, because I have not done extensive tests and something may have escaped me.

  • Hi Luiz, thank you, as always, for the excellent answer. If I may bother you a little longer: my main problem is that the implementation of filhosDiretos assumes the entire tree is in memory, which brings back the initial problem of OutOfMemoryErrors. What I would like in this producer/consumer algorithm is something more push-based, where the producer pushes the nodes as they are read. What happens if the consumer cannot persist a certain node because one of its ancestors has not been persisted yet?

  • @Anthonyaccioly Hmm... I had assumed that the real implementation would run some kind of query to fetch the children and therefore would not run into memory problems. On the other hand, from what you said about pushing the read nodes, I understand that reading the nodes would not be random, but serial. Is that right? Would the idea be to hold on to the node and persist it as soon as the parent is found? Or would it be to fetch parent nodes on demand? Anyway, I think the key question for me is: what is the read/write interface used to access the nodes?

  • I had thought of reading in blocks, as in the question's code (this worked very well for the reading and transformation stages). That means that in a Producer/Consumer model we would only have a partial tree, one block at a time, at write time. The idea is: read -> divide into blocks -> n "threads" reading and processing each block -> blocking queue -> n "threads" receiving and writing the blocks. The ideal is to persist nodes and remove them from memory as soon as possible. But I cannot afford too many unnecessary write operations against the database.

  • Both "persist as soon as the father is found" and "seek out parent nodes on demand" (in fact, waiting until the block in which the father was persisted ends by some kind of notification) are valid ideas. The read and write interface is just like I said. Only what accesses the database is the method persisteElemento, and only the methods leArvore and leFilhos access the external interface (I will update the question), everything else is in memory.

  • @Anthonyaccioly I think I still don't understand how, given the current structure, a child node could be processed before its parent node. If possible, add an example to the question where this occurs.

  • Let's say elemento_1 is in bloco 1 and elemento_3 is in bloco 2. If the reading of the external interface / transformation of bloco 2 finishes before bloco 1, then in a naive strategy where we always persist every element, elemento_3 would be persisted before elemento_1 (which is not allowed). That's why today we wait for all the reading/processing and keep the whole tree in memory before writing to the database (which, again, I am trying to avoid due to memory consumption). The subtlety is that both memory and writes must be minimized.

