Well, you didn’t say what you use as class T, nor what the functions that operate on it to produce Strings are. So I will invent a class for this:
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class Colorido {
    // Sleeps for a random 0-49 ms to simulate a slow computation.
    private void complica() {
        try {
            Thread.sleep((int) (Math.random() * 50));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of silently ignoring it
        }
    }

    public String getVermelho() { complica(); return "vermelho"; }
    public String getLaranja() { complica(); return "laranja"; }
    public String getAmarelo() { complica(); return "amarelo"; }
    public String getLima() { complica(); return "lima"; }
    public String getVerde() { complica(); return "verde"; }
    public String getCiano() { complica(); return "ciano"; }
    public String getAzul() { complica(); return "azul"; }
    public String getVioleta() { complica(); return "violeta"; }
    public String getRosa() { complica(); return "rosa"; }
    public String getRoxo() { complica(); return "roxo"; }
    public String getBranco() { complica(); return "branco"; }
    public String getPreto() { complica(); return "preto"; }
    public String getMarrom() { complica(); return "marrom"; }
    public String getOliva() { complica(); return "oliva"; }
    public String getBege() { complica(); return "bege"; }
    public String getCinza() { complica(); return "cinza"; }

    // The 16 functions, in a fixed order.
    public static List<Function<Colorido, String>> funcoes() {
        return Arrays.asList(
                Colorido::getVermelho, Colorido::getLaranja, Colorido::getAmarelo, Colorido::getLima,
                Colorido::getVerde, Colorido::getCiano, Colorido::getAzul, Colorido::getVioleta,
                Colorido::getRosa, Colorido::getRoxo, Colorido::getBranco, Colorido::getPreto,
                Colorido::getMarrom, Colorido::getOliva, Colorido::getBege, Colorido::getCinza
        );
    }
}
This is certainly a very silly class. However, note that the method funcoes returns a list of 16 functions that operate on the object. Each function returns the name of a different color, but all of them take some time to do so, thanks to the Thread.sleep with a random duration inside the method complica. After all, if we want to test parallelism, it is important that the threads do not run too quickly; otherwise one could finish before another even starts.
So, let’s test the map-reduce you posted in the question:
public static <T> List<String> mapReduceSequencial(
        List<Function<T, String>> funcoes, // initialized with the various functions
        T objetoSobProcessamento)           // arbitrary parameter
{
    List<String> retornoProcessamento = funcoes
            .stream()
            .map(f -> f.apply(objetoSobProcessamento))
            .reduce(new ArrayList<>(), (listaAcumulada, novoValor) -> {
                listaAcumulada.add(novoValor);
                return listaAcumulada;
            }, (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
    return Collections.unmodifiableList(retornoProcessamento);
}
It is invoked like this:
System.out.println(mapReduceSequencial(Colorido.funcoes(), new Colorido()));
Here’s the output (I added the line break just to make it easier to read):
[vermelho, laranja, amarelo, lima, verde, ciano, azul, violeta,
rosa, roxo, branco, preto, marrom, oliva, bege, cinza]
Note that the order of colors is the same as in the list.
And what happens if we change that .stream() to .parallelStream()? In this case the result will vary, because of the random Thread.sleep and because of the inherent nondeterminism of running multiple threads simultaneously, but in one test I ran the result was this:
[verde, rosa, roxo, ciano, azul, branco, violeta, preto,
vermelho, laranja, amarelo, marrom, oliva, lima, verde, rosa,
roxo, ciano, azul, branco, violeta, preto, vermelho, laranja,
amarelo, marrom, oliva, lima, bege, cinza, verde, rosa,
roxo, ciano, azul, branco, violeta, preto, vermelho, laranja,
amarelo, marrom, oliva, lima, verde, rosa, roxo, ciano,
azul, branco, violeta, preto, vermelho, laranja, amarelo, marrom,
oliva, lima, bege, cinza, verde, rosa, roxo, ciano,
azul, branco, violeta, preto, vermelho, laranja, amarelo, marrom,
oliva, lima, verde, rosa, roxo, ciano, azul, branco,
violeta, preto, vermelho, laranja, amarelo, marrom, oliva, lima,
bege, cinza, verde, rosa, roxo, ciano, azul, branco,
violeta, preto, vermelho, laranja, amarelo, marrom, oliva, lima,
verde, rosa, roxo, ciano, azul, branco, violeta, preto,
vermelho, laranja, amarelo, marrom, oliva, lima, bege, cinza]
Well, that result is clearly very wrong. It means that your reduce implementation does not work with parallel Streams (but stay calm, I’ll soon explain how to fix this). Note that several subsequences repeat in that result.
Looking at the documentation of the method Stream<T>.reduce(U, BiFunction<U, ? super T, U>, BinaryOperator<U>), whose parameters are called identity, accumulator and combiner respectively, we find this:
This is equivalent to:
U result = identity;
for (T element : this stream)
result = accumulator.apply(result, element)
return result;
but is not constrained to execute sequentially.
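To make the quoted equivalence concrete, here is a minimal sketch that writes the sequential semantics out as a plain loop and compares it with a sequential reduce. The class ReduceAsLoop and the helper loopReduce are hypothetical names made up for illustration, and the reduction itself (summing string lengths) is just an example, chosen because its identity, 0, is immutable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

public class ReduceAsLoop {
    // Hypothetical helper: the sequential semantics from the javadoc, as an explicit loop.
    public static <T, U> U loopReduce(List<T> elements, U identity,
                                      BiFunction<U, ? super T, U> accumulator) {
        U result = identity;
        for (T element : elements) {
            result = accumulator.apply(result, element);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> cores = Arrays.asList("vermelho", "laranja", "amarelo");
        BiFunction<Integer, String, Integer> acc = (soma, s) -> soma + s.length();
        BinaryOperator<Integer> comb = Integer::sum;

        int viaLoop = loopReduce(cores, 0, acc);
        int viaStream = cores.stream().reduce(0, acc, comb);
        System.out.println(viaLoop + " " + viaStream); // prints "22 22"
    }
}
```

Because Integer is immutable, this accumulator always produces a new value instead of modifying the previous one, which is exactly the property the question’s lambdas lack.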
Let’s take a look at this line first:
U result = identity;
That means your ArrayList is the initial value of result.
Now the line inside the for:
result = accumulator.apply(result, element)
This means that from the given list another list is produced, from that one another, and so on until the elements run out. It turns out that in your case the list produced is always the same object: instead of producing new results based on the previous partial results, you are actually modifying the previous partial results and returning them as if they were new.
The consequence is that, when this runs in parallel and the combiner is used, l1 and l2 end up pointing to the same object, namely that single ArrayList. So when you call l1.addAll(l2), you add the list to itself, which duplicates every element already in it! Combine that with the fact that the parallel threads take random, unpredictable amounts of time (and that ArrayList is not thread-safe, yet several threads are calling add on that same list), and the result is the mess that was produced.
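The self-append can be reproduced without any threads. This sketch (the class SelfAddAll and the method combineWithItself are hypothetical) passes the same ArrayList as both l1 and l2, as happens in the parallel run. On OpenJDK’s ArrayList, addAll copies its argument into an array first, so the list simply ends up duplicated. Note, however, that the List.addAll javadoc declares the behavior undefined when the argument is the list itself and it is nonempty, so even this outcome is not guaranteed in general.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SelfAddAll {
    // Mimics the question's combiner when l1 and l2 are the very same object.
    public static List<String> combineWithItself(List<String> shared) {
        List<String> l1 = shared;
        List<String> l2 = shared; // same reference, not a copy
        l1.addAll(l2);            // appends the list to itself
        return l1;
    }

    public static void main(String[] args) {
        List<String> lista = new ArrayList<>(Arrays.asList("verde", "rosa"));
        System.out.println(combineWithItself(lista)); // on OpenJDK: [verde, rosa, verde, rosa]
        System.out.println(lista.size());             // 4: the original list was mutated
    }
}
```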
Further down, the javadoc also says this:
This means that for all u, combiner(identity, u) is equal to u.
In algebraic terms, this means that identity must be the neutral element of the combiner operation, just as 0 is the neutral element of addition and 1 is the neutral element of multiplication. Anything combined with identity should result in that same thing.
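The neutral-element rule can be checked mechanically. This is a minimal sketch with hypothetical names (NeutralElement, isLeftIdentity, concat); it only tests the rule on a few sample values, so a true result is evidence, not proof. The list case uses a concatenation that always builds a new list, which is why Collections.emptyList() can serve as its identity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

public class NeutralElement {
    // Returns true if combiner(identity, u) equals u for every sample u.
    public static <U> boolean isLeftIdentity(U identity, BinaryOperator<U> combiner, List<U> samples) {
        for (U u : samples) {
            if (!combiner.apply(identity, u).equals(u)) {
                return false;
            }
        }
        return true;
    }

    // List "concatenation" that never mutates its inputs: it always builds a new list.
    public static List<String> concat(List<String> a, List<String> b) {
        List<String> result = new ArrayList<>(a);
        result.addAll(b);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(-3, 0, 7, 42);
        System.out.println(isLeftIdentity(0, Integer::sum, numbers));    // true
        System.out.println(isLeftIdentity(1, (a, b) -> a * b, numbers)); // true
        System.out.println(isLeftIdentity(0, (a, b) -> a * b, numbers)); // false: e.g. 0 * 7 is 0, not 7

        List<List<String>> lists = Arrays.asList(
                Collections.<String>emptyList(), Arrays.asList("azul"), Arrays.asList("rosa", "roxo"));
        System.out.println(isLeftIdentity(Collections.<String>emptyList(), NeutralElement::concat, lists)); // true
    }
}
```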
The combiner is your last lambda, the one that does the addAll. In both of your lambdas you modify the list, and therefore you modify what is taken as the identity. So, when combiner(identity, u) is invoked, you end up with a list that is not the u you had before, but one equivalent to u doubled. This means that your lambdas break the rules defined by the documentation of reduce, by violating the neutral-element rule for the combiner. So it is no surprise that bad things happen.
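The following sketch (the class name IdentityViolation is hypothetical) reuses the question’s combiner and calls it twice with the same identity object, which reduce is allowed to do. The first call looks correct, but it silently changes the identity, so the second call no longer satisfies combiner(identity, u) == u.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class IdentityViolation {
    // Same behavior as the combiner in the question: mutates l1 and returns it.
    public static final BinaryOperator<List<String>> COMBINER = (l1, l2) -> {
        l1.addAll(l2);
        return l1;
    };

    public static void main(String[] args) {
        // reduce may hand this same identity object to the combiner more than once.
        List<String> identity = new ArrayList<>();

        List<String> r1 = COMBINER.apply(identity, Arrays.asList("azul"));
        System.out.println(r1);       // [azul] (looks correct...)
        System.out.println(identity); // [azul] (...but the identity was mutated)

        List<String> r2 = COMBINER.apply(identity, Arrays.asList("rosa"));
        System.out.println(r2);       // [azul, rosa], not [rosa]: the identity is no longer neutral
        System.out.println(r1 == r2); // true: every "new" result is the same object
    }
}
```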
We can change a few things in your lambdas to restore the behavior/contract required by reduce:
public static <T> List<String> mapReduceParalelo2(
        List<Function<T, String>> funcoes, // initialized with the various functions
        T objetoSobProcessamento)           // arbitrary parameter
{
    List<String> retornoProcessamento = funcoes
            .parallelStream()
            .map(f -> f.apply(objetoSobProcessamento))
            .reduce(Collections.emptyList(), (listaAcumulada, novoValor) -> {
                // Never mutate the (immutable) identity: start a fresh list instead.
                List<String> novaLista = listaAcumulada.isEmpty() ? new ArrayList<>() : listaAcumulada;
                novaLista.add(novoValor);
                return novaLista;
            }, (l1, l2) -> {
                // If either side is the identity, the other side is the result.
                if (l1.isEmpty()) return l2;
                if (l2.isEmpty()) return l1;
                l1.addAll(l2);
                return l1;
            });
    return Collections.unmodifiableList(retornoProcessamento);
}
The method is named mapReduceParalelo2 because I’m calling the one that produced the messy result mapReduceParalelo1. In mapReduceParalelo2, note that the identity is Collections.emptyList(). I did this to emphasize that the identity must not be modified (trying to do so throws an UnsupportedOperationException, since that list is immutable).
The accumulator tests whether the given list is the identity (here, by checking whether it is empty) and, if it is, creates a new list to avoid problems. If the given list is not the identity, it can be modified safely, because even if a new list were created instead, the original one would not be reused anywhere.
The combiner, in turn, checks whether either list is the identity and, if so, returns the other one. If neither is the identity, it is fine to do the addAll instead of creating a new list, because neither of the original lists will be reused afterwards.
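A quick way to convince yourself that these corrected lambdas are safe in parallel is to run them on a large input and compare the result with the input order. This is a minimal sketch: ParalleloCheck and reduceParallel are hypothetical names, and the stream elements are plain strings rather than the results of Colorido’s functions, so no sleeping is involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParalleloCheck {
    // Same accumulator/combiner idea as mapReduceParalelo2, applied directly to strings.
    public static List<String> reduceParallel(List<String> input) {
        List<String> result = input
                .parallelStream()
                .reduce(Collections.<String>emptyList(), (listaAcumulada, novoValor) -> {
                    // The identity (empty, immutable) is never mutated: start a fresh list instead.
                    List<String> novaLista = listaAcumulada.isEmpty() ? new ArrayList<>() : listaAcumulada;
                    novaLista.add(novoValor);
                    return novaLista;
                }, (l1, l2) -> {
                    // Lists reaching here (other than the identity) belong to a single subtask.
                    if (l1.isEmpty()) return l2;
                    if (l2.isEmpty()) return l1;
                    l1.addAll(l2);
                    return l1;
                });
        return Collections.unmodifiableList(result);
    }

    public static void main(String[] args) {
        List<String> input = IntStream.range(0, 10_000)
                .mapToObj(i -> Integer.toString(i))
                .collect(Collectors.toList());
        System.out.println(reduceParallel(input).equals(input)); // true: same elements, same order
    }
}
```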
The moral of the story is that the identity is always reused, while the other partial results are not. If the identity were recreated every time instead of being reused, mapReduceParalelo1 would work.
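For comparison, reduce can also be used in the style it was designed for, by never mutating anything. In this sketch (ImmutableReduce and reduceImmutable are hypothetical names), both lambdas copy their inputs into a new list, so the shared identity is never touched and no isEmpty checks are needed. The price is that every step copies the whole partial list, which makes the work quadratic in the number of elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ImmutableReduce {
    // Purely functional reduce: accumulator and combiner always return new lists
    // and never modify their arguments, so reusing the identity is harmless.
    public static List<String> reduceImmutable(List<String> input) {
        return input.parallelStream().reduce(
                Collections.<String>emptyList(),
                (acc, value) -> {
                    List<String> copy = new ArrayList<>(acc);
                    copy.add(value);
                    return Collections.unmodifiableList(copy);
                },
                (l1, l2) -> {
                    List<String> merged = new ArrayList<>(l1);
                    merged.addAll(l2);
                    return Collections.unmodifiableList(merged);
                });
    }

    public static void main(String[] args) {
        List<String> cores = Arrays.asList("vermelho", "laranja", "amarelo", "lima");
        System.out.println(reduceImmutable(cores)); // [vermelho, laranja, amarelo, lima]
    }
}
```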
This got a little complicated because reduce was in fact designed to work with immutable data, or at least with an immutable identity. Although it is possible to use reduce with mutable things, the result is the somewhat complicated and confusing mapReduceParalelo2. The collect method, on the other hand, is different: it was designed precisely to work with mutable things. For example:
public static <T> List<String> mapCollectParalelo(
        List<Function<T, String>> funcoes, // initialized with the various functions
        T objetoSobProcessamento)           // arbitrary parameter
{
    List<String> retornoProcessamento = funcoes
            .parallelStream()
            .map(f -> f.apply(objetoSobProcessamento))
            .collect(ArrayList::new, List::add, List::addAll); // supplier, accumulator, combiner
    return Collections.unmodifiableList(retornoProcessamento);
}
Note how simple the parameters of the collect method are: all three are method references. The first, rather than being a fixed object that serves as an identity and is always reused, is a Supplier used to create new lists, in this case the ArrayList constructor. The second parameter (accumulator) is what adds an element to the list, namely the add method (the parameter’s type is a BiConsumer, whose result is void because the expected operation here is a mutation; the boolean that add returns is simply discarded). The third parameter (combiner), which merges two partial lists, is addAll (also used as a BiConsumer). Note that parallelStream() is used here. The result is the expected one (and it would also be correct if the stream were sequential):
[vermelho, laranja, amarelo, lima, verde, ciano, azul, violeta,
rosa, roxo, branco, preto, marrom, oliva, bege, cinza]
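The same supplier/accumulator/combiner shape works for other mutable containers. The Stream.collect javadoc itself uses StringBuilder as an example; this sketch wraps that idea in a hypothetical concatenate method. Each subtask gets its own StringBuilder from the supplier, so nothing is shared between threads until the combiner merges the partial results in encounter order.

```java
import java.util.Arrays;
import java.util.List;

public class CollectStringBuilder {
    public static String concatenate(List<String> cores) {
        return cores.parallelStream()
                .collect(StringBuilder::new,    // supplier: a fresh, empty container per subtask
                         StringBuilder::append, // accumulator: appends one element (mutation)
                         StringBuilder::append) // combiner: appends the right container into the left
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(concatenate(Arrays.asList("vermelho", "-", "azul"))); // vermelho-azul
    }
}
```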
You may notice that combining ArrayList::new, List::add and List::addAll is a very frequent and common case. Couldn’t these three be encapsulated in a single object? Well, that is what the Collector interface is for, and the method collect(Collector) is the version of collect that takes such an object. To obtain a Collector that does the equivalent of these three parameters, just use the method Collectors.toList(). So you can use collect simply like this:
List<String> retornoProcessamento = funcoes
.parallelStream()
.map(f -> f.apply(objetoSobProcessamento))
.collect(Collectors.toList());
This last version, with Collectors.toList(), should be the code you wanted. Note that it is much simpler and more straightforward than what you posted in your question!
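Collectors.toList() is the ready-made option, but its javadoc makes no guarantee about the concrete List type it returns. To see the three pieces packed into one object explicitly, here is a sketch using Collector.of; toArrayList is a hypothetical name, and Collectors.toCollection(ArrayList::new) is the standard-library way to get the same guarantee. Note that a Collector’s combiner is a BinaryOperator that returns the merged container, unlike the BiConsumer taken by the three-argument collect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

public class ListCollector {
    // Packs supplier, accumulator and combiner into a single Collector object.
    public static <T> Collector<T, ?, List<T>> toArrayList() {
        return Collector.<T, List<T>>of(
                ArrayList::new,    // supplier: a fresh list per subtask
                List::add,         // accumulator: adds one element
                (left, right) -> { // combiner: appends right into left and returns left
                    left.addAll(right);
                    return left;
                });
    }

    public static void main(String[] args) {
        List<String> cores = Arrays.asList("verde", "ciano", "azul");
        List<String> copia = cores.parallelStream().collect(toArrayList());
        System.out.println(copia); // [verde, ciano, azul]
    }
}
```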
I put the complete, compilable code on GitHub.
Finally, regarding Onosendai’s answer that you linked in the question: it is about JavaScript, not Java. JavaScript’s model is entirely single-threaded, so there is not much to say about parallelism there.
I believe you’ve answered my next question: when to use reduce and when to use collect.
– Jefferson Quesado