Guida a CountDownLatch in Java

1. Introduzione

In questo articolo, forniremo una guida alla classe CountDownLatch e dimostreremo come può essere utilizzata in alcuni esempi pratici.

In sostanza, utilizzando un CountDownLatch possiamo bloccare un thread fino a quando altri thread non hanno completato una determinata attività.

2. Utilizzo nella programmazione simultanea

In poche parole, un CountDownLatch ha un campo contatore , che puoi decrementare come richiesto. Possiamo quindi usarlo per bloccare un thread chiamante fino a quando non viene contato fino a zero.

Se stessimo eseguendo un'elaborazione parallela, potremmo istanziare CountDownLatch con lo stesso valore per il contatore di un numero di thread su cui vogliamo lavorare. Quindi, potremmo semplicemente chiamare countdown () al termine di ogni thread, garantendo che un thread dipendente che chiama await () si bloccherà fino al termine dei thread di lavoro.

3. In attesa del completamento di un pool di thread

Proviamo questo modello creando un Worker e utilizzando un campo CountDownLatch per segnalare quando è stato completato:

public class Worker implements Runnable { private List outputScraper; private CountDownLatch countDownLatch; public Worker(List outputScraper, CountDownLatch countDownLatch) { this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run() { doSomeWork(); outputScraper.add("Counted down"); countDownLatch.countDown(); } }

Quindi, creiamo un test per dimostrare che possiamo ottenere un CountDownLatch per attendere il completamento delle istanze di Worker :

@Test public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); outputScraper.add("Latch released"); assertThat(outputScraper) .containsExactly( "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released" ); }

Naturalmente "Latch rilasciato" sarà sempre l'ultimo output, poiché dipende dal rilascio di CountDownLatch .

Nota che se non chiamassimo await () , non saremmo in grado di garantire l'ordine di esecuzione dei thread, quindi il test fallirebbe casualmente.

4. Un pool di thread in attesa di iniziare

Se abbiamo preso l'esempio precedente, ma questa volta abbiamo avviato migliaia di thread invece di cinque, è probabile che molti di quelli precedenti avranno terminato l'elaborazione prima ancora di aver chiamato start () su quelli successivi. Ciò potrebbe rendere difficile provare a riprodurre un problema di concorrenza, poiché non saremmo in grado di far funzionare tutti i nostri thread in parallelo.

Per aggirare questo problema, facciamo in modo che CountdownLatch funzioni in modo diverso rispetto all'esempio precedente. Invece di bloccare un thread padre fino al termine di alcuni thread figlio, possiamo bloccare ogni thread figlio finché tutti gli altri non sono stati avviati.

Modifichiamo il nostro metodo run () in modo che si blocchi prima dell'elaborazione:

public class WaitingWorker implements Runnable { private List outputScraper; private CountDownLatch readyThreadCounter; private CountDownLatch callingThreadBlocker; private CountDownLatch completedThreadCounter; public WaitingWorker( List outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) { this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completedThreadCounter; } @Override public void run() { readyThreadCounter.countDown(); try { callingThreadBlocker.await(); doSomeWork(); outputScraper.add("Counted down"); } catch (InterruptedException e) { e.printStackTrace(); } finally { completedThreadCounter.countDown(); } } }

Ora, modifichiamo il nostro test in modo che si blocchi fino a quando tutti i lavoratori non hanno iniziato, sblocca i lavoratori e quindi si blocca finché i lavoratori non hanno terminato:

@Test public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch readyThreadCounter = new CountDownLatch(5); CountDownLatch callingThreadBlocker = new CountDownLatch(1); CountDownLatch completedThreadCounter = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new WaitingWorker( outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))) .limit(5) .collect(toList()); workers.forEach(Thread::start); readyThreadCounter.await(); outputScraper.add("Workers ready"); callingThreadBlocker.countDown(); completedThreadCounter.await(); outputScraper.add("Workers complete"); assertThat(outputScraper) .containsExactly( "Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete" ); }

Questo modello è davvero utile per provare a riprodurre bug di concorrenza, poiché può essere utilizzato per forzare migliaia di thread a provare ad eseguire una logica in parallelo.

5. Terminare anticipatamente un CountdownLatch

A volte, potremmo imbatterci in una situazione in cui i Worker terminano per errore prima di eseguire il conto alla rovescia del CountDownLatch. Ciò potrebbe comportare che non raggiunga mai lo zero e await () non termini mai:

@Override public void run() { if (true) { throw new RuntimeException("Oh dear, I'm a BrokenWorker"); } countDownLatch.countDown(); outputScraper.add("Counted down"); }

Modifichiamo il nostro test precedente per utilizzare BrokenWorker, al fine di mostrare come await () si bloccherà per sempre:

@Test public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); }

Chiaramente, questo non è il comportamento che vogliamo: sarebbe molto meglio che l'applicazione continuasse piuttosto che bloccarsi all'infinito.

Per aggirare questo problema, aggiungiamo un argomento di timeout alla nostra chiamata a await ().

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS); assertThat(completed).isFalse();

Come possiamo vedere, il test alla fine scadrà e await () restituirà false .

6. Conclusione

In questa guida rapida, abbiamo dimostrato come possiamo usare un CountDownLatch per bloccare un thread fino a quando altri thread non hanno terminato l'elaborazione.

Abbiamo anche mostrato come può essere utilizzato per aiutare a eseguire il debug dei problemi di concorrenza assicurandosi che i thread vengano eseguiti in parallelo.

L'implementazione di questi esempi può essere trovata su GitHub; questo è un progetto basato su Maven, quindi dovrebbe essere facile da eseguire così com'è.