Fork/Join
Functionality fork/join
, present from Java 7, it can be a little difficult to understand at first, as it attends a specific class of problems.
According to class documentation ForkJoinTask
, the idea is to allow a more efficient method of parallel processing by imposing some restrictions on the way the threads operas. The goal is to break down a problem into smaller tasks and run them independently, without using synchronization (synchronized
).
If you think about it a little bit, it solves Dynamic Programming. One of the classic examples is the Fibonacci number. The following code is an example of using the fork/join
calculating the Fibonacci number:
public class Fibonacci extends RecursiveTask<Long> {
long n;
public Fibonacci(long n) {
this.n = n;
}
@Override
protected Long compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
We can highlight the following points::
- We extend
RecursiveTask
, one of the two available implementations of ForkJoinTask
. A "recursive task" is intended to execute recursive subtasks in parallel and return a final value.
- Within the method
compute()
"magic" occurs, where we define the limit of recursar in the first if
and recursively invoke class instances in new threads.
- In invoking
f1.fork()
, we are asking for the calculation of f(n-2)
be done in another thread. This releases parallel processing of f(n-1)
.
- Finally, we invoke
f2.compute()
to calculate the value immediately and f1.join()
to recover the value of fork
or wait for processing to end.
You can run the above code as follows:
ForkJoinPool pool = new ForkJoinPool(4);
Fibonacci fibonacci = new Fibonacci(10);
long resultado = pool.invoke(fibonacci);
System.out.println(resultado);
A class ForkJoinPool
allows the management of parallel tasks and the constructor parameter establishes the level of parallelism, that is, how many threads will be used simultaneously.
The method pool.invoke(fibonacci)
starts processing, waits for calculation and returns the calculated number.
Threadpoolexecutor
However, if your problem doesn’t fall into the problem category where "divide to conquer" recursively is the best strategy, you can use more generic Apis such as ThreadPoolExecutor
.
She’s part of the same package that fork/join
, because they both implement ExecutorService
, but without the recursive and limited nature of the "sister".
To create an incentive to ThreadPoolexecutor
:
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
4, //tamanho inicial do pool
8, //tamanho máximo do pool
10000, //tempo de espera máximo para cada thread na fila
TimeUnit.MILLISECONDS, //unidade de tempo
new LinkedBlockingQueue<Runnable>() //implementação da fila de theads
);
Then we ask to run threads and use Futures to keep the promise of a future outcome:
Future<Integer> f1 = threadPoolExecutor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
});
Future<Integer> f2 = threadPoolExecutor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 2;
}
});
Note that the above code uses the method submit()
to request the execution of two Callable
s. One Callable
represents a thread, as well as a Runnable
, but returning a value.
The class Future
keeps a reference to the thread so that we can continue the execution and recover the result when it is ready, like this:
try {
Integer r1 = f1.get(1000, TimeUnit.MILLISECONDS);
Integer r2 = f2.get(1000, TimeUnit.MILLISECONDS);
System.out.println(r1 + r2);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
In the example above, I used the method get()
of the instances of Future
to await processing and effectively recover the result.
The parameters indicate that the waiting time limit is 1,000 milliseconds. If you pass this, a TimeoutException
will be released.
Final considerations
The choice between using threads classics, fork/join
, ThreadPoolExecutor
or any other mechanism depends on the nature of the problem you try to solve. One approach is not necessarily better than the others for all kinds of problems.
In addition, certain situations will require more specific implementations where you will need to extend API classes and interfaces java.util.concurrent
. The good news is that this API is made with this in mind.
Yeah, my problem actually already implemented
threads
to manipulate API emailJavaMail
, because I perform procedure to separate messages by sender, separate messages by content, create folders. Things like that, but I intend to use competition for other features in the system.– Macario1983
@Macario1983 Corrected!
– utluiz
I used a code with
forkInJoin
, Very much ball show guy, problem is how you put it, it does not allow synchronization, something I had to change in my code because I assigned to an array which messages have no attachment. So I chose to pass onelist
to facilitate. I will post the code also in case someone wants to see, or someone can fix.– Macario1983