Implementazione di un Ring Buffer in Java

1. Panoramica

In questo tutorial impareremo come implementare un Ring Buffer in Java.

2. Ring Buffer

Ring Buffer (o Circular Buffer) è una struttura di dati circolari limitata utilizzata per il buffering dei dati tra due o più thread . Mentre continuiamo a scrivere su un buffer circolare, si riavvolge non appena raggiunge la fine.

2.1. Come funziona

Un Ring Buffer viene implementato utilizzando un array di dimensioni fisse che si avvolge ai confini .

Oltre all'array, tiene traccia di tre cose:

  • il prossimo slot disponibile nel buffer per inserire un elemento,
  • il prossimo elemento non letto nel buffer,
  • e la fine dell'array, il punto in cui il buffer si avvolge all'inizio dell'array

I meccanismi di come un buffer circolare gestisce questi requisiti variano con l'implementazione. Ad esempio, la voce di Wikipedia sull'argomento mostra un metodo che utilizza quattro puntatori.

Prenderemo in prestito l'approccio dall'implementazione di Disruptor del ring buffer usando le sequenze.

La prima cosa che dobbiamo sapere è la capacità, la dimensione massima fissa del buffer. Successivamente, useremo due sequenze crescenti in modo monotono :

  • Scrivi sequenza: a partire da -1, aumenta di 1 man mano che inseriamo un elemento
  • Sequenza di lettura: a partire da 0, aumenta di 1 man mano che consumiamo un elemento

Possiamo mappare una sequenza a un indice nell'array utilizzando un'operazione mod:

arrayIndex = sequence % capacity 

L' operazione mod avvolge la sequenza attorno ai confini per derivare uno slot nel buffer :

Vediamo come inseriremmo un elemento:

buffer[++writeSequence % capacity] = element 

Stiamo pre-incrementando la sequenza prima di inserire un elemento.

Per consumare un elemento facciamo un post-incremento:

element = buffer[readSequence++ % capacity] 

In questo caso, eseguiamo un post-incremento sulla sequenza. Il consumo di un elemento non lo rimuove dal buffer: rimane nell'array finché non viene sovrascritto .

2.2. Buffer vuoti e pieni

Mentre avvolgiamo l'array, inizieremo a sovrascrivere i dati nel buffer. Se il buffer è pieno, possiamo scegliere di sovrascrivere i dati più vecchi indipendentemente dal fatto che il lettore li abbia consumati o impedire la sovrascrittura dei dati che non sono stati letti .

Se il lettore può permettersi di perdere i valori intermedi o vecchi (ad esempio, un ticker del prezzo di un'azione), possiamo sovrascrivere i dati senza attendere che vengano consumati. D'altra parte, se il lettore deve consumare tutti i valori (come con le transazioni di e-commerce), dovremmo aspettare (blocco / occupato-attesa) fino a quando il buffer non ha uno slot disponibile.

Il buffer è pieno se la dimensione del buffer è uguale alla sua capacità , dove la sua dimensione è uguale al numero di elementi non letti:

size = (writeSequence - readSequence) + 1 isFull = (size == capacity) 

Se la sequenza di scrittura è in ritardo rispetto alla sequenza di lettura, il buffer è vuoto :

isEmpty = writeSequence < readSequence 

Il buffer restituisce un valore nullo se è vuoto.

2.2. Vantaggi e svantaggi

Un buffer circolare è un buffer FIFO efficiente. Utilizza un array di dimensioni fisse che può essere pre-allocato in anticipo e consente un modello di accesso alla memoria efficiente. Tutte le operazioni di buffer sono a tempo costante O (1) , incluso il consumo di un elemento, poiché non richiede uno spostamento di elementi.

Il rovescio della medaglia, determinare la dimensione corretta del buffer circolare è fondamentale. Ad esempio, le operazioni di scrittura possono bloccarsi a lungo se il buffer è sottodimensionato e le letture sono lente. Possiamo usare il dimensionamento dinamico, ma richiederebbe lo spostamento dei dati e perderemo la maggior parte dei vantaggi discussi sopra.

3. Implementazione in Java

Ora che abbiamo capito come funziona un ring buffer, procediamo con l'implementazione in Java.

3.1. Inizializzazione

Per prima cosa, definiamo un costruttore che inizializza il buffer con una capacità predefinita:

public CircularBuffer(int capacity) { this.capacity = capacity; this.data = (E[]) new Object[capacity]; this.readSequence = 0; this.writeSequence = -1; } 

Questo creerà un buffer vuoto e inizializzerà i campi della sequenza come discusso nella sezione precedente.

3.3. Offrire

Successivamente, implementeremo l' operazione di offerta che inserisce un elemento nel buffer nel successivo slot disponibile e restituisce true in caso di successo. Restituisce false se il buffer non riesce a trovare uno slot vuoto, ovvero non possiamo sovrascrivere i valori non letti .

Implementiamo il metodo dell'offerta in Java:

public boolean offer(E element) { boolean isFull = (writeSequence - readSequence) + 1 == capacity; if (!isFull) { int nextWriteSeq = writeSequence + 1; data[nextWriteSeq % capacity] = element; writeSequence++; return true; } return false; } 

Quindi, stiamo incrementando la sequenza di scrittura e calcolando l'indice nell'array per il successivo slot disponibile. Quindi, stiamo scrivendo i dati nel buffer e memorizzando la sequenza di scrittura aggiornata.

Proviamolo:

@Test public void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() { CircularBuffer buffer = new CircularBuffer(defaultCapacity); assertTrue(buffer.offer("Square")); assertEquals(1, buffer.size()); } 

3.4. Poll

Finally, we'll implement the poll operation that retrieves and removes the next unread element. The poll operation doesn't remove the element but increments the read sequence.

Let's implement it:

public E poll() { boolean isEmpty = writeSequence < readSequence; if (!isEmpty) { E nextValue = data[readSequence % capacity]; readSequence++; return nextValue; } return null; } 

Here, we're reading the data at the current read sequence by computing the index in the array. Then, we're incrementing the sequence and returning the value, if the buffer is not empty.

Let's test it out:

@Test public void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() { CircularBuffer buffer = new CircularBuffer(defaultCapacity); buffer.offer("Triangle"); String shape = buffer.poll(); assertEquals("Triangle", shape); } 

4. Producer-Consumer Problem

We've talked about the use of a ring buffer for exchanging data between two or more threads, which is an example of a synchronization problem called the Producer-Consumer problem. In Java, we can solve the producer-consumer problem in various ways using semaphores, bounded queues, ring buffers, etc.

Let's implement a solution based on a ring buffer.

4.1. volatile Sequence Fields

Our implementation of the ring buffer is not thread-safe. Let's make it thread-safe for the simple single-producer and single-consumer case.

The producer writes data to the buffer and increments the writeSequence, while the consumer only reads from the buffer and increments the readSequence. So, the backing array is contention-free and we can get away without any synchronization.

But we still need to ensure that the consumer can see the latest value of the writeSequence field (visibility) and that the writeSequence is not updated before the data is actually available in the buffer (ordering).

We can make the ring buffer concurrent and lock-free in this case by making the sequence fields volatile:

private volatile int writeSequence = -1, readSequence = 0; 

In the offer method, a write to the volatile field writeSequence guarantees that the writes to the buffer happen before updating the sequence. At the same time, the volatile visibility guarantee ensures that the consumer will always see the latest value of writeSequence.

4.2. Producer

Let's implement a simple producer Runnable that writes to the ring buffer:

public void run() { for (int i = 0; i < items.length;) { if (buffer.offer(items[i])) { System.out.println("Produced: " + items[i]); i++; } } } 

The producer thread would wait for an empty slot in a loop (busy-waiting).

4.3. Consumer

We'll implement a consumer Callable that reads from the buffer:

public T[] call() { T[] items = (T[]) new Object[expectedCount]; for (int i = 0; i < items.length;) { T item = buffer.poll(); if (item != null) { items[i++] = item; System.out.println("Consumed: " + item); } } return items; } 

Il thread consumer continua senza stampare se riceve un valore null dal buffer.

Scriviamo il nostro codice del driver:

executorService.submit(new Thread(new Producer(buffer))); executorService.submit(new Thread(new Consumer(buffer))); 

L'esecuzione del nostro programma produttore-consumatore produce un output come di seguito:

Produced: Circle Produced: Triangle Consumed: Circle Produced: Rectangle Consumed: Triangle Consumed: Rectangle Produced: Square Produced: Rhombus Consumed: Square Produced: Trapezoid Consumed: Rhombus Consumed: Trapezoid Produced: Pentagon Produced: Pentagram Produced: Hexagon Consumed: Pentagon Consumed: Pentagram Produced: Hexagram Consumed: Hexagon Consumed: Hexagram 

5. Conclusione

In questo tutorial, abbiamo imparato come implementare un Ring Buffer ed esplorato come può essere utilizzato per risolvere il problema produttore-consumatore.

Come al solito, il codice sorgente di tutti gli esempi è disponibile su GitHub.