Guida a Java Phaser

1. Panoramica

In questo articolo, esamineremo il costrutto Phaser dal pacchetto java.util.concurrent . È un costrutto molto simile al CountDownLatch che ci consente di coordinare l'esecuzione dei thread. Rispetto al CountDownLatch , ha alcune funzionalità aggiuntive.

Il Phaser è una barriera su cui il numero dinamico di thread deve attendere prima di continuare l'esecuzione. In CountDownLatch quel numero non può essere configurato dinamicamente e deve essere fornito durante la creazione dell'istanza.

2. Phaser API

Il Phaser ci consente di costruire una logica in cui i thread devono attendere sulla barriera prima di passare alla fase successiva di esecuzione .

Possiamo coordinare più fasi di esecuzione, riutilizzando un'istanza Phaser per ogni fase del programma. Ogni fase può avere un numero diverso di thread in attesa di passare a un'altra fase. Più avanti daremo uno sguardo a un esempio di utilizzo delle fasi.

Per partecipare al coordinamento, il thread deve registrarsi () con l' istanza di Phaser . Nota che questo aumenta solo il numero di parti registrate e non possiamo controllare se il thread corrente è registrato: dovremmo sottoclassare l'implementazione per supportarlo.

Il thread segnala che è arrivato alla barriera chiamando arrivaAndAwaitAdvance () , che è un metodo di blocco. Quando il numero di partiti arrivati ​​è uguale al numero di partiti registrati, l'esecuzione del programma continuerà e il numero di fase aumenterà. Possiamo ottenere il numero di fase corrente chiamando il metodo getPhase () .

Quando il thread termina il suo lavoro, dovremmo chiamare il metodo arrivaAndDeregister () per segnalare che il thread corrente non dovrebbe più essere considerato in questa particolare fase.

3. Implementazione della logica utilizzando Phaser API

Diciamo che vogliamo coordinare più fasi di azioni. Tre thread elaboreranno la prima fase e due thread elaboreranno la seconda fase.

Creeremo una classe LongRunningAction che implementa l' interfaccia Runnable :

class LongRunningAction implements Runnable { private String threadName; private Phaser ph; LongRunningAction(String threadName, Phaser ph) { this.threadName = threadName; this.ph = ph; ph.register(); } @Override public void run() { ph.arriveAndAwaitAdvance(); try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } ph.arriveAndDeregister(); } }

Quando viene creata un'istanza della nostra classe di azione, ci stiamo registrando all'istanza di Phaser utilizzando il metodo register () . Ciò aumenterà il numero di thread che utilizzano quel Phaser specifico .

La chiamata all'arrivoAndAwaitAdvance () farà sì che il thread corrente attenda sulla barriera. Come già accennato, quando il numero di partiti arrivati ​​diventa uguale al numero di partiti registrati, l'esecuzione continuerà.

Al termine dell'elaborazione, il thread corrente si annulla da solo chiamando il metodo arrivaAndDeregister () .

Creiamo un test case in cui avvieremo tre thread LongRunningAction e bloccheremo sulla barriera. Successivamente, al termine dell'azione, creeremo due thread LongRunningAction aggiuntivi che eseguiranno l'elaborazione della fase successiva.

Quando si crea l' istanza di Phaser dal thread principale, si passa 1 come argomento. Ciò equivale a chiamare il metodo register () dal thread corrente. Lo stiamo facendo perché, quando creiamo tre thread di lavoro, il thread principale è un coordinatore e quindi il Phaser deve avere quattro thread registrati su di esso:

ExecutorService executorService = Executors.newCachedThreadPool(); Phaser ph = new Phaser(1); assertEquals(0, ph.getPhase());

La fase dopo l'inizializzazione è uguale a zero.

La classe Phaser ha un costruttore in cui possiamo passarle un'istanza genitore. È utile nei casi in cui abbiamo un gran numero di parti che subirebbero enormi costi di contesa di sincronizzazione. In tali situazioni, le istanze di Phaser possono essere impostati in modo che i gruppi di sub-phaser condividono un genitore comune.

Successivamente, iniziamo tre thread di azioni LongRunningAction , che saranno in attesa sulla barriera fino a quando non chiameremo il metodo arrivaAndAwaitAdvance () dal thread principale.

Tieni presente che abbiamo inizializzato il nostro Phaser con 1 e chiamato register () altre tre volte. Ora, tre thread di azione hanno annunciato di essere arrivati ​​alla barriera, quindi è necessaria un'altra chiamata di arrivalAndAwaitAdvance (), quella dal thread principale:

executorService.submit(new LongRunningAction("thread-1", ph)); executorService.submit(new LongRunningAction("thread-2", ph)); executorService.submit(new LongRunningAction("thread-3", ph)); ph.arriveAndAwaitAdvance(); assertEquals(1, ph.getPhase());

Dopo il completamento di quella fase, il metodo getPhase () ne restituirà uno perché il programma ha terminato l'elaborazione della prima fase di esecuzione.

Diciamo che due thread dovrebbero condurre la fase successiva dell'elaborazione. Possiamo sfruttare Phaser per raggiungere questo obiettivo perché ci consente di configurare dinamicamente il numero di thread che dovrebbero attendere sulla barriera. Stiamo avviando due nuovi thread, ma questi non procederanno all'esecuzione fino alla chiamata a arrivalAndAwaitAdvance () dal thread principale (come nel caso precedente):

executorService.submit(new LongRunningAction("thread-4", ph)); executorService.submit(new LongRunningAction("thread-5", ph)); ph.arriveAndAwaitAdvance(); assertEquals(2, ph.getPhase()); ph.arriveAndDeregister();

Dopodiché, il metodo getPhase () restituirà un numero di fase uguale a due. Quando vogliamo finire il nostro programma, dobbiamo chiamare il metodo arrivaAndDeregister () poiché il thread principale è ancora registrato nel Phaser. Quando la cancellazione fa sì che il numero di parti registrate diventi zero, il Phaser viene terminato. Tutte le chiamate ai metodi di sincronizzazione non verranno più bloccate e verranno restituite immediatamente.

L'esecuzione del programma produrrà il seguente output (il codice sorgente completo con le istruzioni della riga di stampa può essere trovato nel repository del codice):

This is phase 0 This is phase 0 This is phase 0 Thread thread-2 before long running action Thread thread-1 before long running action Thread thread-3 before long running action This is phase 1 This is phase 1 Thread thread-4 before long running action Thread thread-5 before long running action

Vediamo che tutti i thread sono in attesa di esecuzione fino all'apertura della barriera. La fase successiva dell'esecuzione viene eseguita solo quando quella precedente è terminata con successo.

4. Conclusione

In questo tutorial, abbiamo esaminato il costrutto Phaser da java.util.concurrent e abbiamo implementato la logica di coordinamento con più fasi utilizzando la classe Phaser .

L'implementazione di tutti questi esempi e frammenti di codice può essere trovata nel progetto GitHub: questo è un progetto Maven, quindi dovrebbe essere facile da importare ed eseguire così com'è.