Guida alla libreria Java Parallel Collectors

1. Introduzione

Parallel-collectors è una piccola libreria che fornisce una serie di raccoglitori API Java Stream che consentono l'elaborazione parallela, aggirando allo stesso tempo le principali carenze di Parallel Stream standard.

2. Dipendenze di Maven

Se vogliamo iniziare a utilizzare la libreria, dobbiamo aggiungere una singola voce nel file pom.xml di Maven :

 com.pivovarit parallel-collectors 1.1.0 

O una singola riga nel file di build di Gradle:

compile 'com.pivovarit:parallel-collectors:1.1.0'

La versione più recente può essere trovata su Maven Central.

3. Avvertenze sui flussi paralleli

I flussi paralleli erano uno dei punti salienti di Java 8, ma si sono rivelati applicabili esclusivamente all'elaborazione pesante della CPU.

La ragione di ciò era il fatto che i flussi paralleli erano supportati internamente da un ForkJoinPool condiviso a livello di JVM , che forniva un parallelismo limitato ed era utilizzato da tutti i flussi paralleli in esecuzione su una singola istanza JVM.

Ad esempio, immagina di avere un elenco di ID e di volerli utilizzare per recuperare un elenco di utenti e che questa operazione sia costosa.

Potremmo usare Parallel Streams per questo:

List ids = Arrays.asList(1, 2, 3); List results = ids.parallelStream() .map(i -> fetchById(i)) // each operation takes one second .collect(Collectors.toList()); System.out.println(results); // [user-1, user-2, user-3]

E infatti, possiamo vedere che c'è un notevole aumento della velocità. Ma diventa problematico se iniziamo a eseguire più operazioni di blocco in parallelo ... in parallelo. Ciò potrebbe saturare rapidamente il pool e provocare latenze potenzialmente enormi. Ecco perché è importante creare paratie creando pool di thread separati, per evitare che attività non correlate si influenzino a vicenda nell'esecuzione.

Per fornire un'istanza ForkJoinPool personalizzata , potremmo sfruttare il trucco descritto qui, ma questo approccio si basava su un hack non documentato ed era difettoso fino a JDK10. Possiamo leggere di più nel numero stesso - [JDK8190974].

4. Collettori paralleli in azione

I servizi di raccolta paralleli, come suggerisce il nome, sono solo servizi di raccolta API Stream standard che consentono di eseguire operazioni aggiuntive in parallelo nella fase collect () .

La classe ParallelCollectors (che rispecchia la classe Collectors ) è una facciata che fornisce l'accesso a tutte le funzionalità della libreria.

Se volessimo rifare l'esempio sopra, potremmo semplicemente scrivere:

ExecutorService executor = Executors.newFixedThreadPool(10); List ids = Arrays.asList(1, 2, 3); CompletableFuture
    
      results = ids.stream() .collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4)); System.out.println(results.join()); // [user-1, user-2, user-3]
    

Il risultato è lo stesso, tuttavia, siamo stati in grado di fornire il nostro pool di thread personalizzato, specificare il nostro livello di parallelismo personalizzato e il risultato è arrivato avvolto in un'istanza CompletableFuture senza bloccare il thread corrente.

Gli stream paralleli standard, d'altra parte, non sono stati in grado di raggiungere nessuno di questi.

4.1. ParallelCollectors.parallelToList / ToSet ()

Per quanto possa essere intuitivo, se vogliamo elaborare uno Stream in parallelo e raccogliere i risultati in un List o Set , possiamo semplicemente usare ParallelCollectors.parallelToList o parallelToSet :

List ids = Arrays.asList(1, 2, 3); List results = ids.stream() .collect(parallelToList(i -> fetchById(i), executor, 4)) .join();

4.2. ParallelCollectors.parallelToMap ()

Se vogliamo raccogliere elementi Stream in un'istanza Map , proprio come con l'API Stream, dobbiamo fornire due mappatori:

List ids = Arrays.asList(1, 2, 3); Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4)) .join(); // {1=user-1, 2=user-2, 3=user-3}

Possiamo anche fornire un fornitore di istanze di mappa personalizzato :

Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4)) .join(); 

E una strategia di risoluzione dei conflitti personalizzata:

List ids = Arrays.asList(1, 2, 3); Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4)) .join();

4.3. ParallelCollectors.parallelToCollection ()

Analogamente a quanto sopra, possiamo passare il nostro fornitore di raccolta personalizzato se vogliamo ottenere risultati confezionati nel nostro contenitore personalizzato:

List results = ids.stream() .collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4)) .join();

4.4. ParallelCollectors.parallelToStream ()

Se quanto sopra non è sufficiente, possiamo effettivamente ottenere un'istanza Stream e continuare l'elaborazione personalizzata da lì:

Map
    
      results = ids.stream() .collect(parallelToStream(i -> fetchById(i), executor, 4)) .thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length()))) .join();
    

4.5. ParallelCollectors.parallel ()

Questo ci consente di trasmettere i risultati in ordine di completamento:

ids.stream() .collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4)) .forEach(System.out::println); // user-1 // user-3 // user-2 

In questo caso, possiamo aspettarci che il collector restituisca risultati diversi ogni volta poiché abbiamo introdotto un ritardo di elaborazione casuale.

4.6. ParallelCollectors.parallelOrdered ()

Questa funzione consente lo streaming dei risultati proprio come sopra, ma mantiene l'ordine originale:

ids.stream() .collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4)) .forEach(System.out::println); // user-1 // user-2 // user-3 

In questo caso, il raccoglitore manterrà sempre l'ordine ma potrebbe essere più lento di quanto sopra.

5. Limitations

At the point of writing, parallel-collectors don't work with infinite streams even if short-circuiting operations are used – it's a design limitation imposed by Stream API internals. Simply put, Streams treat collectors as non-short-circuiting operations so the stream needs to process all upstream elements before getting terminated.

The other limitation is that short-circuiting operations don't interrupt the remaining tasks after short-circuiting.

6. Conclusion

Abbiamo visto come la libreria parallel-collectors ci consente di eseguire l'elaborazione parallela utilizzando i collector personalizzati dell'API Java Stream e CompletableFutures per utilizzare pool di thread personalizzati, parallelismo e stile non bloccante di CompletableFutures.

Come sempre, gli snippet di codice sono disponibili su GitHub.

Per ulteriori letture, vedere la libreria di collezionisti paralleli su GitHub, il blog dell'autore e l'account Twitter dell'autore.