Combinazione di osservabili in RxJava

1. Introduzione

In questo rapido tutorial, discuteremo diversi modi di combinare gli osservabili in RxJava.

Se sei nuovo su RxJava, controlla prima questo tutorial introduttivo.

Ora, entriamo subito.

2. Osservabili

Le sequenze osservabili , o semplicemente osservabili , sono rappresentazioni di flussi di dati asincroni.

Questi sono basati sul modello Observer in cui un oggetto chiamato Observer , sottoscrive gli elementi emessi da un Observable .

L'abbonamento non è bloccante poiché l' Observer è pronto a reagire a qualsiasi cosa l' Osservabile emetterà in futuro. Questo, a sua volta, facilita la concorrenza.

Ecco una semplice dimostrazione in RxJava:

Observable .from(new String[] { "John", "Doe" }) .subscribe(name -> System.out.println("Hello " + name))

3. Combinazione di osservabili

Quando si programma utilizzando un framework reattivo, è un caso d'uso comune combinare vari osservabili .

In un'applicazione Web, ad esempio, potrebbe essere necessario ottenere due set di flussi di dati asincroni indipendenti l'uno dall'altro.

Invece di attendere il completamento del flusso precedente prima di richiedere il flusso successivo, possiamo chiamare entrambi contemporaneamente e iscriverti ai flussi combinati.

In questa sezione, discuteremo alcuni dei diversi modi in cui possiamo combinare più osservabili in RxJava e i diversi casi d'uso a cui si applica ciascun metodo.

3.1. Unisci

Possiamo usare l' operatore di unione per combinare l'output di più osservabili in modo che agiscano come uno:

@Test public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() { TestSubscriber testSubscriber = new TestSubscriber(); Observable.merge( Observable.from(new String[] {"Hello", "World"}), Observable.from(new String[] {"I love", "RxJava"}) ).subscribe(testSubscriber); testSubscriber.assertValues("Hello", "World", "I love", "RxJava"); }

3.2. MergeDelayError

Il metodo mergeDelayError è lo stesso di merge in quanto combina più osservabili in uno, ma se si verificano errori durante l'unione, consente agli elementi privi di errori di continuare prima di propagare gli errori :

@Test public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() { TestSubscriber testSubscriber = new TestSubscriber(); Observable.mergeDelayError( Observable.from(new String[] { "hello", "world" }), Observable.error(new RuntimeException("Some exception")), Observable.from(new String[] { "rxjava" }) ).subscribe(testSubscriber); testSubscriber.assertValues("hello", "world", "rxjava"); testSubscriber.assertError(RuntimeException.class); }

L'esempio precedente genera tutti i valori privi di errori :

hello world rxjava

Nota che se usiamo merge invece di mergeDelayError , la stringa " rxjava" non verrà emessa perché merge interrompe immediatamente il flusso di dati da Observables quando si verifica un errore.

3.3. Cerniera lampo

Il metodo di estensione zip riunisce due sequenze di valori come coppie :

@Test public void givenTwoObservables_whenZipped_thenReturnCombinedResults() { List zippedStrings = new ArrayList(); Observable.zip( Observable.from(new String[] { "Simple", "Moderate", "Complex" }), Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}), (str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add); assertThat(zippedStrings).isNotEmpty(); assertThat(zippedStrings.size()).isEqualTo(3); assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy"); }

3.4. Zip con intervallo

In questo esempio, comprimeremo un flusso con intervallo che in effetti ritarderà l'emissione di elementi del primo flusso:

@Test public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() { TestSubscriber testSubscriber = new TestSubscriber(); Observable data = Observable.just("one", "two", "three", "four", "five"); Observable interval = Observable.interval(1L, TimeUnit.SECONDS); Observable .zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData)) .toBlocking().subscribe(testSubscriber); testSubscriber.assertCompleted(); testSubscriber.assertValueCount(5); testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five"); }

4. Riepilogo

In questo articolo, abbiamo visto alcuni dei metodi per combinare Observables con RxJava. Puoi conoscere altri metodi come combinationLatest , join , groupJoin , switchOnNext , nella documentazione ufficiale di RxJava.

Come sempre, il codice sorgente di questo articolo è disponibile nel nostro repository GitHub.