Guida al ConcurrentSkipListMap

1. Panoramica

In questo rapido articolo, esamineremo la classe ConcurrentSkipListMap dal pacchetto java.util.concurrent .

Questo costrutto ci consente di creare logica thread-safe in modo privo di blocchi. È l'ideale per i problemi quando vogliamo creare un'istantanea immutabile dei dati mentre altri thread stanno ancora inserendo dati nella mappa.

Risolveremo il problema di ordinare un flusso di eventi e ottenere un'istantanea degli eventi che sono arrivati ​​negli ultimi 60 secondi usando quel costrutto .

2. Logica di ordinamento dei flussi

Supponiamo di avere un flusso di eventi che provengono continuamente da più thread. Dobbiamo essere in grado di prendere eventi degli ultimi 60 secondi e anche eventi più vecchi di 60 secondi.

Per prima cosa, definiamo la struttura dei nostri dati dell'evento:

public class Event { private ZonedDateTime eventTime; private String content; // standard constructors/getters }

Vogliamo mantenere i nostri eventi ordinati utilizzando il campo eventTime . Per ottenere ciò utilizzando ConcurrentSkipListMap, dobbiamo passare un Comparator al suo costruttore durante la creazione di un'istanza di esso:

ConcurrentSkipListMap events = new ConcurrentSkipListMap( Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));

Confronteremo tutti gli eventi arrivati ​​utilizzando i loro timestamp. Stiamo usando il metodo comparingLong () e stiamo passando la funzione di estrazione che può richiedere un lungo timestamp da ZonedDateTime.

Quando i nostri eventi stanno arrivando, dobbiamo solo aggiungerli alla mappa usando il metodo put () . Tieni presente che questo metodo non richiede alcuna sincronizzazione esplicita:

public void acceptEvent(Event event) { events.put(event.getEventTime(), event.getContent()); }

Il ConcurrentSkipListMap gestirà la selezione di quegli eventi sotto utilizzando il comparatore che è stato passato ad esso nel costruttore.

I vantaggi più notevoli di ConcurrentSkipListMap sono i metodi che possono creare un'istantanea immutabile dei suoi dati senza blocchi. Per ottenere tutti gli eventi che sono arrivati ​​nell'ultimo minuto, possiamo usare il metodo tailMap () e passare il tempo da cui vogliamo ottenere elementi:

public ConcurrentNavigableMap getEventsFromLastMinute() { return events.tailMap(ZonedDateTime.now().minusMinutes(1)); } 

Restituirà tutti gli eventi dell'ultimo minuto. Sarà un'istantanea immutabile e la cosa più importante è che altri thread di scrittura possano aggiungere nuovi eventi a ConcurrentSkipListMap senza alcuna necessità di eseguire un blocco esplicito.

Ora possiamo ottenere tutti gli eventi che sono arrivati ​​dopo quel minuto da adesso - usando il metodo headMap () :

public ConcurrentNavigableMap getEventsOlderThatOneMinute() { return events.headMap(ZonedDateTime.now().minusMinutes(1)); }

Ciò restituirà un'istantanea immutabile di tutti gli eventi più vecchi di un minuto. Tutti i metodi precedenti appartengono alla classe EventWindowSort , che useremo nella sezione successiva.

3. Verifica della logica del flusso di ordinamento

Una volta implementata la nostra logica di ordinamento utilizzando ConcurrentSkipListMap, ora possiamo testarla creando due thread di scrittura che invieranno cento eventi ciascuno:

ExecutorService executorService = Executors.newFixedThreadPool(3); EventWindowSort eventWindowSort = new EventWindowSort(); int numberOfThreads = 2; Runnable producer = () -> IntStream .rangeClosed(0, 100) .forEach(index -> eventWindowSort.acceptEvent( new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString())) ); for (int i = 0; i < numberOfThreads; i++) { executorService.execute(producer); } 

Ogni thread sta invocando il metodo acceptEvent () , inviando gli eventi che hanno eventTime da ora a "ora meno cento secondi".

Nel frattempo, possiamo invocare il metodo getEventsFromLastMinute () che restituirà l'istantanea degli eventi che si trovano all'interno della finestra di un minuto:

ConcurrentNavigableMap eventsFromLastMinute = eventWindowSort.getEventsFromLastMinute();

Il numero di eventi in eventsFromLastMinute varierà in ogni esecuzione di test a seconda della velocità con cui i thread del produttore invieranno gli eventi a EventWindowSort. Possiamo affermare che non c'è un singolo evento nello snapshot restituito che sia più vecchio di un minuto:

long eventsOlderThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1))) .count(); assertEquals(eventsOlderThanOneMinute, 0);

E che ci sono più di zero eventi nell'istantanea all'interno della finestra di un minuto:

long eventYoungerThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1))) .count(); assertTrue(eventYoungerThanOneMinute > 0);

Il nostro getEventsFromLastMinute () utilizza tailMap () sottostante.

Proviamo ora getEventsOlderThatOneMinute () che utilizza il metodo headMap () da ConcurrentSkipListMap:

ConcurrentNavigableMap eventsFromLastMinute = eventWindowSort.getEventsOlderThatOneMinute();

Questa volta otteniamo un'istantanea degli eventi più vecchi di un minuto. Possiamo affermare che ci sono più di zero di tali eventi:

long eventsOlderThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1))) .count(); assertTrue(eventsOlderThanOneMinute > 0);

E poi, che non c'è un singolo evento che proviene dall'ultimo minuto:

long eventYoungerThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1))) .count(); assertEquals(eventYoungerThanOneMinute, 0);

La cosa più importante da notare è che possiamo acquisire l'istantanea dei dati mentre altri thread stanno ancora aggiungendo nuovi valori a ConcurrentSkipListMap.

4. Conclusione

In questo breve tutorial, abbiamo esaminato le basi di ConcurrentSkipListMap , insieme ad alcuni esempi pratici .

Abbiamo sfruttato le elevate prestazioni di ConcurrentSkipListMap per implementare un algoritmo non bloccante che può fornirci un'istantanea immutabile dei dati anche se allo stesso tempo più thread stanno aggiornando la mappa.

L'implementazione di tutti questi esempi e frammenti di codice può essere trovata nel progetto GitHub; questo è un progetto Maven, quindi dovrebbe essere facile da importare ed eseguire così com'è.