Utilità di pianificazione in RxJava

1. Panoramica

In questo articolo, stiamo andando a concentrarsi su diversi tipi di Schedulatori che stiamo andando ad utilizzare in scrittura di programmi multithreading basate su RxJava subscribeOn di osservabili e observeOn metodi.

Gli schedulatori danno l'opportunità di specificare dove e probabilmente quando eseguire attività relative al funzionamento di una catena osservabile .

Possiamo ottenere uno Scheduler dai metodi di fabbrica descritti nella classe Scheduler.

2. Comportamento di threading predefinito

Per impostazione predefinita, Rx è a thread singolo, il che implica che un Observable e la catena di operatori che possiamo applicare ad esso notificheranno i suoi osservatori sullo stesso thread su cui è chiamato il suo metodo subscribe () .

I metodi ObservOn e subscribeOn prendono come argomento uno Scheduler, che, come suggerisce il nome, è uno strumento che possiamo usare per programmare singole azioni.

Creeremo la nostra implementazione di uno Scheduler utilizzando il metodo create Worker , che restituisce uno Scheduler.Worker. Un worker accetta le azioni e le esegue in sequenza su un singolo thread.

In un certo senso, un lavoratore è esso stesso uno S cheduler, ma non lo chiameremo Scheduler per evitare confusione.

2.1. Pianificazione di un'azione

Possiamo pianificare un lavoro su qualsiasi Scheduler creando un nuovo lavoratore e pianificando alcune azioni:

Scheduler scheduler = Schedulers.immediate(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> result += "action"); Assert.assertTrue(result.equals("action"));

L'azione viene quindi accodata sul thread a cui è assegnato il lavoratore.

2.2. Annullamento di un'azione

Scheduler.Worker estende la sottoscrizione . La chiamata al metodo di annullamento dell'iscrizione su un lavoratore comporterà lo svuotamento della coda e l'annullamento di tutte le attività in sospeso. Possiamo vederlo con un esempio:

Scheduler scheduler = Schedulers.newThread(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += "First_Action"; worker.unsubscribe(); }); worker.schedule(() -> result += "Second_Action"); Assert.assertTrue(result.equals("First_Action"));

La seconda attività non viene mai eseguita perché quella precedente ha annullato l'intera operazione. Le azioni che erano in corso di esecuzione verranno interrotte.

3. Schedulers.newThread

Questo scheduler avvia semplicemente un nuovo thread ogni volta che viene richiesto tramite subscribeOn () o ObservOn () .

Non è quasi mai una buona scelta, non solo a causa della latenza coinvolta all'avvio di un thread ma anche perché questo thread non viene riutilizzato:

Observable.just("Hello") .observeOn(Schedulers.newThread()) .doOnNext(s -> result2 += Thread.currentThread().getName() ) .observeOn(Schedulers.newThread()) .subscribe(s -> result1 += Thread.currentThread().getName() ); Thread.sleep(500); Assert.assertTrue(result1.equals("RxNewThreadScheduler-1")); Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));

Quando il lavoratore ha finito, il thread termina semplicemente. Questo Scheduler può essere utilizzato solo quando le attività sono a grana grossa: richiede molto tempo per il completamento, ma ce ne sono pochissime, quindi è improbabile che i thread vengano riutilizzati.

Scheduler scheduler = Schedulers.newThread(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "_Start"; worker.schedule(() -> result += "_worker_"); result += "_End"; }); Thread.sleep(3000); Assert.assertTrue(result.equals( "RxNewThreadScheduler-1_Start_End_worker_"));

Quando abbiamo pianificato il worker su un NewThreadScheduler, abbiamo visto che il worker era associato a un thread particolare.

4. Schedulers.immediate

Schedulers.immediate è uno speciale scheduler che richiama un'attività all'interno del thread del client in modo bloccante, piuttosto che in modo asincrono e restituisce quando l'azione è completata:

Scheduler scheduler = Schedulers.immediate(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "_Start"; worker.schedule(() -> result += "_worker_"); result += "_End"; }); Thread.sleep(500); Assert.assertTrue(result.equals( "main_Start_worker__End"));

In effetti, iscriversi a un Observable tramite Scheduler immediato ha in genere lo stesso effetto di non iscriversi a nessun particolare S cheduler:

Observable.just("Hello") .subscribeOn(Schedulers.immediate()) .subscribe(s -> result += Thread.currentThread().getName() ); Thread.sleep(500); Assert.assertTrue(result.equals("main"));

5. Schedulers.trampoline

Il trampolino Scheduler è molto simile a immediato perché pianifica anche le attività nello stesso thread, bloccando efficacemente.

Tuttavia, l'attività imminente viene eseguita quando tutte le attività pianificate in precedenza vengono completate:

Observable.just(2, 4, 6, 8) .subscribeOn(Schedulers.trampoline()) .subscribe(i -> result += "" + i); Observable.just(1, 3, 5, 7, 9) .subscribeOn(Schedulers.trampoline()) .subscribe(i -> result += "" + i); Thread.sleep(500); Assert.assertTrue(result.equals("246813579"));

Immediate richiama immediatamente una determinata attività, mentre il trampolino attende il termine dell'attività corrente.

Il trampolino 's lavoratore esegue ogni operazione sul thread che in programma il primo compito. La prima chiamata da programmare si blocca fino a quando la coda non viene svuotata:

Scheduler scheduler = Schedulers.trampoline(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "Start"; worker.schedule(() -> { result += "_middleStart"; worker.schedule(() -> result += "_worker_" ); result += "_middleEnd"; }); result += "_mainEnd"; }); Thread.sleep(500); Assert.assertTrue(result .equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Schedulers.from

Gli scheduler sono internamente più complessi degli Executor da java.util.concurrent , quindi era necessaria un'astrazione separata.

Ma perché sono concettualmente molto simili, ovviamente v'è un wrapper che può trasformare esecutore nella pianificazione utilizzando il da metodo factory:

private ThreadFactory threadFactory(String pattern) { return new ThreadFactoryBuilder() .setNameFormat(pattern) .build(); } @Test public void givenExecutors_whenSchedulerFrom_thenReturnElements() throws InterruptedException { ExecutorService poolA = newFixedThreadPool( 10, threadFactory("Sched-A-%d")); Scheduler schedulerA = Schedulers.from(poolA); ExecutorService poolB = newFixedThreadPool( 10, threadFactory("Sched-B-%d")); Scheduler schedulerB = Schedulers.from(poolB); Observable observable = Observable.create(subscriber -> { subscriber.onNext("Alfa"); subscriber.onNext("Beta"); subscriber.onCompleted(); });; observable .subscribeOn(schedulerA) .subscribeOn(schedulerB) .subscribe( x -> result += Thread.currentThread().getName() + x + "_", Throwable::printStackTrace, () -> result += "_Completed" ); Thread.sleep(2000); Assert.assertTrue(result.equals( "Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

SchedulerB viene utilizzato per un breve periodo di tempo, ma pianifica a malapena una nuova azione su schedulerA , che fa tutto il lavoro. Pertanto, più metodi di subscribeOn non solo vengono ignorati, ma introducono anche un piccolo overhead.

7. Schedulers.io

Questo Scheduler è simile al newThread tranne per il fatto che i thread già avviati vengono riciclati e possono eventualmente gestire richieste future.

Questa implementazione funziona in modo simile a ThreadPoolExecutor da java.util.concurrent con un pool illimitato di thread. Ogni volta che viene richiesto un nuovo worker , viene avviato un nuovo thread (e successivamente tenuto inattivo per un po 'di tempo) oppure viene riutilizzato quello inattivo:

Observable.just("io") .subscribeOn(Schedulers.io()) .subscribe(i -> result += Thread.currentThread().getName()); Assert.assertTrue(result.equals("RxIoScheduler-2"));

Dobbiamo stare attenti con risorse illimitate di qualsiasi tipo: in caso di dipendenze esterne lente o che non rispondono come i servizi web, lo scheduler io potrebbe avviare un numero enorme di thread, portando la nostra stessa applicazione a non rispondere.

In practice, following Schedulers.io is almost always a better choice.

8. Schedulers.computation

Computation Scheduler by default limits the number of threads running in parallel to the value of availableProcessors(), as found in the Runtime.getRuntime() utility class.

So we should use a computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code.

It uses an unbounded queue in front of every thread, so if the task is scheduled, but all cores are occupied, it will be queued. However, the queue just before each thread will keep growing:

Observable.just("computation") .subscribeOn(Schedulers.computation()) .subscribe(i -> result += Thread.currentThread().getName()); Assert.assertTrue(result.equals("RxComputationScheduler-1"));

If for some reason, we need a different number of threads than the default, we can always use the rx.scheduler.max-computation-threads system property.

By taking fewer threads we can ensure that there is always one or more CPU cores idle, and even under heavy load, computation thread pool does not saturate the server. It's simply not possible to have more computation threads than cores.

9. Schedulers.test

This Scheduler is used only for testing purposes, and we'll never see it in production code. Its main advantage is the ability to advance the clock, simulating time passing by arbitrarily:

List letters = Arrays.asList("A", "B", "C"); TestScheduler scheduler = Schedulers.test(); TestSubscriber subscriber = new TestSubscriber(); Observable tick = Observable .interval(1, TimeUnit.SECONDS, scheduler); Observable.from(letters) .zipWith(tick, (string, index) -> index + "-" + string) .subscribeOn(scheduler) .subscribe(subscriber); subscriber.assertNoValues(); subscriber.assertNotCompleted(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); subscriber.assertNoErrors(); subscriber.assertValueCount(1); subscriber.assertValues("0-A"); scheduler.advanceTimeTo(3, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); subscriber.assertValueCount(3); assertThat( subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C"));

10. Default Schedulers

Some Observable operators in RxJava have alternate forms that allow us to set which Scheduler the operator will use for its operation. Others don't operate on any particular Scheduler or operate on a particular default Scheduler.

For example, the delay operator takes upstream events and pushes them downstream after a given time. Obviously, it cannot hold the original thread during that period, so it must use a different Scheduler:

ExecutorService poolA = newFixedThreadPool( 10, threadFactory("Sched1-")); Scheduler schedulerA = Schedulers.from(poolA); Observable.just('A', 'B') .delay(1, TimeUnit.SECONDS, schedulerA) .subscribe(i -> result+= Thread.currentThread().getName() + i + " "); Thread.sleep(2000); Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

Without supplying a custom schedulerA, all operators below delay would use the computation Scheduler.

Other important operators that support custom Schedulers are buffer, interval, range, timer, skip, take, timeout, and several others. If we don't provide a Scheduler to such operators, computation scheduler is utilized, which is a safe default in most cases.

11. Conclusion

In truly reactive applications, for which all long-running operations are asynchronous, very few threads and thus Schedulers are needed.

Gli scheduler mastering sono essenziali per scrivere codice scalabile e sicuro utilizzando RxJava. La differenza tra subscribeOn e ObservOn è particolarmente importante in condizioni di carico elevato in cui ogni attività deve essere eseguita esattamente quando ci aspettiamo.

Ultimo ma non meno importante, dobbiamo essere sicuri che gli Scheduler utilizzati a valle possano tenere il passo con lo lo ad generato dagli Scheduler a monte. Per ulteriori informazioni, c'è questo articolo sulla contropressione.

L'implementazione di tutti questi esempi e frammenti di codice può essere trovata nel progetto GitHub: questo è un progetto Maven, quindi dovrebbe essere facile da importare ed eseguire così com'è.