Introduzione a Reactor Core

1. Introduzione

Reactor Core è una libreria Java 8 che implementa il modello di programmazione reattiva. È costruito sulla base della specifica Reactive Streams, uno standard per la creazione di applicazioni reattive.

Dal background dello sviluppo Java non reattivo, diventare reattivo può essere una curva di apprendimento piuttosto ripida. Questo diventa più impegnativo se confrontato con l' API Java 8 Stream , poiché potrebbero essere scambiate per le stesse astrazioni di alto livello.

In questo articolo tenteremo di demistificare questo paradigma. Faremo piccoli passi attraverso Reactor fino a quando non avremo costruito un'immagine di come comporre codice reattivo, gettando le basi per articoli più avanzati che verranno in una serie successiva.

2. Specifiche dei flussi reattivi

Prima di esaminare Reactor, dovremmo esaminare la specifica Reactive Streams. Questo è ciò che Reactor implementa e getta le basi per la libreria.

Essenzialmente, Reactive Streams è una specifica per l'elaborazione di flussi asincroni.

In altre parole, un sistema in cui vengono prodotti e consumati molti eventi in modo asincrono. Pensa a un flusso di migliaia di aggiornamenti azionari al secondo che arrivano in un'applicazione finanziaria e che deve rispondere a tali aggiornamenti in modo tempestivo.

Uno degli obiettivi principali di questa operazione è affrontare il problema della contropressione. Se abbiamo un produttore che sta emettendo eventi a un consumatore più velocemente di quanto possa elaborarli, alla fine il consumatore sarà sopraffatto dagli eventi, esaurendo le risorse di sistema.

La contropressione significa che il nostro consumatore dovrebbe essere in grado di dire al produttore quanti dati inviare per evitare ciò, e questo è ciò che è stabilito nelle specifiche.

3. Dipendenze di Maven

Prima di iniziare, aggiungiamo le nostre dipendenze Maven:

 io.projectreactor reactor-core 3.3.9.RELEASE   ch.qos.logback logback-classic 1.1.3 

Stiamo anche aggiungendo Logback come dipendenza. Questo perché registreremo l'output di Reactor per comprendere meglio il flusso di dati.

4. Produzione di un flusso di dati

Affinché un'applicazione sia reattiva, la prima cosa che deve essere in grado di fare è produrre un flusso di dati.

Questo potrebbe essere qualcosa di simile all'esempio di aggiornamento delle scorte fornito in precedenza. Senza questi dati, non avremmo nulla a cui reagire, motivo per cui questo è un primo passo logico.

Reactive Core ci fornisce due tipi di dati che ci consentono di farlo.

4.1. Flusso

Il primo modo per farlo è con un Flux. È un flusso che può emettere 0..n elementi. Proviamo a crearne uno semplice:

Flux just = Flux.just(1, 2, 3, 4);

In questo caso, abbiamo un flusso statico di quattro elementi.

4.2. Mono

Il secondo modo per farlo è con un Mono, che è un flusso di 0..1 elementi. Proviamo a istanziarne uno:

Mono just = Mono.just(1);

Questo sembra e si comporta quasi esattamente come il Flux , solo che questa volta siamo limitati a non più di un elemento.

4.3. Perché non solo Flux?

Prima di sperimentare ulteriormente, vale la pena sottolineare perché abbiamo questi due tipi di dati.

Innanzitutto, va notato che sia Flux che Mono sono implementazioni dell'interfaccia Reactive Streams Publisher . Entrambe le classi sono conformi alle specifiche e potremmo usare questa interfaccia al loro posto:

Publisher just = Mono.just("foo");

Ma in realtà, conoscere questa cardinalità è utile. Questo perché poche operazioni hanno senso solo per uno dei due tipi e perché può essere più espressivo (immagina findOne () in un repository).

5. Sottoscrizione a uno stream

Ora abbiamo una panoramica di alto livello su come produrre un flusso di dati, dobbiamo iscriverci per far sì che emetta gli elementi.

5.1. Raccolta di elementi

Usiamo il metodo subscribe () per raccogliere tutti gli elementi in uno stream:

List elements = new ArrayList(); Flux.just(1, 2, 3, 4) .log() .subscribe(elements::add); assertThat(elements).containsExactly(1, 2, 3, 4);

I dati non inizieranno a fluire fino a quando non ci iscriveremo. Si noti che abbiamo aggiunto anche alcune registrazioni, che saranno utili quando guarderemo cosa sta succedendo dietro le quinte.

5.2. Il flusso degli elementi

Con la registrazione in atto, possiamo usarlo per visualizzare come i dati fluiscono attraverso il nostro flusso:

20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()

Prima di tutto, tutto è in esecuzione sul thread principale. Non entriamo nei dettagli su questo, poiché daremo un'ulteriore occhiata alla concorrenza più avanti in questo articolo. Rende le cose semplici, tuttavia, poiché possiamo gestire tutto in ordine.

Ora passiamo alla sequenza che abbiamo registrato uno per uno:

  1. onSubscribe () - Viene chiamato quando ci iscriviamo al nostro flusso
  2. request(unbounded) – When we call subscribe, behind the scenes we are creating a Subscription. This subscription requests elements from the stream. In this case, it defaults to unbounded, meaning it requests every single element available
  3. onNext() – This is called on every single element
  4. onComplete() – This is called last, after receiving the last element. There's actually a onError() as well, which would be called if there is an exception, but in this case, there isn't

This is the flow laid out in the Subscriber interface as part of the Reactive Streams Specification, and in reality, that's what's been instantiated behind the scenes in our call to onSubscribe(). It's a useful method, but to better understand what's happening let's provide a Subscriber interface directly:

Flux.just(1, 2, 3, 4) .log() .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { elements.add(integer); } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

We can see that each possible stage in the above flow maps to a method in the Subscriber implementation. It just happens that the Flux has provided us with a helper method to reduce this verbosity.

5.3. Comparison to Java 8 Streams

It still might appear that we have something synonymous to a Java 8 Stream doing collect:

List collected = Stream.of(1, 2, 3, 4) .collect(toList());

Only we don't.

The core difference is that Reactive is a push model, whereas the Java 8 Streams are a pull model. In a reactive approach, events are pushed to the subscribers as they come in.

The next thing to notice is a Streams terminal operator is just that, terminal, pulling all the data and returning a result. With Reactive we could have an infinite stream coming in from an external resource, with multiple subscribers attached and removed on an ad hoc basis. We can also do things like combine streams, throttle streams and apply backpressure, which we will cover next.

6. Backpressure

The next thing we should consider is backpressure. In our example, the subscriber is telling the producer to push every single element at once. This could end up becoming overwhelming for the subscriber, consuming all of its resources.

Backpressure is when a downstream can tell an upstream to send it fewer data in order to prevent it from being overwhelmed.

We can modify our Subscriber implementation to apply backpressure. Let's tell the upstream to only send two elements at a time by using request():

Flux.just(1, 2, 3, 4) .log() .subscribe(new Subscriber() { private Subscription s; int onNextAmount; @Override public void onSubscribe(Subscription s) { this.s = s; s.request(2); } @Override public void onNext(Integer integer) { elements.add(integer); onNextAmount++; if (onNextAmount % 2 == 0) { s.request(2); } } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

Now if we run our code again, we'll see the request(2) is called, followed by two onNext() calls, then request(2) again.

23:31:15.395 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()

Essentially, this is reactive pull backpressure. We are requesting the upstream to only push a certain amount of elements, and only when we are ready.

If we imagine we were being streamed tweets from twitter, it would then be up to the upstream to decide what to do. If tweets were coming in but there are no requests from the downstream, then the upstream could drop items, store them in a buffer, or some other strategy.

7. Operating on a Stream

We can also perform operations on the data in our stream, responding to events as we see fit.

7.1. Mapping Data in a Stream

A simple operation that we can perform is applying a transformation. In this case, let's just double all the numbers in our stream:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribe(elements::add);

map() will be applied when onNext() is called.

7.2. Combining Two Streams

We can then make things more interesting by combining another stream with this one. Let's try this by using zip() function:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .zipWith(Flux.range(0, Integer.MAX_VALUE), (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two)) .subscribe(elements::add); assertThat(elements).containsExactly( "First Flux: 2, Second Flux: 0", "First Flux: 4, Second Flux: 1", "First Flux: 6, Second Flux: 2", "First Flux: 8, Second Flux: 3");

Here, we are creating another Flux that keeps incrementing by one and streaming it together with our original one. We can see how these work together by inspecting the logs:

20:04:38.064 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1) 20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete() 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel() 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()

Note how we now have one subscription per Flux. The onNext() calls are also alternated, so the index of each element in the stream will match when we apply the zip() function.

8. Hot Streams

Currently, we've focused primarily on cold streams. These are static, fixed-length streams that are easy to deal with. A more realistic use case for reactive might be something that happens infinitely.

For example, we could have a stream of mouse movements that constantly needs to be reacted to or a twitter feed. These types of streams are called hot streams, as they are always running and can be subscribed to at any point in time, missing the start of the data.

8.1. Creating a ConnectableFlux

One way to create a hot stream is by converting a cold stream into one. Let's create a Flux that lasts forever, outputting the results to the console, which would simulate an infinite stream of data coming from an external resource:

ConnectableFlux publish = Flux.create(fluxSink -> { while(true) { fluxSink.next(System.currentTimeMillis()); } }) .publish();

By calling publish() we are given a ConnectableFlux. This means that calling subscribe() won't cause it to start emitting, allowing us to add multiple subscriptions:

publish.subscribe(System.out::println); publish.subscribe(System.out::println);

If we try running this code, nothing will happen. It's not until we call connect(), that the Flux will start emitting:

publish.connect();

8.2. Throttling

If we run our code, our console will be overwhelmed with logging. This is simulating a situation where too much data is being passed to our consumers. Let's try getting around this with throttling:

ConnectableFlux publish = Flux.create(fluxSink -> { while(true) { fluxSink.next(System.currentTimeMillis()); } }) .sample(ofSeconds(2)) .publish();

Here, we've introduced a sample() method with an interval of two seconds. Now values will only be pushed to our subscriber every two seconds, meaning the console will be a lot less hectic.

Of course, there are multiple strategies to reduce the amount of data sent downstream, such as windowing and buffering, but they will be left out of scope for this article.

9. Concurrency

All of our above examples have currently run on the main thread. However, we can control which thread our code runs on if we want. The Scheduler interface provides an abstraction around asynchronous code, for which many implementations are provided for us. Let's try subscribing to a different thread to main:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribeOn(Schedulers.parallel()) .subscribe(elements::add);

The Parallel scheduler will cause our subscription to be run on a different thread, which we can prove by looking at the logs. We see the first entry comes from the main thread and the Flux is running in another thread called parallel-1.

20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 20:03:27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request(unbounded) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(1) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(2) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(3) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(4) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete()

Concurrency get's more interesting than this, and it will be worth us exploring it in another article.

10. Conclusion

In questo articolo, abbiamo fornito una panoramica end-to-end di alto livello di Reactive Core. Abbiamo spiegato come possiamo pubblicare e sottoscrivere flussi, applicare contropressione, operare sui flussi e anche gestire i dati in modo asincrono. Si spera che questo dovrebbe gettare le basi per scrivere applicazioni reattive.

Gli articoli successivi di questa serie tratteranno di concorrenza più avanzata e altri concetti reattivi. C'è anche un altro articolo che riguarda Reactor with Spring.

Il codice sorgente della nostra applicazione è disponibile su GitHub; questo è un progetto Maven che dovrebbe essere in grado di funzionare così com'è.