Filtraggio degli osservabili in RxJava

1. Introduzione

Dopo l'introduzione a RxJava, esamineremo gli operatori di filtro.

In particolare, ci concentreremo sul filtraggio, il salto, il filtraggio temporale e alcune operazioni di filtraggio più avanzate.

2. Filtraggio

Quando si lavora con Observable , a volte è utile selezionare solo un sottoinsieme di elementi emessi. A tal fine, RxJava offre varie funzionalità di filtro .

Cominciamo a guardare il metodo del filtro .

2.1. L' operatore di filtro

In poche parole, l' operatore di filtro filtra un osservabile assicurandosi che gli elementi emessi corrispondano alla condizione specificata , che si presenta sotto forma di un predicato .

Vediamo come possiamo filtrare solo i valori dispari da quelli emessi:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .filter(i -> i % 2 != 0); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 3, 5, 7, 9);

2.2. L' operatore di ripresa

Quando si filtra con take , la logica risulta nell'emissione dei primi n elementi ignorando gli elementi rimanenti.

Vediamo come possiamo filtrare la sorgente Observable ed emettere solo i primi due elementi:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.take(3); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

2.3. L' operatore takeWhile

Quando si utilizza takeWhile, l' osservabile filtrato continuerà a emettere elementi fino a quando non incontra un primo elemento che non corrisponde al predicato.

Vediamo come possiamo usare il takeWhile - con un predicato con filtro :

Observable sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .takeWhile(i -> i < 4); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

2.4. L' operatore takeFirst

Ogni volta che vogliamo emettere solo il primo elemento che corrisponde a una data condizione, possiamo usare takeFirst ().

Diamo una rapida occhiata a come possiamo emettere il primo elemento che è maggiore di 5:

Observable sourceObservable = Observable .just(1, 2, 3, 4, 5, 7, 6); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .takeFirst(x -> x > 5); filteredObservable.subscribe(subscriber); subscriber.assertValue(7);

2.5. operatori first e firstOrDefault

Un comportamento simile può essere ottenuto utilizzando la prima API:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.first(); filteredObservable.subscribe(subscriber); subscriber.assertValue(1);

Tuttavia, nel caso in cui desideriamo specificare un valore predefinito, se non vengono emessi elementi, possiamo usare f irstOrDefault :

Observable sourceObservable = Observable.empty(); Observable filteredObservable = sourceObservable.firstOrDefault(-1); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.6. L' operatore takeLast

Successivamente, se vogliamo emettere solo gli ultimi n elementi emessi da un Observable , possiamo usare takeLast .

Vediamo come è possibile emettere solo gli ultimi tre elementi:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.takeLast(3); filteredObservable.subscribe(subscriber); subscriber.assertValues(8, 9, 10);

Dobbiamo ricordare che questo ritarda l'emissione di qualsiasi elemento dalla fonte Observable fino al suo completamento.

2.7. last and lastOrDefault

Se vogliamo emettere solo l'ultimo elemento, oltre a usare takeLast (1) , possiamo usare last .

Questo filtra l' Osservabile , emettendo solo l'ultimo elemento, che opzionalmente verifica un Predicato filtrante :

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .last(i -> i % 2 != 0); filteredObservable.subscribe(subscriber); subscriber.assertValue(9);

Nel caso in cui l' Observable sia vuoto, possiamo usare lastOrDefault , che filtra l' Observable emettendo il valore di default.

Il valore predefinito viene emesso anche se viene utilizzato l' operatore lastOrDefault e non ci sono elementi che verificano la condizione di filtraggio:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.lastOrDefault(-1, i -> i > 10); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.8. elementAt e elementAtOrDefault Operatori

Con l' operatore elementAt possiamo scegliere un singolo elemento emesso dalla sorgente Observable , specificandone l'indice:

Observable sourceObservable = Observable .just(1, 2, 3, 5, 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.elementAt(4); filteredObservable.subscribe(subscriber); subscriber.assertValue(7);

Tuttavia, elementAt genererà un'eccezione IndexOutOfBoundException se l'indice specificato supera il numero di elementi emessi.

Per evitare questa situazione, è possibile utilizzare elementAtOrDefault, che restituirà un valore predefinito nel caso in cui l'indice sia fuori intervallo:

Observable sourceObservable = Observable .just(1, 2, 3, 5, 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.elementAtOrDefault(7, -1); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.9. L' operatore ofType

Ogni volta che l' Osservabile emette elementi Oggetto , è possibile filtrarli in base al loro tipo.

Vediamo come possiamo filtrare solo gli elementi di tipo String emessi:

Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.ofType(String.class); filteredObservable.subscribe(subscriber); subscriber.assertValues("two", "five");

3. Salto

On the other hand, when we want to filter out or skip some of the items emitted by an Observable, RxJava offers a few operators as a counterpart of the filtering ones, that we've previously discussed.

Let's start looking at the skip operator, the counterpart of take.

3.1. The skip Operator

When an Observable emits a sequence of items, it's possible to filter out or skip some of the firsts emitted items using skip.

For example. let's see how it's possible to skip the first four elements:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.skip(4); filteredObservable.subscribe(subscriber); subscriber.assertValues(5, 6, 7, 8, 9, 10);

3.2. The skipWhile Operator

Whenever we want to filter out all the first values emitted by an Observable that fail a filtering predicate, we can use the skipWhile operator:

Observable sourceObservable = Observable .just(1, 2, 3, 4, 5, 4, 3, 2, 1); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .skipWhile(i -> i < 4); filteredObservable.subscribe(subscriber); subscriber.assertValues(4, 5, 4, 3, 2, 1);

3.3. The skipLast Operator

The skipLast operator allows us to skip the final items emitted by the Observable accepting only those emitted before them.

With this, we can, for example, skip the last five items:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.skipLast(5); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3, 4, 5);

3.4. distinct and distinctUntilChanged Operators

The distinct operator returns an Observable that emits all the items emitted by the sourceObservable that are distinct:

Observable sourceObservable = Observable .just(1, 1, 2, 2, 1, 3, 3, 1); TestSubscriber subscriber = new TestSubscriber(); Observable distinctObservable = sourceObservable.distinct(); distinctObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

However, if we want to obtain an Observable that emits all the items emitted by the sourceObservable that are distinct from their immediate predecessor, we can use the distinctUntilChanged operator:

Observable sourceObservable = Observable .just(1, 1, 2, 2, 1, 3, 3, 1); TestSubscriber subscriber = new TestSubscriber(); Observable distinctObservable = sourceObservable.distinctUntilChanged(); distinctObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 1, 3, 1);

3.5. The ignoreElements Operator

Whenever we want to ignore all the elements emitted by the sourceObservable, we can simply use the ignoreElements:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable ignoredObservable = sourceObservable.ignoreElements(); ignoredObservable.subscribe(subscriber); subscriber.assertNoValues();

4. Time Filtering Operators

When working with observable sequence, the time axis is unknown but sometimes getting timely data from a sequence could be useful.

With this purpose, RxJava offers a few methods that allow us to work with Observable using also the time axis.

Before moving on to the first one, let's define a timed Observable that will emit an item every second:

TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = Observable .just(1, 2, 3, 4, 5, 6) .zipWith(Observable.interval( 0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);

The TestScheduler is a special scheduler that allows advancing the clock manually at whatever pace we prefer.

4.1. sample and throttleLast Operators

The sample operator filters the timedObservable, returning an Observable that emits the most recent items emitted by this API within period time intervals.

Let's see how we can sample the timedObservable, filtering only the last emitted item every 2.5 seconds:

TestSubscriber subscriber = new TestSubscriber(); Observable sampledObservable = timedObservable .sample(2500L, TimeUnit.MILLISECONDS, testScheduler); sampledObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(3, 5, 6);

This kind of behavior can be achieved also using the throttleLast operator.

4.2. The throttleFirst Operator

The throttleFirst operator differs from throttleLast/sample since it emits the first item emitted by the timedObservable in each sampling period instead of the most recently emitted one.

Let's see how we can emit the first items, using a sampling period of 4 seconds:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .throttleFirst(4100L, TimeUnit.SECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(1, 6);

4.3. debounce and throttleWithTimeout Operators

With the debounce operator, it's possible to emit only an item if a particular timespan has passed without emitting another item.

Therefore, if we select a timespan that is greater than the time interval between the emitted items of the timedObservable, it will only emit the last one. On the other hand, if it's smaller, it will emit all the items emitted by the timedObservable.

Let's see what happens in the first scenario:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .debounce(2000L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValue(6);

This kind of behavior can also be achieved using throttleWithTimeout.

4.4. The timeout Operator

The timeout operator mirrors the source Observable, but issue a notification error, aborting the emission of items, if the source Observable fails to emit any items during a specified time interval.

Let's see what happens if we specify a timeout of 500 milliseconds to our timedObservable:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .timeout(500L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertError(TimeoutException.class); subscriber.assertValues(1);

5. Multiple Observable Filtering

When working with Observable, it's definitely possible to decide if filtering or skipping items based on a second Observable.

Before moving on, let's define a delayedObservable, that will emit only 1 item after 3 seconds:

Observable delayedObservable = Observable.just(1) .delay(3, TimeUnit.SECONDS, testScheduler);

Let's start with takeUntil operator.

5.1. The takeUntil Operator

The takeUntil operator discards any item emitted by the source Observable (timedObservable) after a second Observable (delayedObservable) emits an item or terminates:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .skipUntil(delayedObservable); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(4, 5, 6);

5.2. The skipUntil Operator

On the other hand, skipUntil discards any item emitted by the source Observable (timedObservable) until a second Observable (delayedObservable) emits an item:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .takeUntil(delayedObservable); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(1, 2, 3);

6. Conclusion

In this extensive tutorial, we explored the different filtering operators available within RxJava, providing a simple example of each one.

As always, all the code examples in this article can be found over on GitHub.