Parallelism challenge
The first thing to keep in mind when thinking about parallelism is the dependencies between the tasks that could potentially run in parallel.
Of course, not everything is black and white when we talk about parallelism and concurrency, but in this particular case I think dependencies make a crucial difference, since a child node depending on its parent having been persisted is a big part of the problem.
The technique of writing a dummy record was very interesting, but it does not solve the problem because technically it only prolongs the task while the dependency is not satisfied. It could work in other scenarios, however, perhaps where the cost of accessing the database is lower than the cost of processing.
Possible solution: Producer x Consumer
Theoretical ramblings aside, one method I could think of to satisfy the dependencies while keeping a reasonable degree of parallelism is a specific application of the Producer x Consumer pattern.
In this implementation, the same component plays the role of both consumer and producer:
- First, it consumes a given element E from the queue.
- It processes E, in this case by inserting it into the database, obtaining its ID and thus resolving the dependencies of its children.
- Finally, the component plays the role of producer and places the children of E on the queue to be processed in parallel.
This means that the root of the tree will be processed without parallelism, but from there all the children can potentially be processed in parallel, as well as the children of the children until virtually all the "leaf" elements of the tree can be parallelized.
An interesting detail is that, in the role of consumer, this implementation only needs to know the tree partially, that is, the node being processed, whereas the producer only needs to know that node's children. I can't say what impact this would have on a large-scale execution, but in theory nothing that is not being processed needs to be kept in memory.
Example implementation
I made a basic example implementation using the available information and I believe it is possible to adapt it according to the needs of the project.
See below:
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class TreeConsumerExample {
//creates an attribute map from the parameters: key, value, key, value...
Map<String, Object> mapOf(Object... args) {
Map<String, Object> m = new HashMap<>();
for (int i = 0; i < args.length; i += 2) {
m.put((String) args[i], args[i+1]);
}
return m;
}
final Map<String, Map<String, Object>> arvore = new HashMap<>();
final ArrayBlockingQueue<ElementoArvore> fila = new ArrayBlockingQueue<>(64);
final AtomicLong ultimoIdInseridoNoBanco = new AtomicLong();
final AtomicLong quantidadeElementosProcessados = new AtomicLong();
//initializes the tree
TreeConsumerExample() {
arvore.put("v=raiz", mapOf("nome_atributo_1", "valor_1", "nome_atributo_2", "valor_2"));
arvore.put("v=raiz,v=elemento_1", mapOf("nome_atributo_3", "valor_3"));
arvore.put("v=raiz,v=elemento_2", mapOf("nome_atributo_4", "valor_4"));
arvore.put("v=raiz,v=elemento_1,v=elemento_3", mapOf("nome_atributo_5", "valor_5", "nome_atributo_6", "valor_6", "nome_atributo_7", "valor_7"));
}
//holds the information of one tree element while it is in the queue
static class ElementoArvore {
final String caminhoNo;
final Map<String, Object> valores;
final Long idPai;
public ElementoArvore(String caminhoNo, Map<String, Object> valores, Long idPai) {
this.caminhoNo = caminhoNo;
this.valores = valores;
this.idPai = idPai;
}
public Map<String, Object> getValores() {
return valores;
}
public Long getIdPai() {
return idPai;
}
public String getCaminhoNo() {
return caminhoNo;
}
}
//persists an element and returns the new ID
Long persisteElemento(ElementoArvore elemento) {
Long novoId = ultimoIdInseridoNoBanco.incrementAndGet();
System.out.printf("Persistindo %s | valores = %s | id pai = %d, novo id = %d%n", elemento.getCaminhoNo(),
elemento.getValores(), elemento.getIdPai(), novoId);
return novoId;
}
//retrieves the direct children of an element from its path
Collection<String> filhosDiretos(String caminhoNo) {
return arvore.keySet().stream()
//a direct child starts with "<path>," and has no further comma after that prefix
.filter(k -> k.startsWith(caminhoNo + ",") && k.indexOf(',', caminhoNo.length() + 1) < 0)
.collect(Collectors.toList());
}
//thread that takes an element from the queue, processes it and adds its children
class ConsumidorProdutor extends Thread {
final Integer threadNumber;
public ConsumidorProdutor(Integer threadNumber) {
this.threadNumber = threadNumber;
}
@Override
public void run() {
try {
while (true) {
//takes an element from the queue to process
final ElementoArvore e = fila.take();
//checks for the poison object
if (e.getCaminhoNo() == null) {
//if null, the queue has ended: puts the element back to wake the other threads and ends the current thread
fila.put(e);
System.out.printf("Fim da fila sinalizado. Finalizando thread %d%n", threadNumber);
break;
}
//processes the element
System.out.printf("Processando elemento %s na thread %d%n", e.getCaminhoNo(), threadNumber);
final Long novoId = persisteElemento(e);
//enqueues the children for processing, using the inserted ID
for (String caminhoFilho : filhosDiretos(e.getCaminhoNo())) {
fila.put(new ElementoArvore(caminhoFilho, arvore.get(caminhoFilho), novoId));
}
//checks whether processing has finished
if (quantidadeElementosProcessados.incrementAndGet() >= arvore.size()) {
//element marking the end of processing
//this "wakes up" the other threads so that they can finish
fila.put(new ElementoArvore(null, null, null));
System.out.printf("Fim da fila encontrado. Finalizando thread %d%n", threadNumber);
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
void escreveArvore(String caminhoNo) {
//creates the thread pool
final ExecutorService threadPool = Executors.newFixedThreadPool(8);
for (int i = 0; i < 8; i++) {
threadPool.execute(new ConsumidorProdutor(i+1));
}
//puts the root element on the queue to start processing
try {
fila.put(new ElementoArvore(caminhoNo, arvore.get(caminhoNo), null));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
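//stops accepting new tasks; this call does not wait, the worker threads end on their own once they see the poison object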
threadPool.shutdown();
}
public static void main(String[] args) {
new TreeConsumerExample().escreveArvore("v=raiz");
}
}
To detect the end of processing, I compare the counter of processed items with the size of the tree. This should be improved, mainly because it only works if the whole tree is traversed.
In addition, the implementation uses a "poison" object to mark the end of the execution. This is necessary because the various threads will be blocked waiting for new elements in the queue. An object with no values is then placed on the queue so that the threads are "woken up", see that they actually received an empty object, and finish executing.
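One way to make end detection independent of the tree size is to count the work that is still pending. The sketch below is not part of the implementation above: the class `PendingCounterSketch`, the `FILHOS` map and the `pendentes` counter are hypothetical names, it assumes Java 9+ for `Map.of`/`List.of`, and it swaps the explicit queue and poison object for the executor's own work queue. The counter is incremented before a node is scheduled and decremented after the node has scheduled its children, so it can only reach zero when nothing is running and nothing more can be produced.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PendingCounterSketch {
    // hypothetical in-memory tree: node path -> direct children
    static final Map<String, List<String>> FILHOS = Map.of(
            "v=raiz", List.of("v=raiz,v=elemento_1", "v=raiz,v=elemento_2"),
            "v=raiz,v=elemento_1", List.of("v=raiz,v=elemento_1,v=elemento_3"));

    final ExecutorService pool = Executors.newFixedThreadPool(4);
    final AtomicInteger pendentes = new AtomicInteger();       // nodes scheduled but not finished
    final CountDownLatch fim = new CountDownLatch(1);          // released when pendentes reaches 0
    final Set<String> processados = ConcurrentHashMap.newKeySet();

    void agenda(String caminhoNo) {
        pendentes.incrementAndGet();                           // count before the task can run
        pool.execute(() -> processa(caminhoNo));
    }

    void processa(String caminhoNo) {
        processados.add(caminhoNo);                            // stand-in for the database insert
        for (String filho : FILHOS.getOrDefault(caminhoNo, List.of())) {
            agenda(filho);                                     // children are counted before this node is released
        }
        if (pendentes.decrementAndGet() == 0) {
            fim.countDown();                                   // nothing running, nothing left to produce
        }
    }

    Set<String> escreveArvore(String raiz) throws InterruptedException {
        agenda(raiz);
        fim.await();
        pool.shutdown();
        return processados;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new PendingCounterSketch().escreveArvore("v=raiz"));
    }
}
```

Because termination depends only on the counter, this also works when only a subtree is written, which is the case the size comparison cannot handle.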
Another implementation detail is that the ArrayBlockingQueue will not allow more elements than the capacity given in the constructor. This means that a producer may be blocked when trying to insert an item into a full queue until a consumer removes elements from it. The advantage is that memory usage stays relatively constant throughout the run.
However, depending on how the tree is balanced, many nodes may be inserted while few are removed, and the execution ends in a deadlock, because every thread is both producer and consumer. If this is the case, replacing it with LinkedBlockingDeque solves the issue, at the cost of higher memory use.
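To make the capacity difference concrete, here is a minimal sketch (the class `QueueCapacitySketch` and the method `enfileira` are names made up for this illustration). It uses the non-blocking `offer` so that the full queue is visible as a `false` return value; in the example implementation `put` would simply block at that point, and if all worker threads block in `put` at once, nobody is left to call `take`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

class QueueCapacitySketch {
    // tries to enqueue n items without blocking and returns how many were accepted
    static int enfileira(BlockingQueue<Integer> fila, int n) {
        int aceitos = 0;
        for (int i = 0; i < n; i++) {
            if (fila.offer(i)) {   // offer returns false instead of blocking when the queue is full
                aceitos++;
            }
        }
        return aceitos;
    }

    public static void main(String[] args) {
        // bounded queue with capacity 2: only the first two items fit
        System.out.println(enfileira(new ArrayBlockingQueue<>(2), 5));   // prints 2
        // no capacity given: effectively unbounded, everything fits
        System.out.println(enfileira(new LinkedBlockingDeque<>(), 5));   // prints 5
    }
}
```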
Version with LinkedBlockingDeque
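import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class TreeConsumerExample {
//creates an attribute map from the parameters: key, value, key, value...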
Map<String, Object> mapOf(Object... args) {
Map<String, Object> m = new HashMap<>();
for (int i = 0; i < args.length; i += 2) {
m.put((String) args[i], args[i+1]);
}
return m;
}
final Map<String, Map<String, Object>> arvore = new HashMap<>();
final LinkedBlockingDeque<ElementoArvore> fila = new LinkedBlockingDeque<>();
final AtomicLong ultimoIdInseridoNoBanco = new AtomicLong();
final AtomicLong quantidadeElementosProcessados = new AtomicLong();
//initializes the tree
TreeConsumerExample() {
arvore.put("v=raiz", mapOf("nome_atributo_1", "valor_1", "nome_atributo_2", "valor_2"));
arvore.put("v=raiz,v=elemento_1", mapOf("nome_atributo_3", "valor_3"));
arvore.put("v=raiz,v=elemento_2", mapOf("nome_atributo_4", "valor_4"));
arvore.put("v=raiz,v=elemento_1,v=elemento_3", mapOf("nome_atributo_5", "valor_5", "nome_atributo_6", "valor_6", "nome_atributo_7", "valor_7"));
}
//holds the information of one tree element while it is in the queue
static class ElementoArvore {
final String caminhoNo;
final Map<String, Object> valores;
final Long idPai;
public ElementoArvore(String caminhoNo, Map<String, Object> valores, Long idPai) {
this.caminhoNo = caminhoNo;
this.valores = valores;
this.idPai = idPai;
}
public Map<String, Object> getValores() {
return valores;
}
public Long getIdPai() {
return idPai;
}
public String getCaminhoNo() {
return caminhoNo;
}
}
//persists an element and returns the new ID
Long persisteElemento(ElementoArvore elemento) {
Long novoId = ultimoIdInseridoNoBanco.incrementAndGet();
System.out.printf("Persistindo %s | valores = %s | id pai = %d, novo id = %d%n", elemento.getCaminhoNo(),
elemento.getValores(), elemento.getIdPai(), novoId);
return novoId;
}
//retrieves the direct children of an element from its path
Collection<String> filhosDiretos(String caminhoNo) {
return arvore.keySet().stream()
//a direct child starts with "<path>," and has no further comma after that prefix
.filter(k -> k.startsWith(caminhoNo + ",") && k.indexOf(',', caminhoNo.length() + 1) < 0)
.collect(Collectors.toList());
}
//thread that takes an element from the queue, processes it and adds its children
class ConsumidorProdutor extends Thread {
final Integer threadNumber;
public ConsumidorProdutor(Integer threadNumber) {
this.threadNumber = threadNumber;
}
@Override
public void run() {
try {
while (true) {
//takes an element from the queue to process
final ElementoArvore e = fila.take();
//checks for the poison object
if (e.getCaminhoNo() == null) {
//if null, the queue has ended: puts the element back to wake the other threads and ends the current thread
fila.add(e);
System.out.printf("Fim da fila sinalizado. Finalizando thread %d%n", threadNumber);
break;
}
//processes the element
System.out.printf("Processando elemento %s na thread %d%n", e.getCaminhoNo(), threadNumber);
final Long novoId = persisteElemento(e);
//enqueues the children for processing, using the inserted ID
fila.addAll(filhosDiretos(e.getCaminhoNo()).stream()
.map(caminhoFilho -> new ElementoArvore(caminhoFilho, arvore.get(caminhoFilho), novoId))
.collect(Collectors.toList()));
//checks whether processing has finished
if (quantidadeElementosProcessados.incrementAndGet() >= arvore.size()) {
//element marking the end of processing
//this "wakes up" the other threads so that they can finish
fila.add(new ElementoArvore(null, null, null));
System.out.printf("Fim da fila encontrado. Finalizando thread %d%n", threadNumber);
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
void escreveArvore(String caminhoNo) {
//creates the thread pool
final ExecutorService threadPool = Executors.newFixedThreadPool(8);
for (int i = 0; i < 8; i++) {
threadPool.execute(new ConsumidorProdutor(i+1));
}
//puts the root element on the queue to start processing
try {
fila.put(new ElementoArvore(caminhoNo, arvore.get(caminhoNo), null));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
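//stops accepting new tasks; this call does not wait, the worker threads end on their own once they see the poison object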
threadPool.shutdown();
}
public static void main(String[] args) {
new TreeConsumerExample().escreveArvore("v=raiz");
}
}
Considerations
This solution depends on being able to quickly read the direct children of a node, so that a child can never be found before its parent.
It is not possible to say how much gain this implementation would bring, or whether it might even make things worse.
What I can theorize is that the gain will be proportional to the number of children that nodes usually have in the tree. If the nodes have, on average, fewer than two children, the execution will be practically serial. Although it avoids the overhead of a very deep execution stack, time will still be spent synchronizing the queue.
Another possible solution: change in the structure of reading data
In fact, a whole set of solutions can emerge if it is possible to change how the data is read. This vaguely resembles an old question about tree modeling in databases, but in a different sense.
In this scenario, each node would have a depth field p so that the tree could be read ordered by level (level-order), where p('v=raiz')=1, p('v=raiz,v=elemento_1')=2 and so on. The number of commas (plus one) would equal the depth.
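As a small illustration of the comma rule, the depth can be derived directly from the path. This is only a sketch: `DepthSketch` and `profundidade` are hypothetical names, and it assumes that node values never contain a comma themselves.

```java
class DepthSketch {
    // depth of a node = number of commas in its path, plus one
    static int profundidade(String caminhoNo) {
        int virgulas = 0;
        for (int i = 0; i < caminhoNo.length(); i++) {
            if (caminhoNo.charAt(i) == ',') {
                virgulas++;
            }
        }
        return virgulas + 1;
    }

    public static void main(String[] args) {
        System.out.println(profundidade("v=raiz"));                           // prints 1
        System.out.println(profundidade("v=raiz,v=elemento_1"));              // prints 2
        System.out.println(profundidade("v=raiz,v=elemento_1,v=elemento_3")); // prints 3
    }
}
```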
One possible algorithm is the following.
Let:
- p: the depth of a node in the tree
- n: a node
- q: the number of nodes at a level p
- b: a block of nodes at a level p, an equal division of the quantity q
- l(p,b): the list of nodes of a block b at a depth p
- k: the key of a node
- m(p,n)=k: a map that stores a key k given a depth p and a node n
- n.p: the parent node of the current node
An algorithm could be:
1. Set p=1 (root)
2. Count the nodes n at the current depth, p(n)=p, giving q; finish if q=0
3. Split the quantity q of nodes into blocks and, for each block b, process in parallel as follows:
   - Get the list l(p,b), whose nodes are at depth p and belong to block b
   - Process each node n on the list:
     - Retrieve the parent's key by accessing m(p-1,n.p), unless the node is the root
     - Write to the database
     - Retrieve the primary key k
     - Save the key by setting m(p,n)=k
4. Wait for the parallel block processing to finish
5. Remove the parent nodes from the map, m(p-1,*), because they will no longer be needed
6. Increase the depth, p++
7. Go back to step 2
The basic idea is to process the tree levels in sequence, since each level depends on the previous one, while parallelizing the processing of the nodes within each level.
The levels already processed can be discarded, except the last one (p-1), which contains the parents of the nodes at the current level.
In order to get the nodes by level, the method in the interface would have to be something like this:
Map<String, Map<String, Object>> leNos(int nivelProfundidade, int bloco, int tamanhoBloco);
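The level-by-level idea can be sketched in memory as follows. This is a simplification under stated assumptions, not the real reader: the class `LevelOrderSketch` is hypothetical, the whole list of paths is already in memory instead of being fetched per block through something like `leNos`, a parallel stream stands in for the block division, every non-root node has its parent in the list, and the two result maps exist only so the outcome can be checked.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

class LevelOrderSketch {
    final AtomicLong ultimoId = new AtomicLong();
    // kept only for inspection; a real run would not need to retain these
    final Map<String, Long> idPorNo = new ConcurrentHashMap<>();
    final Map<String, Long> idPaiPorNo = new ConcurrentHashMap<>();   // -1 marks "no parent"

    static int profundidade(String caminhoNo) {
        return (int) caminhoNo.chars().filter(c -> c == ',').count() + 1;
    }

    static String caminhoPai(String caminhoNo) {
        int v = caminhoNo.lastIndexOf(',');
        return v > 0 ? caminhoNo.substring(0, v) : null;
    }

    void escreveArvore(List<String> caminhos) {
        // group the nodes by depth; the TreeMap iterates the levels in ascending order
        Map<Integer, List<String>> niveis = caminhos.stream()
                .collect(Collectors.groupingBy(LevelOrderSketch::profundidade, TreeMap::new, Collectors.toList()));
        Map<String, Long> nivelAnterior = new ConcurrentHashMap<>();       // plays the role of m(p-1, *)
        for (List<String> nivel : niveis.values()) {
            final Map<String, Long> pais = nivelAnterior;
            final Map<String, Long> nivelAtual = new ConcurrentHashMap<>(); // plays the role of m(p, *)
            // nodes of the same level do not depend on each other, so they can run in parallel;
            // forEach returns only after the whole level is done
            nivel.parallelStream().forEach(no -> {
                String pai = caminhoPai(no);
                Long idPai = pai == null ? null : pais.get(pai);
                long novoId = ultimoId.incrementAndGet();                  // stand-in for the database insert
                nivelAtual.put(no, novoId);
                idPorNo.put(no, novoId);
                idPaiPorNo.put(no, idPai == null ? -1L : idPai);
            });
            nivelAnterior = nivelAtual;                                    // the older level is no longer referenced
        }
    }

    public static void main(String[] args) {
        LevelOrderSketch s = new LevelOrderSketch();
        s.escreveArvore(Arrays.asList("v=raiz,v=elemento_1,v=elemento_3", "v=raiz", "v=raiz,v=elemento_1"));
        System.out.println(s.idPaiPorNo);
    }
}
```

Note that the input list is deliberately out of order: grouping by depth is what guarantees that a parent is written before its children.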
Considerations
The above algorithm is much more complex and only works well if there are enough nodes per tree level.
It only works if it is possible to change the interface and the reading structure.
The algorithm could be modified and optimized further, depending on what kinds of changes are possible in the database.
One more possible solution: predetermined relationship
The basic idea is to be able to reference a node in the target database without that node having to be written to the database first.
This seems to me the simplest solution from an algorithmic point of view, although its viability depends on some factors.
I imagine that in the current implementation, each node has a foreign key to its parent, whose value is the primary key of the parent, which in turn is generated when the record is inserted.
On the other hand, if we changed the database structure so that the foreign key of the record was something like the node path on the map (or maybe a hash of it), we could then insert a child before its parent, because the foreign key could easily be determined independently.
Of course, restrictions such as foreign-key referential integrity would need to be relaxed so as not to require the referenced record to exist.
The method persisteElemento would record this information derived from caminhoNo, for example:
- For the root 'v=raiz', the value of the parent node would be null and the node identifier would be the path 'v=raiz'.
- For the element 'v=raiz,v=elemento_1', the value of the parent node would be 'v=raiz', i.e., the path of this node minus everything from the last comma onward. The identifier of this node would be 'v=raiz,v=elemento_1'.
- All the children of the node 'v=raiz,v=elemento_1' would have as parent node the value 'v=raiz,v=elemento_1'.
As I mentioned, the above values could perhaps be replaced by a hash code to save space, because taking up more space is one of the biggest impacts, if not the biggest, of this approach.
One last possible solution: waiting list
Another approach would be to use a waiting list to hold nodes that are temporarily orphaned.
Well, actually, not just a single list, but one list per missing parent node.
Whenever a node's parent has not been inserted yet, the node is added to the waiting list associated with that parent.
Whenever a node is inserted into the database, we check whether there is a waiting list for it. If there is, the waiting nodes are added back to the processing queue or processed immediately, as desired.
A simplified implementation would be:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
public class WaitProducerExample {
//example of a read sequence
final List<String> leituraFake = Arrays.asList("v=raiz,v=elemento_1,v=elemento_3", "v=raiz", "v=raiz,v=elemento_1");
//processing queue
final LinkedBlockingDeque<String> fila = new LinkedBlockingDeque<>();
//pending nodes: key -> parent node, value -> nodes waiting for that parent
final ConcurrentHashMap<String, List<String>> listaDeEspera = new ConcurrentHashMap<>();
//keys of the nodes already inserted
final ConcurrentHashMap<String, Long> inseridos = new ConcurrentHashMap<>();
//fake ID sequence
final AtomicLong ultimoIdInseridoNoBanco = new AtomicLong();
//persists an element and returns the new ID
Long persisteElemento(String caminhoNo, Long idPai) {
final Long novoId = ultimoIdInseridoNoBanco.incrementAndGet();
System.out.printf("Persistindo %s | id pai = %d | novo id = %d%n", caminhoNo, idPai, novoId);
return novoId;
}
//returns the path of the parent, given the path of the child
String getPai(String caminhoNo) {
final int v = caminhoNo.lastIndexOf(',');
return v > 0 ? caminhoNo.substring(0, v) : "";
}
void escreveArvore() {
//simulates reading an unordered block of nodes
fila.addAll(leituraFake);
//walks the queue until it runs out of elements
while (!fila.isEmpty()) {
try {
//path of the node
final String no = fila.take();
//path of the parent
final String pai = getPai(no);
System.out.printf("Processando nó %s, pai = %s%n", no, pai);
//checks whether the parent has already been inserted
//separate synchronizations are needed because in one thread the parent path is used to store
//the current node in the waiting list, while in another thread the current node acts as the parent, draining the waiting list
final Long idPai = inseridos.get(pai);
if (pai.isEmpty() || idPai != null) {
//if it is the root (empty parent path) or the parent was found, persists the element and saves the inserted ID
inseridos.put(no, persisteElemento(no, idPai));
//synchronizes on the node so that no other thread can add elements to the waiting list while it is being drained
synchronized (no) {
//checks whether anyone was waiting
List<String> filhosEsperando = listaDeEspera.remove(no);
if (filhosEsperando == null) {
System.out.printf("Nenhum filho aguardando...%n");
} else {
//puts them back on the main queue (they could be processed immediately, but that would end up as a recursive algorithm)
System.out.printf("Encontrado(s) %d filho(s) aguardando que já podem voltar para a fila: %s%n", filhosEsperando.size(), filhosEsperando);
fila.addAll(filhosEsperando);
}
}
} else {
//otherwise puts the node on the waiting list associated with the parent path
//synchronizes on the parent so that elements are not added to the waiting list while it is being drained
//if the parent is inserted at some point after the check above, the element is ready to be inserted and must go back to the queue
synchronized (pai) {
//checks again that it has not been inserted, because under concurrency the state can change from one line to the next
if (inseridos.get(pai) == null) {
System.out.printf("Pai não encontrado, aguardando na lista de espera...%n");
listaDeEspera.compute(pai, (k, v) -> {
if (v == null) v = new ArrayList<>();
v.add(no);
return v;
});
} else {
//if the parent was inserted between the check above and the synchronized block, puts the node back on the normal queue instead of the waiting list
fila.add(no);
}
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//checks that nothing was left behind on the waiting list
if (!listaDeEspera.isEmpty()) {
System.out.printf("Ops... você esqueceu de alguma coisa, não?!%n");
}
}
public static void main(String[] args) {
new WaitProducerExample().escreveArvore();
}
}
Remarks
The above implementation does not use threads, but it simulates reading child nodes before their parent.
Even without threads, I added synchronization that should account for concurrency issues, such as items being added to and removed from the waiting list at the same time. This is because one thread can find an orphaned node and try to insert it into the waiting list while another thread finds the parent and tries to drain the elements from that list. The object to synchronize on must be the parent in the first case and the node itself in the second, because in the first we are using the absent parent as the reference and in the second the node just found is the parent of the orphan node. Complicated? Tell me about it.
If anyone uses this code, I strongly suggest reviewing the synchronization, because I have not tested it extensively and something may have slipped through.
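One point worth checking in such a review: synchronized locks on object identity, not on string equality. In the example, the parent path comes out of substring, so it is a different String object from the equal path that later arrives as the node itself, and the two synchronized blocks would not exclude each other. A possible fix, sketched below with hypothetical names (`PathLockSketch`, `travas`, `trava`), is to look up one shared lock object per path and synchronize on that instead.

```java
import java.util.concurrent.ConcurrentHashMap;

class PathLockSketch {
    // one lock object per distinct path
    final ConcurrentHashMap<String, Object> travas = new ConcurrentHashMap<>();

    // returns the single lock shared by every String that is equal to caminho
    Object trava(String caminho) {
        return travas.computeIfAbsent(caminho, k -> new Object());
    }

    public static void main(String[] args) {
        PathLockSketch s = new PathLockSketch();
        String no = "v=raiz";
        String pai = "v=raiz,v=elemento_1".substring(0, 6);   // same text, built the way getPai builds it
        System.out.println(no.equals(pai));                   // prints true
        System.out.println(no == pai);                        // prints false: different monitors
        System.out.println(s.trava(no) == s.trava(pai));      // prints true: same monitor
        synchronized (s.trava(pai)) {
            // the waiting-list update for this path would go here
        }
    }
}
```

In this sketch the lock map only grows; a long-running process would need to remove entries once a path can no longer be waited on.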
Look, the tip I'm about to give has nothing to do with your question, but it's something a friend did for a problem in his doctorate, and he told me the speed he achieved was orders of magnitude greater. What he did was use a ramdisk (a drive that is mapped to RAM, validated on Linux). If you have enough RAM, you can run your algorithm against an intermediate database that lives entirely in RAM. At the end, you write the data to the official database. Obviously, you need enough RAM: he had 64 GB and used 40 GB. Basically, put a whole SQLite database in RAM.
– cantoni
Obviously, my comment above only helps with the database persistence part, since that part will get faster. It does not help with the OutOfMemoryException that happens when processing the tree.
– cantoni