Introduzione a Exchanger in Java

1. Panoramica

In questo tutorial, esamineremo java.util.concurrent.Exchanger. Questo funziona come un punto comune per due thread in Java per lo scambio di oggetti tra di loro.

2. Introduzione a Exchanger

L'Exchanger classe Java può essere utilizzato per condividere oggetti tra due fili di tipo T . La classe fornisce solo un singolo scambio di metodi di overload (T t) .

Quando viene invocato, lo scambio attende che anche l'altro thread nella coppia lo chiami. A questo punto, il secondo thread trova il primo thread in attesa con il suo oggetto. Il thread scambia gli oggetti che stanno tenendo e segnala lo scambio, e ora possono tornare.

Diamo un'occhiata a un esempio per capire lo scambio di messaggi tra due thread con Exchanger :

@Test public void givenThreads_whenMessageExchanged_thenCorrect() { Exchanger exchanger = new Exchanger(); Runnable taskA = () -> { try { String message = exchanger.exchange("from A"); assertEquals("from B", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; Runnable taskB = () -> { try { String message = exchanger.exchange("from B"); assertEquals("from A", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; CompletableFuture.allOf( runAsync(taskA), runAsync(taskB)).join(); }

Qui, abbiamo i due thread che si scambiano messaggi tra loro utilizzando lo scambiatore comune. Vediamo un esempio in cui scambiamo un oggetto dal thread principale con un nuovo thread:

@Test public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException { Exchanger exchanger = new Exchanger(); Runnable runner = () -> { try { String message = exchanger.exchange("from runner"); assertEquals("to runner", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; CompletableFuture result = CompletableFuture.runAsync(runner); String msg = exchanger.exchange("to runner"); assertEquals("from runner", msg); result.join(); }

Nota che dobbiamo prima avviare il thread runner e successivamente chiamare exchange () nel thread principale.

Si noti inoltre che la chiamata del primo thread potrebbe scadere se il secondo thread non raggiunge il punto di scambio nel tempo. Il tempo di attesa del primo thread può essere controllato utilizzando lo scambio sovraccarico (T t, timeout lungo, TimeUnit timeUnit).

3. Nessuno scambio di dati GC

Exchanger potrebbe essere utilizzato per creare modelli di pipeline con il passaggio di dati da un thread all'altro. In questa sezione, creeremo un semplice stack di thread che passano continuamente i dati tra loro come una pipeline.

@Test public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException { Exchanger
    
      readerExchanger = new Exchanger(); Exchanger
     
       writerExchanger = new Exchanger(); Runnable reader = () -> { Queue readerBuffer = new ConcurrentLinkedQueue(); while (true) { readerBuffer.add(UUID.randomUUID().toString()); if (readerBuffer.size() >= BUFFER_SIZE) { readerBuffer = readerExchanger.exchange(readerBuffer); } } }; Runnable processor = () -> { Queue processorBuffer = new ConcurrentLinkedQueue(); Queue writerBuffer = new ConcurrentLinkedQueue(); processorBuffer = readerExchanger.exchange(processorBuffer); while (true) { writerBuffer.add(processorBuffer.poll()); if (processorBuffer.isEmpty()) { processorBuffer = readerExchanger.exchange(processorBuffer); writerBuffer = writerExchanger.exchange(writerBuffer); } } }; Runnable writer = () -> { Queue writerBuffer = new ConcurrentLinkedQueue(); writerBuffer = writerExchanger.exchange(writerBuffer); while (true) { System.out.println(writerBuffer.poll()); if (writerBuffer.isEmpty()) { writerBuffer = writerExchanger.exchange(writerBuffer); } } }; CompletableFuture.allOf( runAsync(reader), runAsync(processor), runAsync(writer)).join(); }
     
    

Qui abbiamo tre thread: lettore , processore e scrittore . Insieme, funzionano come un'unica pipeline per lo scambio di dati tra di loro.

Il readerExchanger è condiviso tra il reader e il thread del processore , mentre il writerExchanger è condiviso tra il processore e il thread del writer .

Notare che l'esempio qui è solo per dimostrazione. Dobbiamo stare attenti durante la creazione di loop infiniti con while (true) . Inoltre, per mantenere il codice leggibile, abbiamo omesso la gestione di alcune eccezioni.

Questo modello di scambio di dati durante il riutilizzo del buffer consente una minore raccolta di dati inutili. Il metodo di scambio restituisce le stesse istanze della coda e quindi non ci sarebbe alcun GC per questi oggetti. A differenza di qualsiasi coda di blocco, lo scambiatore non crea nodi o oggetti per conservare e condividere dati.

La creazione di una pipeline di questo tipo è simile al pattern Disrupter, con una differenza fondamentale, il pattern Disrupter supporta più produttori e consumatori, mentre uno scambiatore potrebbe essere utilizzato tra una coppia di consumatori e produttori.

4. Conclusione

Quindi, abbiamo imparato cos'è Exchanger in Java, come funziona e abbiamo visto come utilizzare la classe Exchanger . Inoltre, abbiamo creato una pipeline e dimostrato lo scambio di dati senza GC tra i thread.

Come sempre, il codice è disponibile su GitHub.