Java Concurrency Utility con JCTools

1. Panoramica

In questo tutorial, introdurremo la libreria JCTools (Java Concurrency Tools).

In poche parole, questo fornisce una serie di strutture di dati di utilità adatte per lavorare in un ambiente multi-thread.

2. Algoritmi non bloccanti

Tradizionalmente, il codice multi-thread che funziona su uno stato condiviso mutabile utilizza blocchi per garantire la coerenza dei dati e le pubblicazioni (modifiche apportate da un thread che sono visibili a un altro).

Questo approccio presenta una serie di inconvenienti:

  • i thread potrebbero bloccarsi nel tentativo di acquisire un blocco, senza fare progressi fino al termine dell'operazione di un altro thread - questo impedisce efficacemente il parallelismo
  • maggiore è la contesa di blocco, maggiore è il tempo che la JVM dedica alla pianificazione dei thread, alla gestione della contesa e alle code di thread in attesa e meno lavoro reale sta facendo
  • i deadlock sono possibili se è coinvolto più di un blocco e vengono acquisiti / rilasciati in ordine errato
  • è possibile un rischio di inversione di priorità: un thread ad alta priorità viene bloccato nel tentativo di ottenere un blocco mantenuto da un thread a bassa priorità
  • la maggior parte delle volte vengono utilizzati lucchetti a grana grossa, che danneggiano molto il parallelismo: il bloccaggio a grana fine richiede una progettazione più attenta, aumenta il sovraccarico di bloccaggio ed è più soggetto a errori

Un'alternativa è usare un algoritmo non bloccante, cioè un algoritmo in cui il fallimento o la sospensione di qualsiasi thread non può causare il fallimento o la sospensione di un altro thread .

Un algoritmo non bloccante è privo di blocchi se è garantito che almeno uno dei thread coinvolti proceda per un periodo di tempo arbitrario, ovvero non possono verificarsi deadlock durante l'elaborazione.

Inoltre, questi algoritmi sono esenti da attese se c'è anche un progresso per thread garantito.

Ecco un esempio di Stack non bloccante tratto dall'eccellente libro Java Concurrency in Practice; definisce lo stato di base:

public class ConcurrentStack { AtomicReference
    
      top = new AtomicReference
     
      (); private static class Node { public E item; public Node next; // standard constructor } }
     
    

E anche un paio di metodi API:

public void push(E item){ Node newHead = new Node(item); Node oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while(!top.compareAndSet(oldHead, newHead)); } public E pop() { Node oldHead; Node newHead; do { oldHead = top.get(); if (oldHead == null) { return null; } newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; }

Possiamo vedere che l'algoritmo utilizza istruzioni di confronto e scambio (CAS) a grana fine ed è privo di blocchi (anche se più thread chiamano top.compareAndSet () contemporaneamente, uno di essi è garantito per avere successo) ma non attendere- gratuito in quanto non vi è alcuna garanzia che CAS abbia successo per un thread particolare.

3. Dipendenza

Per prima cosa, aggiungiamo la dipendenza JCTools al nostro pom.xml :

 org.jctools jctools-core 2.1.2 

Si noti che l'ultima versione disponibile è disponibile su Maven Central.

4. Code JCTools

La libreria offre un numero di code da utilizzare in un ambiente multi-thread, cioè uno o più thread scrivono su una coda e uno o più thread leggono da essa in modo thread-safe senza blocchi.

L'interfaccia comune per tutte le implementazioni di Queue è org.jctools.queues.MessagePassingQueue .

4.1. Tipi di code

Tutte le code possono essere classificate nelle rispettive politiche produttore / consumatore:

  • singolo produttore, singolo consumatore: tali classi vengono denominate utilizzando il prefisso Spsc , ad esempio SpscArrayQueue
  • singolo produttore, più consumatori: utilizzare il prefisso Spmc , ad esempio SpmcArrayQueue
  • più produttori, singolo consumatore: utilizzare il prefisso Mpsc , ad esempio MpscArrayQueue
  • più produttori, più consumatori: utilizzare il prefisso Mpmc , ad esempio MpmcArrayQueue

It's important to note that there are no policy checks internally, i.e. a queue might silently misfunction in case of incorrect usage.

E.g. the test below populates a single-producer queue from two threads and passes even though the consumer is not guaranteed to see data from different producers:

SpscArrayQueue queue = new SpscArrayQueue(2); Thread producer1 = new Thread(() -> queue.offer(1)); producer1.start(); producer1.join(); Thread producer2 = new Thread(() -> queue.offer(2)); producer2.start(); producer2.join(); Set fromQueue = new HashSet(); Thread consumer = new Thread(() -> queue.drain(fromQueue::add)); consumer.start(); consumer.join(); assertThat(fromQueue).containsOnly(1, 2);

4.2. Queue Implementations

Summarizing the classifications above, here is the list of JCTools queues:

  • SpscArrayQueue single producer, single consumer, uses an array internally, bound capacity
  • SpscLinkedQueue single producer, single consumer, uses linked list internally, unbound capacity
  • SpscChunkedArrayQueue single producer, single consumer, starts with initial capacity and grows up to max capacity
  • SpscGrowableArrayQueue single producer, single consumer, starts with initial capacity and grows up to max capacity. This is the same contract as SpscChunkedArrayQueue, the only difference is internal chunks management. It's recommended to use SpscChunkedArrayQueue because it has a simplified implementation
  • SpscUnboundedArrayQueue single producer, single consumer, uses an array internally, unbound capacity
  • SpmcArrayQueue single producer, multiple consumers, uses an array internally, bound capacity
  • MpscArrayQueue multiple producers, single consumer, uses an array internally, bound capacity
  • MpscLinkedQueue multiple producers, single consumer, uses a linked list internally, unbound capacity
  • MpmcArrayQueue multiple producers, multiple consumers, uses an array internally, bound capacity

4.3. Atomic Queues

All queues mentioned in the previous section use sun.misc.Unsafe. However, with the advent of Java 9 and the JEP-260 this API becomes inaccessible by default.

So, there are alternative queues which use java.util.concurrent.atomic.AtomicLongFieldUpdater (public API, less performant) instead of sun.misc.Unsafe.

They are generated from the queues above and their names have the word Atomic inserted in between, e.g. SpscChunkedAtomicArrayQueue or MpmcAtomicArrayQueue.

It's recommended to use ‘regular' queues if possible and resort to AtomicQueues only in environments where sun.misc.Unsafe is prohibited/ineffective like HotSpot Java9+ and JRockit.

4.4. Capacity

All JCTools queues might also have a maximum capacity or be unbound. When a queue is full and it's bound by capacity, it stops accepting new elements.

In the following example, we:

  • fill the queue
  • ensure that it stops accepting new elements after that
  • drain from it and ensure that it's possible to add more elements afterward

Please note that a couple of code statements are dropped for readability. The complete implementation can be found over on GitHub:

SpscChunkedArrayQueue queue = new SpscChunkedArrayQueue(8, 16); CountDownLatch startConsuming = new CountDownLatch(1); CountDownLatch awakeProducer = new CountDownLatch(1); Thread producer = new Thread(() -> { IntStream.range(0, queue.capacity()).forEach(i -> { assertThat(queue.offer(i)).isTrue(); }); assertThat(queue.offer(queue.capacity())).isFalse(); startConsuming.countDown(); awakeProducer.await(); assertThat(queue.offer(queue.capacity())).isTrue(); }); producer.start(); startConsuming.await(); Set fromQueue = new HashSet(); queue.drain(fromQueue::add); awakeProducer.countDown(); producer.join(); queue.drain(fromQueue::add); assertThat(fromQueue).containsAll( IntStream.range(0, 17).boxed().collect(toSet()));

5. Other JCTools Data Structures

JCTools offers a couple of non-Queue data structures as well.

All of them are listed below:

  • NonBlockingHashMap a lock-free ConcurrentHashMap alternative with better-scaling properties and generally lower mutation costs. It's implemented via sun.misc.Unsafe, so, it's not recommended to use this class in a HotSpot Java9+ or JRockit environment
  • NonBlockingHashMapLong like NonBlockingHashMap but uses primitive long keys
  • NonBlockingHashSet a simple wrapper around NonBlockingHashMaplike JDK's java.util.Collections.newSetFromMap()
  • NonBlockingIdentityHashMap like NonBlockingHashMap but compares keys by identity.
  • NonBlockingSetInta multi-threaded bit-vector set implemented as an array of primitive longs. Works ineffectively in case of silent autoboxing

6. Performance Testing

Let's use JMH for comparing the JDK's ArrayBlockingQueue vs. JCTools queue's performance. JMH is an open-source micro-benchmark framework from Sun/Oracle JVM gurus which protects us from indeterminism of compiler/jvm optimization algorithms). Please feel free to get more details on it in this article.

Note that the code snippet below misses a couple of statements in order to improve readability. Please find the complete source code on GitHub:

public class MpmcBenchmark { @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) public volatile String implementation; public volatile Queue queue; @Benchmark @Group(GROUP_NAME) @GroupThreads(PRODUCER_THREADS_NUMBER) public void write(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && !queue.offer(1L)) { // intentionally left blank } } @Benchmark @Group(GROUP_NAME) @GroupThreads(CONSUMER_THREADS_NUMBER) public void read(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && queue.poll() == null) { // intentionally left blank } } }

Results (excerpt for the 95th percentile, nanoseconds per-operation):

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

We can see thatMpmcArrayQueue performs just slightly better than MpmcAtomicArrayQueue and ArrayBlockingQueue is slower by a factor of two.

7. Drawbacks of Using JCTools

Using JCTools has an important drawback – it's not possible to enforce that the library classes are used correctly. For example, consider a situation when we start using MpscArrayQueue in our large and mature project (note that there must be a single consumer).

Unfortunately, as the project is big, there is a possibility that someone makes a programming or configuration error and the queue is now read from more than one thread. The system seems to work as before but now there is a chance that consumers miss some messages. That is a real problem which might have a big impact and is very hard to debug.

Ideally, it should be possible to run a system with a particular system property which forces JCTools to ensure thread access policy. E.g. local/test/staging environments (but not production) might have it turned on. Sadly, JCTools does not provide such a property.

Un'altra considerazione è che, anche se ci siamo assicurati che JCTools sia significativamente più veloce della controparte di JDK, ciò non significa che la nostra applicazione guadagni la stessa velocità quando iniziamo a utilizzare le implementazioni della coda personalizzata. La maggior parte delle applicazioni non scambia molti oggetti tra i thread e sono per lo più vincolate all'I / O.

8. Conclusione

Ora abbiamo una conoscenza di base delle classi di utilità offerte da JCTools e abbiamo visto come si comportano bene, rispetto alle controparti di JDK sotto carico pesante.

In conclusione, vale la pena usare la libreria solo se scambiamo molti oggetti tra i thread e anche in questo caso è necessario stare molto attenti a preservare la politica di accesso ai thread.

Come sempre, il codice sorgente completo per gli esempi sopra può essere trovato su GitHub.