La differenza tra l'API RxJava e l'API Java 9 Flow

1. Introduzione

Java Flow API è stata introdotta in Java 9 come implementazione di Reactive Stream Specification.

In questo tutorial, esamineremo prima i flussi reattivi. Quindi, impareremo la sua relazione con RxJava e Flow API.

2. Cosa sono i flussi reattivi?

Il Reactive Manifesto ha introdotto Reactive Streams per specificare uno standard per l'elaborazione di flussi asincroni con contropressione non bloccante.

Lo scopo della specifica Reactive Stream è definire un set minimo di interfacce per raggiungere questi obiettivi:

  • org.reactivestreams.Publisher è un fornitore di dati che pubblica i dati agli abbonati in base alla loro richiesta

  • org.reactivestreams.Subscriber è il consumatore di dati: può ricevere dati dopo essersi abbonato a un editore

  • org.reactivestreams.Subscription viene creato quando un editore accetta un abbonato

  • org.reactivestreams.Processor è sia un abbonato che un editore: si iscrive a un editore, elabora i dati e quindi passa i dati elaborati al sottoscrittore

L'API di flusso ha origine dalla specifica. RxJava lo precede, ma dalla 2.0 anche RxJava ha supportato le specifiche.

Entreremo in profondità in entrambi, ma prima, vediamo un caso d'uso pratico.

3. Caso d'uso

Per questo tutorial, utilizzeremo un servizio di video in live streaming come caso d'uso.

Un video in streaming live, contrariamente allo streaming video on demand, non dipende dal consumatore. Pertanto il server pubblica il flusso al proprio ritmo ed è responsabilità del consumatore adattarsi.

Nella forma più semplice, il nostro modello consiste in un editore di flusso video e un lettore video come abbonato.

Implementiamo VideoFrame come nostro elemento dati:

public class VideoFrame { private long number; // additional data fields // constructor, getters, setters }

Quindi esaminiamo la nostra API Flow e le implementazioni RxJava una alla volta.

4. Implementazione con Flow API

Le API Flow in JDK 9 corrispondono alla specifica Reactive Streams. Con l'API Flow, se l'applicazione richiede inizialmente N elementi, l'editore invia al sottoscrittore al massimo N elementi.

Le interfacce API Flow si trovano tutte nell'interfaccia java.util.concurrent.Flow . Sono semanticamente equivalenti alle rispettive controparti Reactive Streams.

Implementiamo VideoStreamServer come editore di VideoFrame .

public class VideoStreamServer extends SubmissionPublisher { public VideoStreamServer() { super(Executors.newSingleThreadExecutor(), 5); } }

Abbiamo esteso il nostro VideoStreamServer da SubmissionPublisher invece di implementare direttamente Flow :: Publisher. SubmissionPublisher è l'implementazione JDK di Flow :: Publisher per la comunicazione asincrona con gli abbonati, quindi consente al nostro VideoStreamServer di emettere al proprio ritmo.

Inoltre, è utile per la contropressione e la gestione del buffer, perché quando SubmissionPublisher :: subscribe viene chiamato, crea un'istanza di BufferedSubscription e quindi aggiunge la nuova sottoscrizione alla sua catena di sottoscrizioni. BufferedSubscription può memorizzare nel buffer gli elementi emessi fino a SubmissionPublisher # maxBufferCapacity .

Ora definiamo VideoPlayer, che consuma un flusso di VideoFrame. Quindi deve implementare Flow :: Subscriber .

public class VideoPlayer implements Flow.Subscriber { Flow.Subscription subscription = null; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(VideoFrame item) { log.info("play #{}" , item.getNumber()); subscription.request(1); } @Override public void onError(Throwable throwable) { log.error("There is an error in video streaming:{}" , throwable.getMessage()); } @Override public void onComplete() { log.error("Video has ended"); } }

VideoPlayer si iscrive a VideoStreamServer, quindi , dopo un abbonamento riuscito , viene chiamato il metodo VideoPlayer :: onSubscribe e richiede un frame. VideoPlayer :: onNext riceve il frame e ne richiede uno nuovo. Il numero di frame richiesti dipende dal caso d'uso e dalle implementazioni dell'abbonato .

Infine, mettiamo insieme le cose:

VideoStreamServer streamServer = new VideoStreamServer(); streamServer.subscribe(new VideoPlayer()); // submit video frames ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); AtomicLong frameNumber = new AtomicLong(); executor.scheduleWithFixedDelay(() -> { streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> { subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber() + " droped because of backpressure")); return true; }); }, 0, 1, TimeUnit.MILLISECONDS); sleep(1000);

5. Implementazione con RxJava

RxJava è un'implementazione Java di ReactiveX. Il progetto ReactiveX (o Reactive Extensions) mira a fornire un concetto di programmazione reattiva. È una combinazione del pattern Observer, del pattern Iterator e della programmazione funzionale.

L'ultima versione principale di RxJava è 3.x. RxJava supporta Reactive Streams dalla versione 2.x con la sua classe base Flowable , ma è un insieme più significativo di Reactive Streams con diverse classi base come Flowable , Observable , Single , Completable.

Fluido come componente di conformità del flusso reattivo è un flusso da 0 a N elementi con gestione della contropressione. Flowable estende Publisher da Reactive Streams. Pertanto molti operatori RxJava accettano direttamente Publisher e consentono l'interoperabilità diretta con altre implementazioni di Reactive Streams.

Ora, creiamo il nostro generatore di flussi video che è un flusso pigro infinito:

Stream videoStream = Stream.iterate(new VideoFrame(0), videoFrame -> { // sleep for 1ms; return new VideoFrame(videoFrame.getNumber() + 1); });

Quindi definiamo un'istanza Flowable per generare frame su un thread separato:

Flowable .fromStream(videoStream) .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))

È importante notare che un flusso infinito è sufficiente per noi, ma se abbiamo bisogno di un modo più flessibile per generare il nostro flusso, Flowable.create è una buona scelta.

Flowable .create(new FlowableOnSubscribe() { AtomicLong frame = new AtomicLong(); @Override public void subscribe(@NonNull FlowableEmitter emitter) { while (true) { emitter.onNext(new VideoFrame(frame.incrementAndGet())); //sleep for 1 ms to simualte delay } } }, /* Set Backpressure Strategy Here */)

Quindi, al passaggio successivo, VideoPlayer si iscrive a questo Flowable e osserva gli elementi su un thread separato.

videoFlowable .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .subscribe(item -> { log.info("play #" + item.getNumber()); // sleep for 30 ms to simualate frame display });

Infine, configureremo la strategia per la contropressione. Se vogliamo fermare il video in caso di perdita di frame, dobbiamo quindi usare BackpressureOverflowStrategy :: ERROR quando il buffer è pieno.

Flowable .fromStream(videoStream) .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR) .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .subscribe(item -> { log.info("play #" + item.getNumber()); // sleep for 30 ms to simualate frame display });

6. Confronto tra RxJava e Flow API

Even in these two simple implementations, we can see how RxJava's API is rich, especially for buffer management, error handling, and backpressure strategy. It gives us more options and fewer lines of code with its fluent API. Now let's consider more complicated cases.

Assume that our player can't display video frames without a codec. Hence with Flow API, we need to implement a Processor to simulate the codec and sit between server and player. With RxJava, we can do it with Flowable::flatMap or Flowable::map.

Or let's imagine that our player is also going to broadcast live translation audio, so we have to combine streams of video and audio from separate publishers. With RxJava, we can use Flowable::combineLatest, but with Flow API, it is not an easy task.

Although, it is possible to write a custom Processor that subscribes to both streams and sends the combined data to our VideoPlayer. The implementation is a headache, however.

7. Why Flow API?

At this point, we may have a question, what is the philosophy behind the Flow API?

If we search for Flow API usages in the JDK, we can find something in java.net.http and jdk.internal.net.http.

Furthermore, we can find adapters in the reactor project or reactive stream package. For example, org.reactivestreams.FlowAdapters has methods for converting Flow API interfaces to Reactive Stream ones and vice-versa. Therefore it helps the interoperability between Flow API and libraries with reactive stream support.

All of these facts help us to understand the purpose of Flow API: It was created to be a group of reactive specification interfaces in JDK without relay on third parties. Moreover, Java expects Flow API to be accepted as standard interfaces for reactive specification and to be used in JDK or other Java-based libraries that implement the reactive specification for middlewares and utilities.

8. Conclusions

In questo tutorial, abbiamo un'introduzione a Reactive Stream Specification, Flow API e RxJava.

Inoltre, abbiamo visto un esempio pratico di implementazioni di API Flow e RxJava per un flusso video live.

Ma tutti gli aspetti di Flow API e RxJava come Flow :: Processor , Flowable :: map e Flowable :: flatMap o strategie di contropressione non sono trattati qui.

Come sempre, trovi il codice completo del tutorial su GitHub.