Java 9 Reactive Streams

1. Panoramica

In questo articolo, esamineremo i Java 9 Reactive Streams. In poche parole, saremo in grado di utilizzare la classe Flow , che racchiude gli elementi costitutivi principali per la creazione della logica di elaborazione del flusso reattivo.

Reactive Streams è uno standard per l'elaborazione di flussi asincroni con contropressione non bloccante. Questa specifica è definita nel Reactive Manifesto e ci sono varie implementazioni di essa, ad esempio, RxJava o Akka-Streams.

2. Panoramica dell'API reattiva

Per costruire un flusso , possiamo utilizzare tre astrazioni principali e comporle in una logica di elaborazione asincrona.

Ogni flusso deve elaborare gli eventi che vengono pubblicati su di esso da un'istanza di Publisher ; l' editore ha un metodo: subscribe ().

Se uno qualsiasi degli abbonati desidera ricevere eventi da esso pubblicati, deve iscriversi al determinato Editore.

Il destinatario dei messaggi deve implementare l' interfaccia del Sottoscrittore . In genere questa è la fine di ogni elaborazione del flusso perché l'istanza di esso non invia ulteriori messaggi.

Possiamo pensare all'Abbonato come a un Sink. Questo ha quattro metodi che devono essere sovrascritti: onSubscribe (), onNext (), onError () e onComplete (). Li esamineremo nella prossima sezione.

Se vogliamo trasformare il messaggio in arrivo e passarlo ulteriormente al successivo Sottoscrittore, dobbiamo implementare l' interfaccia del processore . Funziona sia come Sottoscrittore perché riceve messaggi, sia come Editore perché elabora quei messaggi e li invia per un'ulteriore elaborazione.

3. Pubblicazione e consumo di messaggi

Supponiamo di voler creare un flusso semplice , in cui abbiamo un editore che pubblica i messaggi e un semplice Sottoscrittore che consuma i messaggi man mano che arrivano, uno alla volta.

Creiamo una classe EndSubscriber . Dobbiamo implementare l' interfaccia del Sottoscrittore . Successivamente, sovrascriveremo i metodi richiesti.

Il metodo onSubscribe () viene chiamato prima dell'inizio dell'elaborazione. L'istanza della sottoscrizione viene passata come argomento. È una classe utilizzata per controllare il flusso di messaggi tra il Sottoscrittore e il Publisher:

public class EndSubscriber implements Subscriber { private Subscription subscription; public List consumedElements = new LinkedList(); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } }

Abbiamo anche inizializzato un elenco vuoto di consumedElements che verrà utilizzato nei test.

Ora, dobbiamo implementare i metodi rimanenti dall'interfaccia del Sottoscrittore . Il metodo principale qui è onNext () - viene chiamato ogni volta che l' editore pubblica un nuovo messaggio:

@Override public void onNext(T item) { System.out.println("Got : " + item); subscription.request(1); }

Si noti che quando abbiamo iniziato la sottoscrizione nel onSubscribe () metodo e quando abbiamo elaborato un messaggio che dobbiamo chiamare la richiesta () metodo sulla sottoscrizione al segnale che l'attuale abbonato è pronto a consumare più messaggi.

Infine, dobbiamo implementare onError () - che viene chiamato ogni volta che verrà generata un'eccezione durante l'elaborazione, così come onComplete () - chiamato quando il Publisher è chiuso:

@Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); }

Scriviamo un test per il flusso di elaborazione . Useremo la classe SubmissionPublisher , un costrutto di java.util.concurrent , che implementa l' interfaccia di Publisher .

Stiamo per essere presentando N elementi per l' Editore - che il nostro EndSubscriber riceverà:

@Test public void whenSubscribeToIt_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until( () -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(items) ); }

Nota che stiamo chiamando il metodo close () sull'istanza di EndSubscriber. Esso richiamerà onComplete () richiamata sotto su ogni abbonato della data di pubblicazione.

L'esecuzione di quel programma produrrà il seguente output:

Got : 1 Got : x Got : 2 Got : x Got : 3 Got : x Done

4. Trasformazione dei messaggi

Supponiamo di voler creare una logica simile tra un publisher e un abbonato , ma anche applicare alcune trasformazioni.

Creeremo la classe TransformProcessor che implementa Processor ed estende SubmissionPublisher, poiché sarà sia P ublisher che S ubscriber.

Passeremo una funzione che trasformerà gli input in output:

public class TransformProcessor extends SubmissionPublisher implements Flow.Processor { private Function function; private Flow.Subscription subscription; public TransformProcessor(Function function) { super(); this.function = function; } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { submit(function.apply(item)); subscription.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { close(); } }

Scriviamo ora un rapido test con un flusso di elaborazione in cui l' editore pubblica elementi String .

Il nostro TransformProcessor analizzerà la stringa come Integer , il che significa che una conversione deve avvenire qui:

@Test public void whenSubscribeAndTransformElements_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); TransformProcessor transformProcessor = new TransformProcessor(Integer::parseInt); EndSubscriber subscriber = new EndSubscriber(); List items = List.of("1", "2", "3"); List expectedResult = List.of(1, 2, 3); // when publisher.subscribe(transformProcessor); transformProcessor.subscribe(subscriber); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expectedResult) ); }

Notare che la chiamata al metodo close () sul Publisher di base provocherà il richiamo del metodo onComplete () sul TransformProcessor .

Tieni presente che tutti gli editori nella catena di elaborazione devono essere chiusi in questo modo.

5. Controllo della richiesta di messaggi tramite l' abbonamento

Diciamo che vogliamo consumare solo il primo elemento dalla sottoscrizione, applicare una logica e terminare l'elaborazione. Possiamo usare il metodo request () per ottenere ciò.

Modifichiamo il nostro EndSubscriber per consumare solo N numero di messaggi. Passeremo quel numero come argomento del costruttore howMuchMessagesConsume :

public class EndSubscriber implements Subscriber { private AtomicInteger howMuchMessagesConsume; private Subscription subscription; public List consumedElements = new LinkedList(); public EndSubscriber(Integer howMuchMessagesConsume) { this.howMuchMessagesConsume = new AtomicInteger(howMuchMessagesConsume); } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { howMuchMessagesConsume.decrementAndGet(); System.out.println("Got : " + item); consumedElements.add(item); if (howMuchMessagesConsume.get() > 0) { subscription.request(1); } } //... }

Possiamo richiedere elementi per tutto il tempo che vogliamo.

Scriviamo un test in cui vogliamo consumare solo un elemento dall'abbonamento dato :

@Test public void whenRequestForOnlyOneElement_thenShouldConsumeOne() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(1); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); List expected = List.of("1"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expected) ); }

Sebbene l' editore pubblichi sei elementi, il nostro EndSubscriber utilizzerà solo un elemento perché segnala la richiesta di elaborazione solo per quello singolo.

Utilizzando il metodo request () sulla sottoscrizione, possiamo implementare un meccanismo di contropressione più sofisticato per controllare la velocità di consumo del messaggio.

6. Conclusione

In questo articolo, abbiamo dato uno sguardo ai Java 9 Reactive Streams.

Abbiamo visto come creare un flusso di elaborazione composto da un editore e un abbonato. Abbiamo creato un flusso di elaborazione più complesso con la trasformazione degli elementi tramite Processori .

Infine, abbiamo utilizzato l' abbonamento per controllare la richiesta di elementi da parte dell'abbonato.

L'implementazione di tutti questi esempi e frammenti di codice può essere trovata nel progetto GitHub: questo è un progetto Maven, quindi dovrebbe essere facile da importare ed eseguire così com'è.