Fork/Join in Java

2

During a project, someone suggested that I use the Fork/Join framework of the Java API instead of threads, but I found none of the examples on Google easy to understand.

As I understand it, you pass it a list of tasks, it subdivides them, and then it gathers the results.

I would like to know whether it is better than using threads, and whether someone has an example to show or can share their experience with this model.

3 answers

4


Fork/Join

The fork/join functionality, available since Java 7, can be a little difficult to understand at first because it addresses a specific class of problems.

According to the documentation of the class ForkJoinTask, the idea is to allow more efficient parallel processing by imposing some restrictions on the way the threads operate. The goal is to break a problem down into smaller tasks and run them independently, without using synchronization (synchronized).

If you think about it a little, this fits divide-and-conquer problems, where the solution is built from the solutions of smaller, independent subproblems. One of the classic examples is the Fibonacci number. The following code uses fork/join to calculate the Fibonacci number:

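import java.util.concurrent.RecursiveTask;

public class Fibonacci extends RecursiveTask<Long> {

    private final long n;

    public Fibonacci(long n) {
        this.n = n;
    }

    @Override
    protected Long compute() {
        // Base case: f(0) = 0 and f(1) = 1
        if (n <= 1) {
            return n;
        }
        // Schedule f(n-1) to run asynchronously in the pool
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        // Compute f(n-2) in the current thread, then wait for f(n-1)
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
    }

}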

We can highlight the following points:

  • We extend RecursiveTask, one of the two available implementations of ForkJoinTask. A "recursive task" is intended to execute recursive subtasks in parallel and return a final value.
  • The "magic" happens inside the method compute(), where we define the base case of the recursion in the first if and recursively create new instances of the class to run as subtasks.
  • By invoking f1.fork(), we ask for the calculation of f(n-1) to be done asynchronously, possibly in another thread. This frees the current thread to process f(n-2) in parallel.
  • Finally, we invoke f2.compute() to calculate the value immediately and f1.join() to retrieve the forked result, waiting for its processing to end if necessary.

You can run the above code as follows:

ForkJoinPool pool = new ForkJoinPool(4);
Fibonacci fibonacci = new Fibonacci(10);
long resultado = pool.invoke(fibonacci);
System.out.println(resultado);

The class ForkJoinPool manages the parallel tasks, and the constructor parameter sets the level of parallelism, that is, how many threads will be used simultaneously.
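
As a small sketch of how the parallelism level can be inspected (the class ParallelismDemo and its method names are only illustrative): the constructor argument is reported back by getParallelism(), and the no-argument constructor falls back to the number of available processors.

```java
import java.util.concurrent.ForkJoinPool;

class ParallelismDemo {

    // Creates a pool with an explicit parallelism level and reports it back.
    static int explicitParallelism(int threads) {
        ForkJoinPool pool = new ForkJoinPool(threads);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown(); // release the worker threads
        }
    }

    // Creates a pool with the no-argument constructor, which bases the
    // parallelism on Runtime.getRuntime().availableProcessors().
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(explicitParallelism(4)); // prints 4
        System.out.println(defaultParallelism());   // depends on the machine
    }
}
```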

The method pool.invoke(fibonacci) starts the processing, waits for the calculation to finish and returns the calculated number.
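
The Fibonacci task forks all the way down to n <= 1, which creates a very large number of tiny tasks. In practice it is common to stop splitting below some size and finish sequentially. The following is a minimal sketch of that idea applied to summing an array; the class SumTask, the helper sum() and the THRESHOLD value are assumptions made for illustration, not part of the answer's code.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {

    // Hypothetical cutoff: ranges of this size or smaller are summed sequentially.
    private static final int THRESHOLD = 1_000;

    private final long[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        // Split the range in two halves that share the midpoint index.
        int mid = from + (to - from) / 2;
        SumTask left = new SumTask(values, from, mid);
        SumTask right = new SumTask(values, mid, to);
        left.fork();                          // left half runs asynchronously
        return right.compute() + left.join(); // right half runs in this thread
    }

    // Convenience entry point: runs the task in a temporary pool.
    static long sum(long[] values) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(values, 0, values.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] values = new long[10_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(sum(values)); // 1 + 2 + ... + 10000 = 50005000
    }
}
```

The best threshold depends on how expensive each element is to process, so the value here is only a placeholder.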


ThreadPoolExecutor

However, if your problem doesn't fall into the category where recursive "divide and conquer" is the best strategy, you can use more generic APIs such as ThreadPoolExecutor.

It is part of the same package as fork/join, and both implement ExecutorService, but it lacks the recursive and restricted nature of its "sibling".
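
Because both pools implement ExecutorService, code written against that interface can run on either one. A small sketch of this, assuming Java 8 lambdas (the class SharedInterfaceDemo and its methods are illustrative names, and Executors.newFixedThreadPool is used here only as a shortcut that returns a ThreadPoolExecutor):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class SharedInterfaceDemo {

    // Runs every task on the given executor and adds up the results.
    static int runAll(ExecutorService executor, List<Callable<Integer>> tasks) {
        try {
            int total = 0;
            // invokeAll blocks until all tasks have completed
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    static List<Callable<Integer>> sampleTasks() {
        Callable<Integer> one = () -> 1;
        Callable<Integer> two = () -> 2;
        return Arrays.asList(one, two);
    }

    public static void main(String[] args) {
        System.out.println(runAll(new ForkJoinPool(2), sampleTasks()));            // prints 3
        System.out.println(runAll(Executors.newFixedThreadPool(2), sampleTasks())); // prints 3
    }
}
```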

To create an instance of ThreadPoolExecutor:

ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
        4, // core pool size
        8, // maximum pool size
        10000, // how long idle threads above the core size are kept alive
        TimeUnit.MILLISECONDS, // time unit of the keep-alive time
        new LinkedBlockingQueue<Runnable>() // queue that holds the tasks waiting to run
);

Then we submit tasks for execution and use Futures, which hold the promise of a future result:

Future<Integer> f1 = threadPoolExecutor.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return 1;
    }
});

Future<Integer> f2 = threadPoolExecutor.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return 2;
    }
});

Note that the above code uses the method submit() to request the execution of two Callables. A Callable represents a task, just like a Runnable, but it returns a value.

A Future keeps a reference to the submitted task so that we can carry on with the execution and retrieve the result when it is ready, like this:

try {
    Integer r1 = f1.get(1000, TimeUnit.MILLISECONDS);
    Integer r2 = f2.get(1000, TimeUnit.MILLISECONDS);
    System.out.println(r1 + r2);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

In the example above, I used the method get() of the Future instances to wait for the processing and effectively retrieve the result.

The parameters set a waiting limit of 1,000 milliseconds. If the wait exceeds it, a TimeoutException is thrown.
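
To see the timeout in action, here is a minimal sketch with a deliberately slow task. The class TimeoutDemo, the method timesOut() and the durations are made up for illustration; the sketch also cancels the abandoned task and shuts the executor down, two steps the snippets above leave out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutDemo {

    // Returns true if a task lasting taskMillis fails to finish within timeoutMillis.
    static boolean timesOut(long taskMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<Integer> slowTask = () -> {
            Thread.sleep(taskMillis); // simulate slow work
            return 1;
        };
        Future<Integer> future = executor.submit(slowTask);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task, its result is no longer wanted
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown(); // otherwise the pool thread keeps the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut(500, 50));  // task is slower than the limit
        System.out.println(timesOut(10, 2000)); // task finishes well within the limit
    }
}
```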


Final considerations

The choice between classic threads, fork/join, ThreadPoolExecutor or any other mechanism depends on the nature of the problem you are trying to solve. One approach is not necessarily better than the others for all kinds of problems.

In addition, certain situations will require more specific implementations, for which you will need to extend the classes and interfaces of the java.util.concurrent API. The good news is that the API was designed with this in mind.
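
One example of such an extension point is the set of protected hook methods of ThreadPoolExecutor, such as afterExecute(). The sketch below overrides it to count finished tasks. The class CountingExecutor and its methods are hypothetical, and the counter is purely illustrative, since the class already offers getCompletedTaskCount().

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingExecutor extends ThreadPoolExecutor {

    private final AtomicInteger completed = new AtomicInteger();

    CountingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // Hook called by the worker thread after each task finishes.
    @Override
    protected void afterExecute(Runnable task, Throwable failure) {
        super.afterExecute(task, failure);
        completed.incrementAndGet();
    }

    int completedCount() {
        return completed.get();
    }

    // Submits the given number of empty tasks and reports how many the hook saw.
    static int runAndCount(int tasks) {
        CountingExecutor executor = new CountingExecutor(2);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> { });
        }
        executor.shutdown(); // queued tasks still run after shutdown()
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executor.completedCount();
    }

    public static void main(String[] args) {
        System.out.println(runAndCount(10)); // prints 10
    }
}
```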

  • 1

    Yeah, my project already uses threads to work with the JavaMail e-mail API, because I run procedures to separate messages by sender, separate messages by content, create folders and things like that. But I intend to use concurrency for other features in the system.

  • @Macario1983 Corrected!

  • I used fork/join in my code and it works really well. The catch is what you pointed out: it does not allow synchronization, so I had to change my code, because I was adding the messages that have no attachment to an array. I chose to pass a list instead to make it easier. I will also post the code in case someone wants to see it or can fix it.

2

There is a project called Java Concurrent Animated, which provides an application that demonstrates Java's concurrent and parallel programming capabilities.

You can download a demo (with the source code) and use it to study everything related to this topic.

There are several animations that demonstrate how to use each concurrent programming model. During the animation, excerpts from the sample code are highlighted, giving a good idea of how to implement it.

The demo is just an executable JAR. Just double-click it.

It’s very cool and very well done!

  • Great recommendation!

0

I am posting the code as an example for anyone who wants to see how it was done. Anyone is welcome to change it, as long as it is an improvement.

package service.forkinjoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;

import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Multipart;
import javax.mail.Part;
import javax.mail.internet.MimeBodyPart;

import service.FileUtil;

public class ForkSortMessages extends RecursiveAction {

    private static final long serialVersionUID = -1092415796824205832L;
    private List<Message> listMessages;
    private List<Message> listMessagesToDelete;

    public ForkSortMessages(List<Message> listMessages, List<Message> listMessagesToDelete) {
        this.listMessages = listMessages;
        this.listMessagesToDelete = listMessagesToDelete;
    }

    @Override
    protected void compute() {

        List<RecursiveAction> actions = new ArrayList<>();

        if (this.listMessages.size() <= Runtime.getRuntime().availableProcessors()) {
            try {
                this.separateMessages();
            } catch (MessagingException | IOException e) {
                e.printStackTrace();
            }
        } else {

            // Split at the midpoint; both halves share the index so no message is skipped
            int mid = this.listMessages.size() / 2;
            actions.add(new ForkSortMessages(this.listMessages.subList(0, mid), this.listMessagesToDelete));
            actions.add(new ForkSortMessages(this.listMessages.subList(mid, this.listMessages.size()), this.listMessagesToDelete));
            invokeAll(actions);
        }
    }

    private void separateMessages() throws MessagingException, IOException {

        for (Message message : this.listMessages) {

            if ((this.hasNoAttachment(message.getContentType()) || (this.hasNoXmlAttachment(message)))) {
                // The list is shared by every subtask, so guard the write
                synchronized (listMessagesToDelete) {
                    listMessagesToDelete.add(message);
                }
            }
        }
    }

    private boolean hasNoAttachment(String content) {
        // Content types are case-insensitive, so compare in lower case
        return !content.toLowerCase().contains("multipart/mixed");
    }

    private boolean hasNoXmlAttachment(Message message) throws IOException, MessagingException {

        Multipart multipart = (Multipart) message.getContent();

        for (int i = 0; i < multipart.getCount(); i++) {

            MimeBodyPart mimeBodyPart = (MimeBodyPart) multipart.getBodyPart(i);

            if (Part.ATTACHMENT.equalsIgnoreCase(mimeBodyPart.getDisposition())) {

                if (FileUtil.isXmlFile(mimeBodyPart.getFileName())) {
                    return false;
                }
            }
        }

        return true;
    }
}
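
The class above writes to a list that is shared by all subtasks. One alternative that avoids shared mutable state is to have each subtask return its own partial list and merge the lists on the way back up. This is only a sketch of that idea and not the code used in the project: the class FilterTask, the THRESHOLD value and the use of plain file names as stand-ins for the messages are all assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class FilterTask extends RecursiveTask<List<String>> {

    // Hypothetical cutoff: lists of this size or smaller are filtered sequentially.
    private static final int THRESHOLD = 4;

    private final List<String> names;

    FilterTask(List<String> names) {
        this.names = names;
    }

    @Override
    protected List<String> compute() {
        if (names.size() <= THRESHOLD) {
            // Each subtask fills its own private list, so no locking is needed.
            List<String> matches = new ArrayList<>();
            for (String name : names) {
                if (!name.endsWith(".xml")) {
                    matches.add(name);
                }
            }
            return matches;
        }
        int mid = names.size() / 2;
        FilterTask left = new FilterTask(names.subList(0, mid));
        FilterTask right = new FilterTask(names.subList(mid, names.size()));
        left.fork();
        List<String> rightResult = right.compute();
        // Merge left before right to keep the original order.
        List<String> result = new ArrayList<>(left.join());
        result.addAll(rightResult);
        return result;
    }

    // Convenience entry point: returns the names that are not XML files.
    static List<String> nonXml(List<String> names) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new FilterTask(names));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("a.xml", "b.txt", "c.xml", "d.pdf",
                "e.xml", "f.doc", "g.xml", "h.txt", "i.png");
        System.out.println(nonXml(names)); // [b.txt, d.pdf, f.doc, h.txt, i.png]
    }
}
```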
  • Is this meant as an answer, or is it part of your question?

  • 1

    @bigown It is code I wrote. I am not 100% sure this is how it is meant to be used, but I based it on some examples I saw. I am leaving it here in case it helps.
