Una guida a Java SynchronousQueue

1. Panoramica

In questo articolo, esamineremo la SynchronousQueue dal pacchetto java.util.concurrent .

In poche parole, questa implementazione ci consente di scambiare informazioni tra i thread in modo thread-safe.

2. Panoramica API

L' SynchronousQueue ha solo due operazioni sostenute: take () e put (), e ciascuno di essi bloccano .

Ad esempio, quando vogliamo aggiungere un elemento alla coda, dobbiamo chiamare il metodo put () . Quel metodo si bloccherà fino a quando un altro thread non chiamerà il metodo take () , segnalando che è pronto a prendere un elemento.

Sebbene SynchronousQueue abbia un'interfaccia di una coda, dovremmo considerarlo come un punto di scambio per un singolo elemento tra due thread, in cui un thread sta consegnando un elemento e un altro thread sta prendendo quell'elemento.

3. Implementazione di Handoff utilizzando una variabile condivisa

Per vedere perché SynchronousQueue può essere così utile, implementeremo una logica utilizzando una variabile condivisa tra due thread e successivamente riscriveremo quella logica utilizzando SynchronousQueue rendendo il nostro codice molto più semplice e leggibile.

Diciamo che abbiamo due thread - un produttore e un consumatore - e quando il produttore sta impostando un valore di una variabile condivisa, vogliamo segnalare questo fatto al thread consumatore. Successivamente, il thread consumer recupererà un valore da una variabile condivisa.

Useremo CountDownLatch per coordinare questi due thread, per prevenire una situazione in cui il consumatore accede a un valore di una variabile condivisa che non è stata ancora impostata.

Definiremo una variabile sharedState e un CountDownLatch che verranno utilizzati per il coordinamento dell'elaborazione:

ExecutorService executor = Executors.newFixedThreadPool(2); AtomicInteger sharedState = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(1);

Il produttore salverà un numero intero casuale nella variabile sharedState ed eseguirà il metodo countDown () sul countDownLatch, segnalando al consumatore che può recuperare un valore da sharedState:

Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); sharedState.set(producedElement); countDownLatch.countDown(); };

Il consumatore attenderà sul countDownLatch usando il metodo await () . Quando il produttore segnala che la variabile è stata impostata, il consumatore la recupererà da sharedState:

Runnable consumer = () -> { try { countDownLatch.await(); Integer consumedElement = sharedState.get(); } catch (InterruptedException ex) { ex.printStackTrace(); } };

Ultimo ma non meno importante, iniziamo il nostro programma:

executor.execute(producer); executor.execute(consumer); executor.awaitTermination(500, TimeUnit.MILLISECONDS); executor.shutdown(); assertEquals(countDownLatch.getCount(), 0);

Produrrà il seguente output:

Saving an element: -1507375353 to the exchange point consumed an element: -1507375353 from the exchange point

Possiamo vedere che questo è un sacco di codice per implementare una funzionalità così semplice come lo scambio di un elemento tra due thread. Nella prossima sezione, proveremo a renderlo migliore.

4. Implementazione di Handoff utilizzando SynchronousQueue

Implementiamo ora la stessa funzionalità della sezione precedente, ma con una SynchronousQueue. Ha un doppio effetto perché possiamo usarlo per lo scambio di stato tra i thread e per coordinare quell'azione in modo da non dover usare nient'altro che SynchronousQueue.

In primo luogo, definiremo una coda:

ExecutorService executor = Executors.newFixedThreadPool(2); SynchronousQueue queue = new SynchronousQueue();

Il produttore chiamerà un metodo put () che si bloccherà fino a quando un altro thread non prenderà un elemento dalla coda:

Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); try { queue.put(producedElement); } catch (InterruptedException ex) { ex.printStackTrace(); } };

Il consumatore recupererà semplicemente quell'elemento usando il metodo take () :

Runnable consumer = () -> { try { Integer consumedElement = queue.take(); } catch (InterruptedException ex) { ex.printStackTrace(); } };

Successivamente, inizieremo il nostro programma:

executor.execute(producer); executor.execute(consumer); executor.awaitTermination(500, TimeUnit.MILLISECONDS); executor.shutdown(); assertEquals(queue.size(), 0);

Produrrà il seguente output:

Saving an element: 339626897 to the exchange point consumed an element: 339626897 from the exchange point

Possiamo vedere che un SynchronousQueue viene utilizzato come punto di scambio tra i thread, il che è molto meglio e più comprensibile rispetto all'esempio precedente che utilizzava lo stato condiviso insieme a CountDownLatch.

5. conclusione

In questo breve tutorial, abbiamo esaminato il costrutto SynchronousQueue . Abbiamo creato un programma che scambia dati tra due thread utilizzando lo stato condiviso, quindi abbiamo riscritto quel programma per sfruttare il costrutto SynchronousQueue . Questo funge da punto di scambio che coordina il thread produttore e consumatore.

L'implementazione di tutti questi esempi e frammenti di codice può essere trovata nel progetto GitHub: questo è un progetto Maven, quindi dovrebbe essere facile da importare ed eseguire così com'è.