Guida al framework Fork / Join in Java

1. Panoramica

Il framework fork / join è stato presentato in Java 7. Fornisce strumenti per accelerare l'elaborazione parallela tentando di utilizzare tutti i core del processore disponibili, cosa che si ottiene attraverso un approccio divide et impera .

In pratica, questo significa che il framework prima esegue il "fork" , suddividendo ricorsivamente l'attività in attività secondarie indipendenti più piccole finché non sono abbastanza semplici da essere eseguite in modo asincrono.

Dopodiché, inizia la parte "join" , in cui i risultati di tutte le attività secondarie vengono ricorsivamente riunite in un unico risultato, o nel caso di un'attività che restituisce void, il programma attende semplicemente che ogni attività secondaria venga eseguita.

Per fornire un'esecuzione parallela efficace, il framework fork / join utilizza un pool di thread denominato ForkJoinPool , che gestisce i thread di lavoro di tipo ForkJoinWorkerThread .

2. ForkJoinPool

Il ForkJoinPool è il cuore del framework. Si tratta di un'implementazione di ExecutorService che gestisce i thread di lavoro e ci fornisce gli strumenti per ottenere informazioni sullo stato e sulle prestazioni del pool di thread.

I thread di lavoro possono eseguire solo un'attività alla volta, ma ForkJoinPool non crea un thread separato per ogni singola attività secondaria. Invece, ogni thread nel pool ha la propria coda a doppia estremità (o deque, deck pronunciato ) che memorizza le attività.

Questa architettura è vitale per bilanciare il carico di lavoro del thread con l'aiuto dell'algoritmo di furto del lavoro.

2.1. Algoritmo di furto del lavoro

In poche parole: i thread liberi tentano di "rubare" il lavoro da deques di thread occupati.

Per impostazione predefinita, un thread di lavoro riceve le attività dall'head del proprio deque. Quando è vuoto, il thread prende un'attività dalla coda del deque di un altro thread occupato o dalla coda di ingresso globale, poiché è qui che si trovano probabilmente le parti di lavoro più grandi.

Questo approccio riduce al minimo la possibilità che i thread competano per le attività. Riduce anche il numero di volte in cui il thread dovrà cercare lavoro, poiché funziona prima sui blocchi di lavoro più grandi disponibili.

2.2. Istanziazione ForkJoinPool

In Java 8, il modo più conveniente per accedere all'istanza di ForkJoinPool è utilizzare il suo metodo statico commonPool (). Come suggerisce il nome, questo fornirà un riferimento al pool comune, che è un pool di thread predefinito per ogni ForkJoinTask .

Secondo la documentazione di Oracle, l'utilizzo del pool comune predefinito riduce il consumo di risorse, poiché ciò scoraggia la creazione di un pool di thread separato per attività.

ForkJoinPool commonPool = ForkJoinPool.commonPool();

Lo stesso comportamento può essere ottenuto in Java 7 creando un ForkJoinPool e assegnandolo a un campo statico pubblico di una classe di utilità:

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

Ora è facilmente accessibile:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

Con i costruttori di ForkJoinPool , è possibile creare un pool di thread personalizzato con un livello specifico di parallelismo, produzione di thread e gestore di eccezioni. Nell'esempio precedente, il pool ha un livello di parallelismo di 2. Ciò significa che il pool utilizzerà 2 core del processore.

3. ForkJoinTask

ForkJoinTask è il tipo di base per le attività eseguite all'interno di ForkJoinPool. In pratica, una delle sue due sottoclassi dovrebbe essere estesa: la RecursiveAction per le attività void e la RecursiveTask per le attività che restituiscono un valore.Entrambi hanno un metodo astratto compute () in cui è definita la logica dell'attività.

3.1. RecursiveAction: un esempio

Nell'esempio seguente, l'unità di lavoro da elaborare è rappresentata da una stringa denominata carico di lavoro . A scopo dimostrativo, l'attività è priva di senso: semplicemente mette in maiuscolo il suo input e lo registra.

Per dimostrare il comportamento di fork del framework, l'esempio divide l'attività se workload .length () è maggiore di una soglia specificatautilizzando il metodo createSubtask () .

La stringa viene suddivisa ricorsivamente in sottostringhe, creando istanze CustomRecursiveTask basate su queste sottostringhe.

Di conseguenza, il metodo restituisce un List.

The list is submitted to the ForkJoinPool using the invokeAll() method:

public class CustomRecursiveAction extends RecursiveAction { private String workload = ""; private static final int THRESHOLD = 4; private static Logger logger = Logger.getAnonymousLogger(); public CustomRecursiveAction(String workload) { this.workload = workload; } @Override protected void compute() { if (workload.length() > THRESHOLD) { ForkJoinTask.invokeAll(createSubtasks()); } else { processing(workload); } } private List createSubtasks() { List subtasks = new ArrayList(); String partOne = workload.substring(0, workload.length() / 2); String partTwo = workload.substring(workload.length() / 2, workload.length()); subtasks.add(new CustomRecursiveAction(partOne)); subtasks.add(new CustomRecursiveAction(partTwo)); return subtasks; } private void processing(String work) { String result = work.toUpperCase(); logger.info("This result - (" + result + ") - was processed by " + Thread.currentThread().getName()); } }

This pattern can be used to develop your own RecursiveAction classes. To do this, create an object which represents the total amount of work, chose a suitable threshold, define a method to divide the work, and define a method to do the work.

3.2. RecursiveTask

For tasks that return a value, the logic here is similar, except that the result for each subtask is united in a single result:

public class CustomRecursiveTask extends RecursiveTask { private int[] arr; private static final int THRESHOLD = 20; public CustomRecursiveTask(int[] arr) { this.arr = arr; } @Override protected Integer compute() { if (arr.length > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } else { return processing(arr); } } private Collection createSubtasks() { List dividedTasks = new ArrayList(); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, 0, arr.length / 2))); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, arr.length / 2, arr.length))); return dividedTasks; } private Integer processing(int[] arr) { return Arrays.stream(arr) .filter(a -> a > 10 && a  a * 10) .sum(); } }

In this example, the work is represented by an array stored in the arr field of the CustomRecursiveTask class. The createSubtasks() method recursively divides the task into smaller pieces of work until each piece is smaller than the threshold. Then, the invokeAll() method submits the subtasks to the common pool and returns a list of Future.

To trigger execution, the join() method is called for each subtask.

In this example, this is accomplished using Java 8's Stream API; the sum() method is used as a representation of combining sub results into the final result.

4. Submitting Tasks to the ForkJoinPool

To submit tasks to the thread pool, few approaches can be used.

The submit() or execute()method (their use cases are the same):

forkJoinPool.execute(customRecursiveTask); int result = customRecursiveTask.join();

The invoke()method forks the task and waits for the result, and doesn’t need any manual joining:

int result = forkJoinPool.invoke(customRecursiveTask);

The invokeAll() method is the most convenient way to submit a sequence of ForkJoinTasks to the ForkJoinPool. It takes tasks as parameters (two tasks, var args, or a collection), forks then returns a collection of Future objects in the order in which they were produced.

Alternatively, you can use separate fork() and join() methods. The fork() method submits a task to a pool, but it doesn't trigger its execution. The join() method must be used for this purpose. In the case of RecursiveAction, the join() returns nothing but null; for RecursiveTask, it returns the result of the task's execution:

customRecursiveTaskFirst.fork(); result = customRecursiveTaskLast.join();

In our RecursiveTask example we used the invokeAll() method to submit a sequence of subtasks to the pool. The same job can be done with fork() and join(), though this has consequences for the ordering of the results.

Per evitare confusione, è generalmente una buona idea utilizzare il metodo invokeAll () per inviare più di un'attività al ForkJoinPool.

5. Conclusioni

L'uso del framework fork / join può accelerare l'elaborazione di attività di grandi dimensioni, ma per ottenere questo risultato, è necessario seguire alcune linee guida:

  • Utilizzare il minor numero possibile di pool di thread: nella maggior parte dei casi, la decisione migliore è utilizzare un pool di thread per applicazione o sistema
  • Utilizzare il pool di thread comuni predefinito, se non è necessaria alcuna regolazione specifica
  • Usa una soglia ragionevole per dividere ForkJoinTask in sottoattività
  • Evita qualsiasi blocco nei tuoi ForkJoinTasks

Gli esempi utilizzati in questo articolo sono disponibili nel repository GitHub collegato.