In a map/reduce in Java, should the accumulation operation generate a new object? Or can I use the old one?


I have an application that creates several objects on top of a stream of functions, and then I collect all these generated objects in an accumulator. For example, if I were generating strings and accumulating them in a list, I would do something like this:

List<Function<T, String>> funcoes = /* initialized with the various functions */;

T objetoSobProcessamento = /* arbitrary parameter */;
List<String> retornoProcessamento = funcoes.stream().map(f -> f.apply(objetoSobProcessamento))
  .reduce(new ArrayList<>(), (listaAcumulada, novoValor) -> {
            listaAcumulada.add(novoValor);
            return listaAcumulada;
          }, (l1, l2) -> {
            l1.addAll(l2);
            return l1;
          });

The real case involves other, slightly more complex objects, but it follows the same spirit.

When looking at the possible reduce operations in Java, I ran into a question about parallelizing the reduction operation in the generic case. That answer left me with the impression that this reduce operation has to be sequential, not parallelizable.

So my question is:

  • in the accumulation operation, to take full advantage of parallelism, must I always produce a new object? Or can I always reuse the old one? Or should I just avoid reusing the identity object, creating a new object the first time I accumulate and reusing that newly created element from then on?

I know the combining/merging operation on the produced objects is perfectly parallelizable if and only if the arguments passed to it are distinct, as in the case illustrated below:

t1--A--ui  t2--A--ui  t3--A--ui  t4--A--ui
    |          |          |          |
    u1----M----u2         u3----M----u4
          |                     |
          u5---------M----------u6
                     |
                     u7

Where A is an accumulation operation, M is a merge operation, t? is an object of the input type T, u? is an object of the return type U, and ui is the identity object.

1 answer

Well, you didn't say what you use as the class T, nor what those functions that operate on it to produce Strings are. So I'll invent a class for that:

class Colorido {
    private void complica() {
        try {
            Thread.sleep((int) (Math.random() * 50));
        } catch (InterruptedException e) {
            // Ignore.
        }
    }
    public String getVermelho() { complica(); return "vermelho"; }
    public String getLaranja()  { complica(); return "laranja";  }
    public String getAmarelo()  { complica(); return "amarelo";  }
    public String getLima()     { complica(); return "lima";     }
    public String getVerde()    { complica(); return "verde";    }
    public String getCiano()    { complica(); return "ciano";    }
    public String getAzul()     { complica(); return "azul";     }
    public String getVioleta()  { complica(); return "violeta";  }
    public String getRosa()     { complica(); return "rosa";     }
    public String getRoxo()     { complica(); return "roxo";     }
    public String getBranco()   { complica(); return "branco";   }
    public String getPreto()    { complica(); return "preto";    }
    public String getMarrom()   { complica(); return "marrom";   }
    public String getOliva()    { complica(); return "oliva";    }
    public String getBege()     { complica(); return "bege";     }
    public String getCinza()    { complica(); return "cinza";    }
    public static List<Function<Colorido, String>> funcoes() {
        return Arrays.asList(
            Colorido::getVermelho, Colorido::getLaranja, Colorido::getAmarelo, Colorido::getLima,
            Colorido::getVerde, Colorido::getCiano, Colorido::getAzul, Colorido::getVioleta,
            Colorido::getRosa, Colorido::getRoxo, Colorido::getBranco, Colorido::getPreto,
            Colorido::getMarrom, Colorido::getOliva, Colorido::getBege, Colorido::getCinza
        );
    }
}

This is certainly a very silly class. However, note that the method funcoes returns a list with 16 functions that operate on the object. Each function returns the name of a different color, but all of them take some time to do so, thanks to the Thread.sleep of random duration inside the method complica. After all, if we want to test parallelism, it's important that the threads don't finish too quickly, so that we don't end up with one only starting after another has already finished.

So, let’s test the map-reduce you put in the question:

public static <T> List<String> mapReduceSequencial(
    List<Function<T, String>> funcoes, // initialized with the various functions
    T objetoSobProcessamento) // arbitrary parameter
{
    List<String> retornoProcessamento = funcoes
            .stream()
            .map(f -> f.apply(objetoSobProcessamento))
            .reduce(new ArrayList<>(), (listaAcumulada, novoValor) -> {
                listaAcumulada.add(novoValor);
                return listaAcumulada;
            }, (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
    return Collections.unmodifiableList(retornoProcessamento);
}

It is invoked like this:

System.out.println(mapReduceSequencial(Colorido.funcoes(), new Colorido()));

Here's the output (I added the line breaks just to make it easier to read):

[vermelho, laranja, amarelo, lima, verde, ciano, azul, violeta,
 rosa, roxo, branco, preto, marrom, oliva, bege, cinza]

Note that the order of colors is the same as in the list.

And what happens if we change that .stream() to .parallelStream()? In this case the result varies, because of the random Thread.sleep and because of the inherent nondeterminism of having multiple simultaneous threads, but in one test I ran the result was this:

[verde, rosa, roxo, ciano, azul, branco, violeta, preto,
vermelho, laranja, amarelo, marrom, oliva, lima, verde, rosa,
roxo, ciano, azul, branco, violeta, preto, vermelho, laranja,
amarelo, marrom, oliva, lima, bege, cinza, verde, rosa,
roxo, ciano, azul, branco, violeta, preto, vermelho, laranja,
amarelo, marrom, oliva, lima, verde, rosa, roxo, ciano,
azul, branco, violeta, preto, vermelho, laranja, amarelo, marrom,
oliva, lima, bege, cinza, verde, rosa, roxo, ciano,
azul, branco, violeta, preto, vermelho, laranja, amarelo, marrom,
oliva, lima, verde, rosa, roxo, ciano, azul, branco,
violeta, preto, vermelho, laranja, amarelo, marrom, oliva, lima,
bege, cinza, verde, rosa, roxo, ciano, azul, branco,
violeta, preto, vermelho, laranja, amarelo, marrom, oliva, lima,
verde, rosa, roxo, ciano, azul, branco, violeta, preto,
vermelho, laranja, amarelo, marrom, oliva, lima, bege, cinza]

Well, that result is clearly very wrong. It means your reduce implementation does not work with parallel streams (but stay calm, I'll explain how to fix this shortly). Note that there are several repeating subsequences in that result.
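In case it helps to follow along, here is a sketch of that parallel variant, which I'll call mapReduceParalelo1 from here on; it is mapReduceSequencial with parallelStream() in place of stream(), nothing else changed:

public static <T> List<String> mapReduceParalelo1(
    List<Function<T, String>> funcoes, // initialized with the various functions
    T objetoSobProcessamento) // arbitrary parameter
{
    List<String> retornoProcessamento = funcoes
            .parallelStream() // the only difference from mapReduceSequencial
            .map(f -> f.apply(objetoSobProcessamento))
            .reduce(new ArrayList<>(), (listaAcumulada, novoValor) -> {
                listaAcumulada.add(novoValor);
                return listaAcumulada;
            }, (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
    return Collections.unmodifiableList(retornoProcessamento);
}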

Looking at the documentation of the method Stream<T>.reduce(U, BiFunction<U, ? super T, U>, BinaryOperator<U>), whose parameters are called, respectively, identity, accumulator and combiner, we find this:

This is equivalent to:

 U result = identity;
 for (T element : this stream)
     result = accumulator.apply(result, element)
 return result;

but is not constrained to execute sequentially.


Let’s take a look at this line first:

U result = identity;

That means your ArrayList is the first value of result.

Now that line inside the for:

    result = accumulator.apply(result, element)

This means that, from the given list, another list is produced, and from that one another, and so on until the elements run out. It turns out that in your case the list produced is always the same one, so instead of producing new results based on the previous partial results, you are actually modifying the previous partial results and returning them as if they were new.

The result is that, by doing this in parallel, when the combiner is used, l1 and l2 end up pointing to the same object, that ArrayList, and so when you call l1.addAll(l2) you add the list to itself, which duplicates every element already in it! Combine that with the fact that the parallel threads take random/unpredictable amounts of time, and the result is the mess produced above.
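To see that duplication in isolation, here is a minimal sketch (not from the original code) of what addAll does when both references point to the same list:

List<String> l1 = new ArrayList<>(Arrays.asList("vermelho", "laranja"));
List<String> l2 = l1; // same object, just like the shared identity above

l1.addAll(l2); // adds the list to itself

System.out.println(l1); // [vermelho, laranja, vermelho, laranja]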

Earlier in the javadoc, this is also written:

This means that for all u, combiner(identity, u) is equal to u.


This means, algebraically, that identity must be the neutral element of the combiner operation, just as 0 is the neutral element of addition and 1 is the neutral element of multiplication. Anything combined with identity should result in that same thing.
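For contrast, here is a minimal sketch (my illustration, not part of the original code) of a three-argument reduce that respects this rule, using the immutable identity 0 together with the Colorido class above:

Colorido colorido = new Colorido();
int totalDeLetras = Colorido.funcoes()
        .parallelStream()
        .map(f -> f.apply(colorido))
        .reduce(0,
                (parcial, cor) -> parcial + cor.length(), // produces a new value, never mutates
                Integer::sum);                            // Integer.sum(0, u) == u, so 0 is a true identity
// totalDeLetras is the total number of letters in the 16 color names,
// no matter how the work is split among threads.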

The combiner is your last lambda, the one that does the addAll. In both of your lambdas you mutate the list, and therefore you mutate what is supposed to be the identity. So, by invoking combiner(identity, u), you end up with a list that is not equal to what u was before, but rather one that equals u doubled. This means your lambdas violate the rules defined by the documentation of reduce, by breaking the combiner's neutral-element rule. So it's no surprise that bad things happen.

We can change a few things in your lambdas to restore the behavior/contract required by reduce:

public static <T> List<String> mapReduceParalelo2(
    List<Function<T, String>> funcoes, // initialized with the various functions
    T objetoSobProcessamento) // arbitrary parameter
{
    List<String> retornoProcessamento = funcoes
            .parallelStream()
            .map(f -> f.apply(objetoSobProcessamento))
            .reduce(Collections.emptyList(), (listaAcumulada, novoValor) -> {
                List<String> novaLista = listaAcumulada.isEmpty() ? new ArrayList<>() : listaAcumulada;
                novaLista.add(novoValor);
                return novaLista;
            }, (l1, l2) -> {
                if (l1.isEmpty()) return l2;
                if (l2.isEmpty()) return l1;
                l1.addAll(l2);
                return l1;
            });
    return Collections.unmodifiableList(retornoProcessamento);
}

The method is named mapReduceParalelo2 because I'm calling the one that produced that messy result mapReduceParalelo1. In mapReduceParalelo2, note that the identity is Collections.emptyList(). I did that to emphasize that the identity must not be modified (trying to modify it would result in an exception).

The accumulator checks whether the list it receives is the identity, creating a new one in that case to avoid problems. If the list it receives is not the identity, then it can be safely mutated, because that partial result will not be reused anywhere else, so nothing is gained by creating a new list there.

The combiner, in turn, checks whether one of the lists is the identity and, if so, returns the other one. If neither is the identity, then it's fine to do the addAll instead of creating a new list, because neither of the original lists will be reused afterwards.

The moral of the story is that the identity is always reused, while the other partial results are not. If the identity were always recreated rather than reused, mapReduceParalelo1 would work.
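And if you prefer the first alternative from your question, always producing a new object, a sketch like the one below (my illustration, not part of the code I tested above) also respects the contract, at the cost of copying the accumulated list at every step:

public static <T> List<String> mapReduceImutavel(
    List<Function<T, String>> funcoes,
    T objetoSobProcessamento)
{
    return funcoes
            .parallelStream()
            .map(f -> f.apply(objetoSobProcessamento))
            .reduce(Collections.<String>emptyList(), (listaAcumulada, novoValor) -> {
                // Never mutate the previous partial result: copy it and add to the copy.
                List<String> novaLista = new ArrayList<>(listaAcumulada);
                novaLista.add(novoValor);
                return novaLista;
            }, (l1, l2) -> {
                // Likewise, combine by building a third list.
                List<String> combinada = new ArrayList<>(l1);
                combinada.addAll(l2);
                return combinada;
            });
}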

All of this may have gotten a little complicated because reduce was in fact designed to work with immutable data, or at least with an immutable identity. Although it is possible to use reduce with mutable things, the result is this somewhat complicated and confusing code. collect, on the other hand, is different: it really was designed to work with mutable things. For example:

public static <T> List<String> mapCollectParalelo(
    List<Function<T, String>> funcoes, // initialized with the various functions
    T objetoSobProcessamento) // arbitrary parameter
{
    List<String> retornoProcessamento = funcoes
            .parallelStream()
            .map(f -> f.apply(objetoSobProcessamento))
            .collect(ArrayList::new, List::add, List::addAll);
    return Collections.unmodifiableList(retornoProcessamento);
}

Note the simplicity of the parameters of collect. All three parameters are method references. The first, instead of being a fixed object that serves as an identity and gets reused all the time, is a Supplier used to create new lists, in this case the ArrayList constructor. The second parameter (the accumulator) is what adds elements to the list, namely the method add (its result is void, because the expected operation here is a mutation). The third parameter (the combiner), which combines the partial lists, is addAll (also void). Note that parallelStream() is used. The result is what is expected (and it would also be if the stream were sequential):

[vermelho, laranja, amarelo, lima, verde, ciano, azul, violeta,
 rosa, roxo, branco, preto, marrom, oliva, bege, cinza]
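Just to make the three parameters more explicit, the same collect call can be written with the method references expanded into equivalent lambdas (an illustrative rewrite; the behavior is identical):

    List<String> retornoProcessamento = funcoes
            .parallelStream()
            .map(f -> f.apply(objetoSobProcessamento))
            .collect(
                    () -> new ArrayList<String>(),  // supplier: creates a fresh list for each chunk of work
                    (lista, cor) -> lista.add(cor), // accumulator: mutates the list, returns nothing
                    (l1, l2) -> l1.addAll(l2));     // combiner: merges two partial lists, returns nothing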

You may notice that the combination of ArrayList::new, List::add and List::addAll is a very frequent and common case. Couldn't those three be encapsulated in a single object? Well, that's what the Collector interface is for, and the method collect(Collector) is the version of collect that uses that interface. To obtain a Collector that builds the list using those three parameters, just use the method Collectors.toList(). So you can use collect simply like this:

    List<String> retornoProcessamento = funcoes
            .parallelStream()
            .map(f -> f.apply(objetoSobProcessamento))
            .collect(Collectors.toList());

This last one is probably the code you wanted. Note that it is much simpler and more straightforward than what you posted in your question!

I put the complete, compilable code on GitHub.

Finally, regarding the Onosendai answer you linked in the question: it is about JavaScript, not Java. The JavaScript execution model is completely single-threaded, so there isn't much to say about parallelism there.

  • I believe you've preempted my next question: when to use reduce and when to use collect.
