Panoramica di java.util.concurrent

1. Panoramica

Il pacchetto java.util.concurrent fornisce strumenti per la creazione di applicazioni simultanee.

In questo articolo faremo una panoramica dell'intero pacchetto.

2. Componenti principali

Il java.util.concurrent contiene troppi caratteristiche per discutere in un unico articolo. In questo articolo, ci concentreremo principalmente su alcune delle utilità più utili di questo pacchetto come:

  • Esecutore
  • ExecutorService
  • ScheduledExecutorService
  • Futuro
  • CountDownLatch
  • CyclicBarrier
  • Semaforo
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Serrature
  • Phaser

Puoi anche trovare molti articoli dedicati alle singole classi qui.

2.1. Esecutore

Executor è un'interfaccia che rappresenta un oggetto che esegue le attività fornite.

Dipende dalla particolare implementazione (da cui viene avviata la chiamata) se l'attività deve essere eseguita su un thread nuovo o corrente. Quindi, utilizzando questa interfaccia, possiamo separare il flusso di esecuzione dell'attività dal meccanismo di esecuzione dell'attività effettiva.

Un punto da notare qui è che Executor non richiede strettamente che l'esecuzione dell'attività sia asincrona. Nel caso più semplice, un esecutore può richiamare immediatamente l'attività inviata nel thread di richiamo.

Dobbiamo creare un invoker per creare l'istanza dell'esecutore:

public class Invoker implements Executor { @Override public void execute(Runnable r) { r.run(); } }

Ora possiamo usare questo invoker per eseguire l'attività.

public void execute() { Executor executor = new Invoker(); executor.execute( () -> { // task to be performed }); }

Il punto da notare qui è che se l'esecutore non può accettare l'attività per l'esecuzione, genererà RejectedExecutionException .

2.2. ExecutorService

ExecutorService è una soluzione completa per l'elaborazione asincrona. Gestisce una coda in memoria e pianifica le attività inviate in base alla disponibilità del thread.

Per utilizzare ExecutorService, dobbiamo creare una classe Runnable .

public class Task implements Runnable { @Override public void run() { // task details } }

Ora possiamo creare l' istanza ExecutorService e assegnare questa attività. Al momento della creazione, dobbiamo specificare la dimensione del pool di thread.

ExecutorService executor = Executors.newFixedThreadPool(10);

Se vogliamo creare un'istanza ExecutorService a thread singolo , possiamo usare newSingleThreadExecutor (ThreadFactory threadFactory) per creare l'istanza.

Una volta creato l'esecutore, possiamo usarlo per inviare l'attività.

public void execute() { executor.submit(new Task()); }

Possiamo anche creare l' istanza eseguibile durante l'invio dell'attività.

executor.submit(() -> { new Task(); });

Viene inoltre fornito con due metodi di terminazione dell'esecuzione predefiniti. Il primo è shutdown () ; attende che tutte le attività inviate finiscano di essere eseguite. L'altro metodo è shutdownNow () whic h termina immediatamente tutti i pendenti / compiti di esecuzione.

C'è anche un altro metodo awaitTermination (timeout lungo, unità TimeUnit) che si blocca forzatamente fino a quando tutte le attività non hanno completato l'esecuzione dopo che si è verificato un evento di arresto o si è verificato il timeout di esecuzione, o il thread di esecuzione stesso viene interrotto,

try { executor.awaitTermination( 20l, TimeUnit.NANOSECONDS ); } catch (InterruptedException e) { e.printStackTrace(); }

2.3. ScheduledExecutorService

ScheduledExecutorService è un'interfaccia simile a ExecutorService, ma può eseguire attività periodicamente.

I metodi di Executor ed ExecutorService sono programmati in loco senza introdurre alcun ritardo artificiale. Zero o qualsiasi valore negativo indica che la richiesta deve essere eseguita immediatamente.

Possiamo usare sia l' interfaccia Runnable che quella Callable per definire l'attività.

public void execute() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); Future future = executorService.schedule(() -> { // ... return "Hello world"; }, 1, TimeUnit.SECONDS); ScheduledFuture scheduledFuture = executorService.schedule(() -> { // ... }, 1, TimeUnit.SECONDS); executorService.shutdown(); }

ScheduledExecutorService può anche pianificare l'attività dopo un determinato ritardo fisso :

executorService.scheduleAtFixedRate(() -> { // ... }, 1, 10, TimeUnit.SECONDS); executorService.scheduleWithFixedDelay(() -> { // ... }, 1, 10, TimeUnit.SECONDS);

Qui, il metodo scheduleAtFixedRate (Runnable command, long initialDelay, long period, TimeUnit unit) crea ed esegue un'azione periodica che viene richiamata prima dopo il ritardo iniziale fornito e successivamente con il periodo specificato fino all'arresto dell'istanza del servizio.

The scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit ) method creates and executes a periodic action that is invoked firstly after the provided initial delay, and repeatedly with the given delay between the termination of the executing one and the invocation of the next one.

2.4. Future

Future is used to represent the result of an asynchronous operation. It comes with methods for checking if the asynchronous operation is completed or not, getting the computed result, etc.

What's more, the cancel(boolean mayInterruptIfRunning) API cancels the operation and releases the executing thread. If the value of mayInterruptIfRunning is true, the thread executing the task will be terminated instantly.

Otherwise, in-progress tasks will be allowed to complete.

We can use below code snippet to create a future instance:

public void invoke() { ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> { // ... Thread.sleep(10000l); return "Hello world"; }); }

We can use following code snippet to check if the future result is ready and fetch the data if the computation is done:

if (future.isDone() && !future.isCancelled()) { try { str = future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }

We can also specify a timeout for a given operation. If the task takes more than this time, a TimeoutException is thrown:

try { future.get(10, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); }

2.5. CountDownLatch

CountDownLatch (introduced in JDK 5) is a utility class which blocks a set of threads until some operation completes.

A CountDownLatch is initialized with a counter(Integer type); this counter decrements as the dependent threads complete execution. But once the counter reaches zero, other threads get released.

You can learn more about CountDownLatch here.

2.6. CyclicBarrier

CyclicBarrier works almost the same as CountDownLatch except that we can reuse it. Unlike CountDownLatch, it allows multiple threads to wait for each other using await() method(known as barrier condition) before invoking the final task.

We need to create a Runnable task instance to initiate the barrier condition:

public class Task implements Runnable { private CyclicBarrier barrier; public Task(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { LOG.info(Thread.currentThread().getName() + " is waiting"); barrier.await(); LOG.info(Thread.currentThread().getName() + " is released"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }

Now we can invoke some threads to race for the barrier condition:

public void start() { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { // ... LOG.info("All previous tasks are completed"); }); Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); if (!cyclicBarrier.isBroken()) { t1.start(); t2.start(); t3.start(); } }

Here, the isBroken() method checks if any of the threads got interrupted during the execution time. We should always perform this check before performing the actual process.

2.7. Semaphore

The Semaphore is used for blocking thread level access to some part of the physical or logical resource. A semaphore contains a set of permits; whenever a thread tries to enter the critical section, it needs to check the semaphore if a permit is available or not.

If a permit is not available (via tryAcquire()), the thread is not allowed to jump into the critical section; however, if the permit is available the access is granted, and the permit counter decreases.

Once the executing thread releases the critical section, again the permit counter increases (done by release() method).

We can specify a timeout for acquiring access by using the tryAcquire(long timeout, TimeUnit unit) method.

We can also check the number of available permits or the number of threads waiting to acquire the semaphore.

Following code snippet can be used to implement a semaphore:

static Semaphore semaphore = new Semaphore(10); public void execute() throws InterruptedException { LOG.info("Available permit : " + semaphore.availablePermits()); LOG.info("Number of threads waiting to acquire: " + semaphore.getQueueLength()); if (semaphore.tryAcquire()) { try { // ... } finally { semaphore.release(); } } }

We can implement a Mutex like data-structure using Semaphore. More details on this can be found here.

2.8. ThreadFactory

As the name suggests, ThreadFactory acts as a thread (non-existing) pool which creates a new thread on demand. It eliminates the need of a lot of boilerplate coding for implementing efficient thread creation mechanisms.

We can define a ThreadFactory:

public class BaeldungThreadFactory implements ThreadFactory { private int threadId; private String name; public BaeldungThreadFactory(String name) { threadId = 1; this.name = name; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, name + "-Thread_" + threadId); LOG.info("created new thread with id : " + threadId + " and name : " + t.getName()); threadId++; return t; } }

We can use this newThread(Runnable r) method to create a new thread at runtime:

BaeldungThreadFactory factory = new BaeldungThreadFactory( "BaeldungThreadFactory"); for (int i = 0; i < 10; i++) { Thread t = factory.newThread(new Task()); t.start(); }

2.9. BlockingQueue

In asynchronous programming, one of the most common integration patterns is the producer-consumer pattern. The java.util.concurrent package comes with a data-structure know as BlockingQueue – which can be very useful in these async scenarios.

More information and a working example on this is available here.

2.10. DelayQueue

DelayQueue is an infinite-size blocking queue of elements where an element can only be pulled if it's expiration time (known as user defined delay) is completed. Hence, the topmost element (head) will have the most amount delay and it will be polled last.

More information and a working example on this is available here.

2.11. Locks

Not surprisingly, Lock is a utility for blocking other threads from accessing a certain segment of code, apart from the thread that's executing it currently.

The main difference between a Lock and a Synchronized block is that synchronized block is fully contained in a method; however, we can have Lock API’s lock() and unlock() operation in separate methods.

More information and a working example on this is available here.

2.12. Phaser

Phaser è una soluzione più flessibile di CyclicBarrier e CountDownLatch , utilizzati per fungere da barriera riutilizzabile su cui il numero dinamico di thread deve attendere prima di continuare l'esecuzione. Possiamo coordinare più fasi di esecuzione, riutilizzando un'istanza Phaser per ogni fase del programma.

Ulteriori informazioni e un esempio funzionante su questo sono disponibili qui.

3. Conclusione

In questo articolo di panoramica di alto livello, ci siamo concentrati sulle diverse utilità disponibili del pacchetto java.util.concurrent .

Come sempre, il codice sorgente completo è disponibile su GitHub.