CyclicBarrier in Java

1. Introduzione

CyclicBarriers sono costrutti di sincronizzazione introdotti con Java 5 come parte del pacchetto java.util.concurrent .

In questo articolo esploreremo questa implementazione in uno scenario di concorrenza.

2. Concorrenza Java - sincronizzatori

Il pacchetto java.util.concurrent contiene diverse classi che aiutano a gestire una serie di thread che collaborano tra loro. Alcuni di questi includono:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Scambiatore
  • Semaforo
  • SynchronousQueue

Queste classi offrono funzionalità predefinite per schemi di interazione comuni tra i thread.

Se abbiamo una serie di thread che comunicano tra loro e assomigliano a uno dei modelli comuni, possiamo semplicemente riutilizzare le classi di libreria appropriate (chiamate anche sincronizzatori ) invece di provare a creare uno schema personalizzato utilizzando un insieme di blocchi e condizioni oggetti e la parola chiave sincronizzata .

Concentriamoci sul CyclicBarrier in futuro.

3. CyclicBarrier

Un CyclicBarrier è un sincronizzatore che consente a un insieme di thread di attendere l'un l'altro per raggiungere un punto di esecuzione comune, chiamato anche barriera .

Le CyclicBarrier vengono utilizzate nei programmi in cui abbiamo un numero fisso di thread che devono attendere l'un l'altro per raggiungere un punto comune prima di continuare l'esecuzione.

La barriera è chiamata ciclica perché può essere riutilizzata dopo il rilascio dei thread in attesa.

4. Utilizzo

Il costruttore per un CyclicBarrier è semplice. Richiede un singolo numero intero che denota il numero di thread che devono chiamare il metodo await () sull'istanza di barriera per indicare il raggiungimento del punto di esecuzione comune:

public CyclicBarrier(int parties)

I thread che devono sincronizzare la loro esecuzione sono anche chiamati parti e chiamando il metodo await () è possibile registrare che un determinato thread ha raggiunto il punto barriera.

Questa chiamata è sincrona e il thread che chiama questo metodo sospende l'esecuzione fino a quando un numero specificato di thread non ha chiamato lo stesso metodo sulla barriera. Questa situazione in cui il numero richiesto di thread ha chiamato await () , viene chiamata intervento della barriera .

Facoltativamente, possiamo passare il secondo argomento al costruttore, che è un'istanza eseguibile . Questo ha una logica che verrebbe eseguita dall'ultimo thread che fa scattare la barriera:

public CyclicBarrier(int parties, Runnable barrierAction)

5. Attuazione

Per vedere CyclicBarrier in azione, consideriamo il seguente scenario:

C'è un'operazione che esegue un numero fisso di thread e memorizza i risultati corrispondenti in un elenco. Quando tutti i thread finiscono di eseguire la loro azione, uno di loro (in genere l'ultimo che fa scattare la barriera) inizia a elaborare i dati che sono stati recuperati da ciascuno di essi.

Implementiamo la classe principale in cui avviene tutta l'azione:

public class CyclicBarrierDemo { private CyclicBarrier cyclicBarrier; private List
    
      partialResults = Collections.synchronizedList(new ArrayList()); private Random random = new Random(); private int NUM_PARTIAL_RESULTS; private int NUM_WORKERS; // ... }
    

Questa classe è piuttosto semplice: NUM_WORKERS è il numero di thread che verranno eseguiti e NUM_PARTIAL_RESULTS è il numero di risultati che ciascuno dei thread di lavoro produrrà.

Infine, abbiamo partialResults che sono un elenco che memorizzerà i risultati di ciascuno di questi thread di lavoro. Si noti che questo elenco è un SynchronizedList perché più thread scriveranno su di esso contemporaneamente e il metodo add () non è thread-safe su un semplice ArrayList .

Ora implementiamo la logica di ciascuno dei thread di lavoro:

public class CyclicBarrierDemo { // ... class NumberCruncherThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); List partialResult = new ArrayList(); // Crunch some numbers and store the partial result for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) { Integer num = random.nextInt(10); System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num); partialResult.add(num); } partialResults.add(partialResult); try { System.out.println(thisThreadName + " waiting for others to reach barrier."); cyclicBarrier.await(); } catch (InterruptedException e) { // ... } catch (BrokenBarrierException e) { // ... } } } }

Ora implementeremo la logica che viene eseguita quando la barriera è stata scattata.

Per semplificare le cose, aggiungiamo semplicemente tutti i numeri nell'elenco dei risultati parziali:

public class CyclicBarrierDemo { // ... class AggregatorThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); System.out.println( thisThreadName + ": Computing sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each."); int sum = 0; for (List threadResult : partialResults) { System.out.print("Adding "); for (Integer partialResult : threadResult) { System.out.print(partialResult+" "); sum += partialResult; } System.out.println(); } System.out.println(thisThreadName + ": Final result = " + sum); } } }

Il passo finale sarebbe costruire CyclicBarrier e dare il via alle cose con un metodo main () :

public class CyclicBarrierDemo { // Previous code public void runSimulation(int numWorkers, int numberOfPartialResults) { NUM_PARTIAL_RESULTS = numberOfPartialResults; NUM_WORKERS = numWorkers; cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread()); System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each"); for (int i = 0; i < NUM_WORKERS; i++) { Thread worker = new Thread(new NumberCruncherThread()); worker.setName("Thread " + i); worker.start(); } } public static void main(String[] args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.runSimulation(5, 3); } } 

Nel codice precedente, abbiamo inizializzato la barriera ciclica con 5 thread che producono ciascuno 3 numeri interi come parte del loro calcolo e memorizzano lo stesso nell'elenco risultante.

Una volta che la barriera è scattata, l'ultimo thread che ha fatto scattare la barriera esegue la logica specificata in AggregatorThread, ovvero - aggiungi tutti i numeri prodotti dai thread.

6. Risultati

Ecco l'output di un'esecuzione del programma precedente: ogni esecuzione potrebbe creare risultati diversi poiché i thread possono essere generati in un ordine diverso:

Spawning 5 worker threads to compute 3 partial results each Thread 0: Crunching some numbers! Final result - 6 Thread 0: Crunching some numbers! Final result - 2 Thread 0: Crunching some numbers! Final result - 2 Thread 0 waiting for others to reach barrier. Thread 1: Crunching some numbers! Final result - 2 Thread 1: Crunching some numbers! Final result - 0 Thread 1: Crunching some numbers! Final result - 5 Thread 1 waiting for others to reach barrier. Thread 3: Crunching some numbers! Final result - 6 Thread 3: Crunching some numbers! Final result - 4 Thread 3: Crunching some numbers! Final result - 0 Thread 3 waiting for others to reach barrier. Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 0 Thread 2 waiting for others to reach barrier. Thread 4: Crunching some numbers! Final result - 9 Thread 4: Crunching some numbers! Final result - 3 Thread 4: Crunching some numbers! Final result - 5 Thread 4 waiting for others to reach barrier. Thread 4: Computing final sum of 5 workers, having 3 results each. Adding 6 2 2 Adding 2 0 5 Adding 6 4 0 Adding 1 1 0 Adding 9 3 5 Thread 4: Final result = 46 

Come mostra l'output precedente, il thread 4 è quello che fa scattare la barriera ed esegue anche la logica di aggregazione finale. Inoltre, non è necessario che i thread vengano effettivamente eseguiti nell'ordine in cui sono stati avviati, come mostra l'esempio precedente.

7. Conclusione

In questo articolo, abbiamo visto cos'è un CyclicBarrier e in che tipo di situazioni è utile.

Abbiamo anche implementato uno scenario in cui avevamo bisogno di un numero fisso di thread per raggiungere un punto di esecuzione fisso, prima di continuare con un'altra logica di programma.

Come sempre, il codice per il tutorial può essere trovato su GitHub.