Guida ad Akka Streams

1. Panoramica

In questo articolo, esamineremo la libreria akka-streams che è costruita in cima alla struttura dell'attore Akka, che aderisce al manifesto dei flussi reattivi. L'API Akka Streams ci consente di comporre facilmente flussi di trasformazione dei dati da passaggi indipendenti.

Inoltre, tutta l'elaborazione viene eseguita in modo reattivo, non bloccante e asincrono.

2. Dipendenze di Maven

Per iniziare, dobbiamo aggiungere le librerie akka-stream e akka-stream-testkit nel nostro pom.xml:

 com.typesafe.akka akka-stream_2.11 2.5.2   com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. API Akka Streams

Per lavorare con Akka Streams, dobbiamo essere consapevoli dei concetti chiave dell'API:

  • Sorgente - il punto di ingresso per l'elaborazione nella libreria akka-stream - possiamo creare un'istanza di questa classe da più sorgenti; per esempio, possiamo usare ilmetodo single () se vogliamo creare una Sorgente da una singola Stringa , oppure possiamo creare una Sorgente da un Iterable di elementi
  • Flow - l'elemento principale dell'elaborazione - ogniistanza Flow ha un valore di input e uno di output
  • Materializer - possiamo usarne uno se vogliamo che il nostro flusso abbia alcuni effetti collaterali come la registrazione o il salvataggio dei risultati ; più comunemente, passeremo l' alias NotUsed come Materializer per indicare che il nostro flusso non dovrebbe avere effetti collaterali
  • Lavandino funzionamento - quando stiamo costruendo un flusso, esso non viene eseguita finché avremo registrare un lavandino operazione su di esso - è un'operazione terminale che attiva tutti i calcoli nell'intero flusso

4. Creazione di flussi in Akka Streams

Iniziamo costruendo un semplice esempio, in cui mostreremo come creare e combinare più flussi s - per elaborare un flusso di numeri interi e calcolare la finestra mobile media delle coppie di interi dal flusso.

Analizzeremo una stringa di numeri interi delimitati da punto e virgola come input per creare la nostra sorgente akka-stream per l'esempio.

4.1. Utilizzo di un flusso per analizzare l'input

Per prima cosa, creiamo una classe DataImporter che prenderà un'istanza di ActorSystem che useremo in seguito per creare il nostro flusso :

public class DataImporter { private ActorSystem actorSystem; // standard constructors, getters... }

Successivamente, creiamo un metodo parseLine che genererà un elenco di numeri interi dalla nostra stringa di input delimitata . Tieni presente che stiamo utilizzando Java Stream API qui solo per l'analisi:

private List parseLine(String line) { String[] fields = line.split(";"); return Arrays.stream(fields) .map(Integer::parseInt) .collect(Collectors.toList()); }

Il nostro flusso iniziale applicherà parseLine al nostro input per creare un flusso con tipo di input String e tipo di output Integer :

private Flow parseContent() { return Flow.of(String.class) .mapConcat(this::parseLine); }

Quando chiamiamo il metodo parseLine () , il compilatore sa che l'argomento di quella funzione lambda sarà una stringa , lo stesso del tipo di input del nostro flusso .

Si noti che stiamo usando il mapConcat () il metodo - equivalente al Java 8 flatMap () il metodo - perché vogliamo appiattire la lista dei numeri interi restituito da parseLine () in un flusso di Integer in modo che le successive fasi della nostra lavorazione non è necessario per affrontare la lista .

4.2. Utilizzo di un flusso per eseguire calcoli

A questo punto, abbiamo il nostro flusso di interi analizzati. Ora, dobbiamo implementare la logica che raggrupperà tutti gli elementi di input in coppie e calcolerà una media di tali coppie .

Ora creeremo un flusso di numeri interi e li raggrupperemo usando il metodo grouped () .

Successivamente, vogliamo calcolare una media.

Poiché non siamo interessati all'ordine in cui tali medie verranno elaborate, possiamo avere medie calcolate in parallelo utilizzando più thread utilizzando il metodo mapAsyncUnordered () , passando il numero di thread come argomento a questo metodo.

L'azione che verrà passata come lambda al flusso deve restituire un CompletableFuture perché tale azione verrà calcolata in modo asincrono nel thread separato:

private Flow computeAverage() { return Flow.of(Integer.class) .grouped(2) .mapAsyncUnordered(8, integers -> CompletableFuture.supplyAsync(() -> integers.stream() .mapToDouble(v -> v) .average() .orElse(-1.0))); }

Stiamo calcolando le medie in otto thread paralleli. Tieni presente che stiamo utilizzando l'API Java 8 Stream per calcolare una media.

4.3. Composizione di più flussi in un unico flusso

L' API Flow è un'astrazione fluente che ci consente di comporre più istanze di Flow per raggiungere il nostro obiettivo di elaborazione finale . Possiamo avere flussi granulari in cui uno, ad esempio, sta analizzando JSON, un altro sta eseguendo alcune trasformazioni e un altro sta raccogliendo alcune statistiche.

Tale granularità ci aiuterà a creare un codice più testabile perché possiamo testare ogni fase di elaborazione in modo indipendente.

Abbiamo creato due flussi sopra che possono funzionare indipendentemente l'uno dall'altro. Ora vogliamo comporli insieme.

Innanzitutto, vogliamo analizzare la nostra stringa di input e, successivamente, vogliamo calcolare una media su un flusso di elementi.

Possiamo comporre i nostri flussi utilizzando il metodo via () :

Flow calculateAverage() { return Flow.of(String.class) .via(parseContent()) .via(computeAverage()); }

Abbiamo creato un flusso con il tipo di input String e altri due flussi dopo. Il flusso parseContent () accetta un input String e restituisce un Integer come output. Il computeAverage () Flusso sta prendendo che integer e calcola una media ritorno doppio come tipo di output.

5. Aggiunta di lavandino al flusso

Come abbiamo accennato, a questo punto l'intero Flow non è ancora stato eseguito perché è pigro. Per iniziare l'esecuzione del flusso dobbiamo definire un Sink . L' operazione Sink può, ad esempio, salvare i dati in un database o inviare i risultati a qualche servizio web esterno.

Supponiamo di avere una classe AverageRepository con il seguente metodo save () che scrive i risultati nel nostro database:

CompletionStage save(Double average) { return CompletableFuture.supplyAsync(() -> { // write to database return average; }); }

Ora, vogliamo creare un'operazione Sink che utilizzi questo metodo per salvare i risultati della nostra elaborazione del flusso . Per creare il nostro sink, dobbiamo prima creare un flusso che prenda un risultato della nostra elaborazione come tipo di input . Successivamente, vogliamo salvare tutti i nostri risultati nel database.

Ancora una volta, non ci interessa l'ordinamento degli elementi, quindi possiamo eseguire le operazioni save () in parallelo utilizzando il metodo mapAsyncUnordered () .

Per creare un Sink dal Flow dobbiamo chiamare toMat () con Sink.ignore () come primo argomento e Keep.right () come secondo perché vogliamo restituire uno stato dell'elaborazione:

private Sink
    
      storeAverages() { return Flow.of(Double.class) .mapAsyncUnordered(4, averageRepository::save) .toMat(Sink.ignore(), Keep.right()); }
    

6. Definizione di una sorgente per il flusso

L'ultima cosa che dobbiamo fare è creare una sorgente dalla stringa di input . Possiamo applicare un flusso calcolareAverage () a questa sorgente utilizzando il metodo via () .

Quindi, per aggiungere il Sink all'elaborazione, dobbiamo chiamare il metodo runWith () e passare il Sink storeAverages () che abbiamo appena creato:

CompletionStage calculateAverageForContent(String content) { return Source.single(content) .via(calculateAverage()) .runWith(storeAverages(), ActorMaterializer.create(actorSystem)) .whenComplete((d, e) -> { if (d != null) { System.out.println("Import finished "); } else { e.printStackTrace(); } }); }

Note that when the processing is finished we are adding the whenComplete() callback, in which we can perform some action depending on the outcome of the processing.

7. Testing Akka Streams

We can test our processing using the akka-stream-testkit.

The best way to test the actual logic of the processing is to test all Flow logic and use TestSink to trigger the computation and assert on the results.

In our test, we are creating the Flow that we want to test, and next, we are creating a Source from the test input content:

@Test public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() { // given Flow tested = new DataImporter(actorSystem).calculateAverage(); String input = "1;9;11;0"; // when Source flow = Source.single(input).via(tested); // then flow .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem)) .request(4) .expectNextUnordered(5d, 5.5); }

We are checking that we are expecting four input arguments, and two results that are averages can arrive in any order because our processing is done in the asynchronous and parallel way.

8. Conclusion

In this article, we were looking at the akka-stream library.

We defined a process that combines multiple Flows to calculate moving average of elements. Then, we defined a Source that is an entry point of the stream processing and a Sink that triggers the actual processing.

Finally, we wrote a test for our processing using the akka-stream-testkit.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.