Guida a Stream.reduce ()

1. Panoramica

L'API Stream fornisce un ricco repertorio di funzioni intermedie, di riduzione e di terminale, che supportano anche la parallelizzazione.

Più specificamente, le operazioni di flusso di riduzione ci consentono di produrre un unico risultato da una sequenza di elementi , applicando ripetutamente un'operazione di combinazione agli elementi nella sequenza.

In questo tutorial, vedremo il general-purpose Stream.reduce () il funzionamento e lo vediamo in alcuni casi di utilizzo di cemento.

2. I concetti chiave: identità, accumulatore e combinatore

Prima di esaminare più a fondo l'utilizzo dell'operazione Stream.reduce () , suddividiamo gli elementi partecipanti dell'operazione in blocchi separati. In questo modo capiremo più facilmente il ruolo che ognuno gioca:

  • Identità : un elemento che è il valore iniziale dell'operazione di riduzione e il risultato predefinito se il flusso è vuoto
  • Accumulatore : una funzione che accetta due parametri: un risultato parziale dell'operazione di riduzione e l'elemento successivo del flusso
  • Combinatore : una funzione utilizzata per combinare il risultato parziale dell'operazione di riduzione quando la riduzione è parallelizzata o quando c'è una mancata corrispondenza tra i tipi di argomenti dell'accumulatore e i tipi di implementazione dell'accumulatore

3. Utilizzo di Stream.reduce ()

Per comprendere meglio la funzionalità degli elementi identità, accumulatore e combinatore, diamo un'occhiata ad alcuni esempi di base:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int result = numbers .stream() .reduce(0, (subtotal, element) -> subtotal + element); assertThat(result).isEqualTo(21);

In questo caso, il valore Integer 0 è l'identità. Memorizza il valore iniziale dell'operazione di riduzione e anche il risultato predefinito quando il flusso di valori Integer è vuoto.

Allo stesso modo, l'espressione lambda :

subtotal, element -> subtotal + element

è l'accumulatore , poiché prende la somma parziale dei valori Integer e l'elemento successivo nel flusso.

Per rendere il codice ancora più conciso, possiamo usare un riferimento al metodo, invece di un'espressione lambda:

int result = numbers.stream().reduce(0, Integer::sum); assertThat(result).isEqualTo(21);

Naturalmente, possiamo usare un'operazione reduce () sui flussi che contengono altri tipi di elementi.

Ad esempio, possiamo usare reduce () su un array di elementi String e unirli in un unico risultato:

List letters = Arrays.asList("a", "b", "c", "d", "e"); String result = letters .stream() .reduce("", (partialString, element) -> partialString + element); assertThat(result).isEqualTo("abcde");

Allo stesso modo, possiamo passare alla versione che utilizza un metodo di riferimento:

String result = letters.stream().reduce("", String::concat); assertThat(result).isEqualTo("abcde");

Usiamo l' operazione reduce () per unire gli elementi maiuscoli dell'array di lettere :

String result = letters .stream() .reduce( "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase()); assertThat(result).isEqualTo("ABCDE");

Inoltre, possiamo usare reduce () in un flusso parallelizzato (ne parleremo più avanti):

List ages = Arrays.asList(25, 30, 45, 28, 32); int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);

Quando un flusso viene eseguito in parallelo, il runtime Java lo divide in più flussi secondari. In questi casi, è necessario utilizzare una funzione per combinare i risultati dei substream in uno solo . Questo è il ruolo del combinatore : nello snippet sopra, è il riferimento al metodo Integer :: sum .

Abbastanza divertente, questo codice non verrà compilato:

List users = Arrays.asList(new User("John", 30), new User("Julie", 35)); int computedAges = users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge()); 

In questo caso, abbiamo un flusso di oggetti User e i tipi di argomenti dell'accumulatore sono Integer e User. Tuttavia, l'implementazione dell'accumulatore è una somma di numeri interi, quindi il compilatore non può dedurre il tipo di parametro utente .

Possiamo risolvere questo problema utilizzando un combinatore:

int result = users.stream() .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); assertThat(result).isEqualTo(65);

Per dirla semplicemente, se usiamo flussi sequenziali e i tipi di argomenti dell'accumulatore ei tipi della sua implementazione corrispondono, non abbiamo bisogno di usare un combinatore .

4. Riduzione in parallelo

Come abbiamo imparato prima, possiamo usare reduce () su flussi parallelizzati.

Quando usiamo flussi parallelizzati, dobbiamo assicurarci che reduce () o qualsiasi altra operazione di aggregazione eseguita sui flussi sia:

  • associativo : il risultato non è influenzato dall'ordine degli operandi
  • non interferente : l'operazione non influisce sulla sorgente dati
  • senza stato e deterministico : l'operazione non ha stato e produce lo stesso output per un dato input

Dovremmo soddisfare tutte queste condizioni per evitare risultati imprevedibili.

Come previsto, le operazioni eseguite su flussi parallelizzati, incluso reduce (), vengono eseguite in parallelo, sfruttando quindi le architetture hardware multi-core.

Per ovvie ragioni, i flussi parallelizzati sono molto più performanti rispetto alle controparti sequenziali . Anche così, possono essere eccessivi se le operazioni applicate allo stream non sono costose o il numero di elementi nello stream è piccolo.

Ovviamente, i flussi parallelizzati sono la strada giusta quando dobbiamo lavorare con flussi di grandi dimensioni ed eseguire costose operazioni di aggregazione.

Creiamo un semplice test di benchmark JMH (Java Microbenchmark Harness) e confrontiamo i rispettivi tempi di esecuzione quando si utilizza l' operazione reduce () su un flusso sequenziale e parallelizzato:

@State(Scope.Thread) private final List userList = createUsers(); @Benchmark public Integer executeReduceOnParallelizedStream() { return this.userList .parallelStream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } @Benchmark public Integer executeReduceOnSequentialStream() { return this.userList .stream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } 

Nel benchmark JMH sopra, confrontiamo i tempi medi di esecuzione . Creiamo semplicemente una lista contenente un gran numero di oggetti utente . Successivamente, chiamiamo reduce () su un flusso sequenziale e parallelizzato e controlliamo che quest'ultimo funzioni più velocemente del primo (in secondi per operazione).

Questi sono i nostri risultati di riferimento:

Benchmark Mode Cnt Score Error Units JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op

5. Lanciare e gestire le eccezioni durante la riduzione

Negli esempi precedenti, l' operazione reduce () non genera eccezioni. Ma potrebbe, ovviamente.

Ad esempio, supponiamo di dover dividere tutti gli elementi di un flusso per un fattore fornito e quindi sommarli:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int divider = 2; int result = numbers.stream().reduce(0, a / divider + b / divider); 

This will work, as long as the divider variable is not zero. But if it is zero, reduce() will throw an ArithmeticException exception: divide by zero.

We can easily catch the exception and do something useful with it, such as logging it, recovering from it and so forth, depending on the use case, by using a try/catch block:

public static int divideListElements(List values, int divider) { return values.stream() .reduce(0, (a, b) -> { try { return a / divider + b / divider; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return 0; }); }

While this approach will work, we polluted the lambda expression with the try/catch block. We no longer have the clean one-liner that we had before.

To fix this issue, we can use the extract function refactoring technique, and extract the try/catch block into a separate method:

private static int divide(int value, int factor) { int result = 0; try { result = value / factor; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return result } 

Now, the implementation of the divideListElements() method is again clean and streamlined:

public static int divideListElements(List values, int divider) { return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider)); } 

Assuming that divideListElements() is a utility method implemented by an abstract NumberUtils class, we can create a unit test to check the behavior of the divideListElements() method:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Let's also test the divideListElements() method, when the supplied List of Integer values contains a 0:

List numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Finally, let's test the method implementation when the divider is 0, too:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);

6. Complex Custom Objects

We can also use Stream.reduce() with custom objects that contain non-primitive fields. To do so, we need to provide a relevant identity, accumulator, and combiner for the data type.

Suppose our User is part of a review website. Each of our Users can possess one Rating, which is averaged over many Reviews.

First, let's start with our Review object. Each Review should contain a simple comment and score:

public class Review { private int points; private String review; // constructor, getters and setters }

Next, we need to define our Rating, which will hold our reviews alongside a points field. As we add more reviews, this field will increase or decrease accordingly:

public class Rating { double points; List reviews = new ArrayList(); public void add(Review review) { reviews.add(review); computeRating(); } private double computeRating() { double totalPoints = reviews.stream().map(Review::getPoints).reduce(0, Integer::sum); this.points = totalPoints / reviews.size(); return this.points; } public static Rating average(Rating r1, Rating r2) { Rating combined = new Rating(); combined.reviews = new ArrayList(r1.reviews); combined.reviews.addAll(r2.reviews); combined.computeRating(); return combined; } }

We have also added an average function to compute an average based on the two input Ratings. This will work nicely for our combiner and accumulator components.

Next, let's define a list of Users, each with their own sets of reviews.

User john = new User("John", 30); john.getRating().add(new Review(5, "")); john.getRating().add(new Review(3, "not bad")); User julie = new User("Julie", 35); john.getRating().add(new Review(4, "great!")); john.getRating().add(new Review(2, "terrible experience")); john.getRating().add(new Review(4, "")); List users = Arrays.asList(john, julie); 

Ora che John e Julie sono stati presi in considerazione, usiamo Stream.reduce () per calcolare una valutazione media per entrambi gli utenti. Come identità , restituiamo una nuova valutazione se la nostra lista di input è vuota :

Rating averageRating = users.stream() .reduce(new Rating(), (rating, user) -> Rating.average(rating, user.getRating()), Rating::average);

Se facciamo i conti, dovremmo scoprire che il punteggio medio è 3,6:

assertThat(averageRating.getPoints()).isEqualTo(3.6);

7. Conclusione

In questo tutorial abbiamo imparato come utilizzare l' operazione Stream.reduce () . Inoltre, abbiamo appreso come eseguire riduzioni su flussi sequenziali e parallelizzati e come gestire le eccezioni durante la riduzione .

Come al solito, tutti gli esempi di codice mostrati in questo tutorial sono disponibili su GitHub.