Guida a DelayQueue

1. Panoramica

In questo articolo, esamineremo il costrutto DelayQueue dal pacchetto java.util.concurrent . Questa è una coda di blocco che potrebbe essere utilizzata nei programmi produttore-consumatore.

Ha una caratteristica molto utile: quando il consumatore vuole prendere un elemento dalla coda, può prenderlo solo quando il ritardo per quel particolare elemento è scaduto.

2. Implementazione del ritardo per gli elementi in DelayQueue

Ogni elemento che vogliamo inserire in DelayQueue deve implementare l' interfaccia Delayed . Diciamo che vogliamo creare una classe DelayObject . Le istanze di quella classe verranno inserite in DelayQueue.

Passeremo le stringhe di dati e delayInMilliseconds come e argomenti al suo costruttore:

public class DelayObject implements Delayed { private String data; private long startTime; public DelayObject(String data, long delayInMilliseconds) { this.data = data; this.startTime = System.currentTimeMillis() + delayInMilliseconds; }

Stiamo definendo un startTime - questo è un momento in cui l'elemento dovrebbe essere consumato dalla coda. Successivamente, dobbiamo implementare il metodo getDelay () : dovrebbe restituire il ritardo rimanente associato a questo oggetto nell'unità di tempo specificata.

Pertanto, è necessario utilizzare il metodo TimeUnit.convert () per restituire il ritardo rimanente nella TimeUnit corretta :

@Override public long getDelay(TimeUnit unit) { long diff = startTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); }

Quando il consumatore cerca di prendere un elemento dalla coda, DelayQueue eseguirà getDelay () per scoprire se quell'elemento può essere restituito dalla coda. Se il metodo getDelay () restituirà zero o un numero negativo, significa che potrebbe essere recuperato dalla coda.

Dobbiamo anche implementare il metodo compareTo () , perché gli elementi in DelayQueue verranno ordinati in base alla scadenza. L'elemento che scadrà per primo viene mantenuto in testa alla coda e l'elemento con il tempo di scadenza più alto viene mantenuto in coda:

@Override public int compareTo(Delayed o) { return Ints.saturatedCast( this.startTime - ((DelayObject) o).startTime); }

3. DelayQueue C onsumer e produttore

Per poter testare la nostra DelayQueue, dobbiamo implementare la logica del produttore e del consumatore. La classe producer prende come argomenti la coda, il numero di elementi da produrre e il ritardo di ogni messaggio in millisecondi.

Quindi, quando viene richiamato il metodo run () , inserisce gli elementi nella coda e rimane inattivo per 500 millisecondi dopo ogni inserimento:

public class DelayQueueProducer implements Runnable { private BlockingQueue queue; private Integer numberOfElementsToProduce; private Integer delayOfEachProducedMessageMilliseconds; // standard constructor @Override public void run() { for (int i = 0; i < numberOfElementsToProduce; i++) { DelayObject object = new DelayObject( UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds); System.out.println("Put object: " + object); try { queue.put(object); Thread.sleep(500); } catch (InterruptedException ie) { ie.printStackTrace(); } } } }

L'implementazione consumer è molto simile, ma tiene traccia anche del numero di messaggi consumati:

public class DelayQueueConsumer implements Runnable { private BlockingQueue queue; private Integer numberOfElementsToTake; public AtomicInteger numberOfConsumedElements = new AtomicInteger(); // standard constructors @Override public void run() { for (int i = 0; i < numberOfElementsToTake; i++) { try { DelayObject object = queue.take(); numberOfConsumedElements.incrementAndGet(); System.out.println("Consumer take: " + object); } catch (InterruptedException e) { e.printStackTrace(); } } } }

4. Test di utilizzo di DelayQueue

Per testare il comportamento di DelayQueue, creeremo un thread produttore e un thread consumatore.

Il produttore metterà () due oggetti sulla coda con un ritardo di 500 millisecondi. Il test afferma che il consumatore ha consumato due messaggi:

@Test public void givenDelayQueue_whenProduceElement _thenShouldConsumeAfterGivenDelay() throws InterruptedException { // given ExecutorService executor = Executors.newFixedThreadPool(2); BlockingQueue queue = new DelayQueue(); int numberOfElementsToProduce = 2; int delayOfEachProducedMessageMilliseconds = 500; DelayQueueConsumer consumer = new DelayQueueConsumer( queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds); // when executor.submit(producer); executor.submit(consumer); // then executor.awaitTermination(5, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce); }

Possiamo osservare che l'esecuzione di questo programma produrrà il seguente output:

Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007} Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007} Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512} Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}

Il produttore inserisce l'oggetto e dopo un po 'viene consumato il primo oggetto per il quale è scaduto il ritardo.

La stessa situazione si è verificata per il secondo elemento.

5. Consumatore non in grado di consumare nel tempo dato

Diciamo che abbiamo un produttore che sta producendo un elemento che scadrà tra 10 secondi :

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = 10_000; DelayQueueConsumer consumer = new DelayQueueConsumer( queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

Inizieremo il nostro test, ma terminerà dopo 5 secondi. A causa delle caratteristiche del DelayQueue, il consumatore non sarà in grado di consumare il messaggio dalla coda perché l'elemento non è ancora scaduto:

executor.submit(producer); executor.submit(consumer); executor.awaitTermination(5, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), 0);

Notare che numberOfConsumedElements del consumatore ha un valore uguale a zero.

6. Produzione di un elemento con scadenza immediata

Quando le implementazioni del metodo getDelay () del messaggio ritardato restituiscono un numero negativo, significa che l'elemento specificato è già scaduto. In questa situazione, il produttore consumerà immediatamente quell'elemento.

Possiamo testare la situazione di produrre un elemento con ritardo negativo:

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = -10_000; DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

Quando iniziamo il test case, il consumatore consumerà l'elemento immediatamente perché è già scaduto:

executor.submit(producer); executor.submit(consumer); executor.awaitTermination(1, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), 1);

7. Conclusione

In questo articolo, abbiamo esaminato il costrutto DelayQueue dal pacchetto java.util.concurrent .

Abbiamo implementato un elemento ritardato che è stato prodotto e consumato dalla coda.

Abbiamo sfruttato la nostra implementazione di DelayQueue per consumare elementi scaduti.

L'implementazione di tutti questi esempi e frammenti di codice può essere trovata nel progetto GitHub, che è un progetto Maven, quindi dovrebbe essere facile da importare ed eseguire così com'è.