Introduzione ai pool di thread in Java

1. Introduzione

Questo articolo è uno sguardo ai pool di thread in Java, a partire dalle diverse implementazioni nella libreria Java standard e poi guardando la libreria Guava di Google.

2. Il pool di thread

In Java, i thread vengono mappati ai thread a livello di sistema che sono le risorse del sistema operativo. Se crei thread in modo incontrollabile, potresti esaurire rapidamente queste risorse.

Anche il cambio di contesto tra i thread viene eseguito dal sistema operativo, al fine di emulare il parallelismo. Una visione semplicistica è che: più thread vengono generati, meno tempo ogni thread trascorre a svolgere il lavoro effettivo.

Il pattern Thread Pool aiuta a risparmiare risorse in un'applicazione multithread e anche a contenere il parallelismo in determinati limiti predefiniti.

Quando si utilizza un pool di thread, si scrive il codice simultaneo sotto forma di attività parallele e le si invia per l'esecuzione a un'istanza di un pool di thread . Questa istanza controlla diversi thread riutilizzati per eseguire queste attività.

Il modello consente di controllare il numero di thread che l'applicazione sta creando , il loro ciclo di vita, nonché di pianificare l'esecuzione delle attività e mantenere le attività in entrata in una coda.

3. Pool di thread in Java

3.1. Executors , Executor e ExecutorService

La classe helper Executors contiene diversi metodi per la creazione di istanze di pool di thread preconfigurate. Queste classi sono un buon punto di partenza: usale se non è necessario applicare alcuna regolazione fine personalizzata.

Le interfacce Executor ed ExecutorService vengono utilizzate per lavorare con diverse implementazioni di pool di thread in Java. Di solito, è necessario mantenere il codice separato dall'attuale implementazione del pool di thread e utilizzare queste interfacce in tutta l'applicazione.

L' esecutore interfaccia ha un unico eseguire metodo per inviare Runnable le istanze per l'esecuzione.

Di seguito è riportato un rapido esempio di come utilizzare l' API Executors per acquisire un'istanza Executor supportata da un singolo pool di thread e una coda illimitata per eseguire le attività in sequenza. Qui, eseguiamo una singola operazione che stampa semplicemente " Hello World " sullo schermo. L'attività viene inviata come lambda (una funzionalità Java 8) che si deduce come eseguibile .

Executor executor = Executors.newSingleThreadExecutor(); executor.execute(() -> System.out.println("Hello World"));

L' interfaccia ExecutorService contiene un gran numero di metodi per controllare lo stato di avanzamento delle attività e gestire la cessazione del servizio . Utilizzando questa interfaccia, è possibile inviare le attività per l'esecuzione e anche controllare la loro esecuzione utilizzando l' istanza Future restituita .

Nell'esempio seguente , creiamo un ExecutorService , inviare un compito e quindi utilizzare il restituita Futuro 's get metodo per attendere che il compito presentato è finito e il valore viene restituito:

ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> "Hello World"); // some operations String result = future.get();

Ovviamente, in uno scenario di vita reale, di solito non si desidera chiamare subito future.get () , ma rimandare la chiamata finché non si ha effettivamente bisogno del valore del calcolo.

Il metodo submit viene sovraccaricato per accettare Runnable o Callable, entrambe interfacce funzionali e possono essere passati come lambda (a partire da Java 8).

Il metodo singolo di Runnable non genera un'eccezione e non restituisce un valore. L' interfaccia Callable può essere più comoda, poiché ci consente di generare un'eccezione e restituire un valore.

Infine, per consentire al compilatore di dedurre il tipo Callable , restituisci semplicemente un valore da lambda.

Per ulteriori esempi di utilizzo dell'interfaccia ExecutorService e dei futures, consultare "A Guide to the Java ExecutorService".

3.2. ThreadPoolExecutor

Il ThreadPoolExecutor è un'implementazione estensibile piscina filo con molti parametri e ganci per la messa a punto.

I principali parametri di configurazione che discuteremo qui sono: corePoolSize , maximumPoolSize e keepAliveTime .

Il pool è costituito da un numero fisso di thread principali che vengono mantenuti all'interno tutto il tempo e da alcuni thread in eccesso che possono essere generati e quindi terminati quando non sono più necessari. Il parametro corePoolSize è il numero di thread principali di cui verrà creata un'istanza e conservati nel pool. Quando arriva una nuova attività, se tutti i thread principali sono occupati e la coda interna è piena, il pool può crescere fino al massimoPoolSize .

Il parametro keepAliveTime è l'intervallo di tempo per il quale i thread in eccesso (istanziati in eccesso rispetto a corePoolSize ) possono esistere nello stato inattivo. Per impostazione predefinita, ThreadPoolExecutor considera solo i thread non core per la rimozione. Per applicare la stessa politica di rimozione ai thread principali, possiamo utilizzare il metodo allowCoreThreadTimeOut (true) .

Questi parametri coprono un'ampia gamma di casi d'uso, ma le configurazioni più tipiche sono predefinite nei metodi statici di Executors .

Ad esempio , il metodo newFixedThreadPool crea un ThreadPoolExecutor con valori dei parametri corePoolSize e maximumPoolSize uguali e un keepAliveTime zero . Ciò significa che il numero di thread in questo pool di thread è sempre lo stesso:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(2, executor.getPoolSize()); assertEquals(1, executor.getQueue().size());

Nell'esempio precedente creiamo un'istanza di ThreadPoolExecutor con un conteggio thread fisso di 2. Ciò significa che se il numero di attività in esecuzione simultanea è sempre inferiore o uguale a due, vengono eseguite immediatamente. Altrimenti, alcune di queste attività potrebbero essere messe in coda in attesa del loro turno .

Abbiamo creato tre attività richiamabili che imitano il lavoro pesante dormendo per 1000 millisecondi. Le prime due attività verranno eseguite contemporaneamente e la terza dovrà attendere in coda. Possiamo verificarlo chiamando i metodi getPoolSize () e getQueue (). Size () immediatamente dopo aver inviato le attività.

Un altro ThreadPoolExecutor preconfigurato può essere creato con il metodo Executors.newCachedThreadPool () . Questo metodo non riceve affatto un numero di thread. Il corePoolSize è effettivamente impostato a 0, e il maximumPoolSize è impostato Integer.MAX_VALUE per questo esempio. Il KeepAliveTime è di 60 secondi per questo.

Questi valori di parametro indicano che il pool di thread memorizzato nella cache può crescere senza limiti per adattarsi a qualsiasi numero di attività inoltrate . Ma quando i thread non saranno più necessari, verranno eliminati dopo 60 secondi di inattività. Un tipico caso d'uso è quando nella tua applicazione sono presenti molte attività di breve durata.

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(3, executor.getPoolSize()); assertEquals(0, executor.getQueue().size());

La dimensione della coda nell'esempio precedente sarà sempre zero perché internamente viene utilizzata un'istanza SynchronousQueue . In una SynchronousQueue , le coppie di operazioni di inserimento e rimozione avvengono sempre contemporaneamente, quindi la coda non contiene mai nulla.

The Executors.newSingleThreadExecutor() API creates another typical form of ThreadPoolExecutor containing a single thread. The single thread executor is ideal for creating an event loop. The corePoolSize and maximumPoolSize parameters are equal to 1, and the keepAliveTime is zero.

Tasks in the above example will be executed sequentially, so the flag value will be 2 after the task's completion:

AtomicInteger counter = new AtomicInteger(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { counter.set(1); }); executor.submit(() -> { counter.compareAndSet(1, 2); });

Additionally, this ThreadPoolExecutor is decorated with an immutable wrapper, so it cannot be reconfigured after creation. Note that also this is the reason we cannot cast it to a ThreadPoolExecutor.

3.3. ScheduledThreadPoolExecutor

The ScheduledThreadPoolExecutor extends the ThreadPoolExecutor class and also implements the ScheduledExecutorService interface with several additional methods:

  • schedule method allows to execute a task once after a specified delay;
  • scheduleAtFixedRate method allows to execute a task after a specified initial delay and then execute it repeatedly with a certain period; the period argument is the time measured between the starting times of the tasks, so the execution rate is fixed;
  • scheduleWithFixedDelay method is similar to scheduleAtFixedRate in that it repeatedly executes the given task, but the specified delay is measured between the end of the previous task and the start of the next; the execution rate may vary depending on the time it takes to execute any given task.

The Executors.newScheduledThreadPool() method is typically used to create a ScheduledThreadPoolExecutor with a given corePoolSize, unbounded maximumPoolSize and zero keepAliveTime. Here's how to schedule a task for execution in 500 milliseconds:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); executor.schedule(() -> { System.out.println("Hello World"); }, 500, TimeUnit.MILLISECONDS);

The following code shows how to execute a task after 500 milliseconds delay and then repeat it every 100 milliseconds. After scheduling the task, we wait until it fires three times using the CountDownLatch lock, then cancel it using the Future.cancel() method.

CountDownLatch lock = new CountDownLatch(3); ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); ScheduledFuture future = executor.scheduleAtFixedRate(() -> { System.out.println("Hello World"); lock.countDown(); }, 500, 100, TimeUnit.MILLISECONDS); lock.await(1000, TimeUnit.MILLISECONDS); future.cancel(true);

3.4. ForkJoinPool

ForkJoinPool is the central part of the fork/join framework introduced in Java 7. It solves a common problem of spawning multiple tasks in recursive algorithms. Using a simple ThreadPoolExecutor, you will run out of threads quickly, as every task or subtask requires its own thread to run.

In a fork/join framework, any task can spawn (fork) a number of subtasks and wait for their completion using the join method. The benefit of the fork/join framework is that it does not create a new thread for each task or subtask, implementing the Work Stealing algorithm instead. This framework is thoroughly described in the article “Guide to the Fork/Join Framework in Java”

Let’s look at a simple example of using ForkJoinPool to traverse a tree of nodes and calculate the sum of all leaf values. Here’s a simple implementation of a tree consisting of a node, an int value and a set of child nodes:

static class TreeNode { int value; Set children; TreeNode(int value, TreeNode... children) { this.value = value; this.children = Sets.newHashSet(children); } }

Now if we want to sum all values in a tree in parallel, we need to implement a RecursiveTask interface. Each task receives its own node and adds its value to the sum of values of its children. To calculate the sum of children values, task implementation does the following:

  • streams the children set,
  • maps over this stream, creating a new CountingTask for each element,
  • executes each subtask by forking it,
  • collects the results by calling the join method on each forked task,
  • sums the results using the Collectors.summingInt collector.
public static class CountingTask extends RecursiveTask { private final TreeNode node; public CountingTask(TreeNode node) { this.node = node; } @Override protected Integer compute() { return node.value + node.children.stream() .map(childNode -> new CountingTask(childNode).fork()) .collect(Collectors.summingInt(ForkJoinTask::join)); } }

The code to run the calculation on an actual tree is very simple:

TreeNode tree = new TreeNode(5, new TreeNode(3), new TreeNode(2, new TreeNode(2), new TreeNode(8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); int sum = forkJoinPool.invoke(new CountingTask(tree));

4. Thread Pool's Implementation in Guava

Guava is a popular Google library of utilities. It has many useful concurrency classes, including several handy implementations of ExecutorService. The implementing classes are not accessible for direct instantiation or subclassing, so the only entry point for creating their instances is the MoreExecutors helper class.

4.1. Adding Guava as a Maven Dependency

Add the following dependency to your Maven pom file to include the Guava library to your project. You can find the latest version of Guava library in the Maven Central repository:

 com.google.guava guava 19.0 

4.2. Direct Executor and Direct Executor Service

Sometimes you want to execute the task either in the current thread or in a thread pool, depending on some conditions. You would prefer to use a single Executor interface and just switch the implementation. Although it is not so hard to come up with an implementation of Executor or ExecutorService that executes the tasks in the current thread, it still requires writing some boilerplate code.

Gladly, Guava provides predefined instances for us.

Here's an example that demonstrates the execution of a task in the same thread. Although the provided task sleeps for 500 milliseconds, it blocks the current thread, and the result is available immediately after the execute call is finished:

Executor executor = MoreExecutors.directExecutor(); AtomicBoolean executed = new AtomicBoolean(); executor.execute(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } executed.set(true); }); assertTrue(executed.get());

The instance returned by the directExecutor() method is actually a static singleton, so using this method does not provide any overhead on object creation at all.

You should prefer this method to the MoreExecutors.newDirectExecutorService() because that API creates a full-fledged executor service implementation on every call.

4.3. Exiting Executor Services

Another common problem is shutting down the virtual machine while a thread pool is still running its tasks. Even with a cancellation mechanism in place, there is no guarantee that the tasks will behave nicely and stop their work when the executor service shuts down. This may cause JVM to hang indefinitely while the tasks keep doing their work.

To solve this problem, Guava introduces a family of exiting executor services. They are based on daemon threads that terminate together with the JVM.

These services also add a shutdown hook with the Runtime.getRuntime().addShutdownHook() method and prevent the VM from terminating for a configured amount of time before giving up on hung tasks.

In the following example, we're submitting the task that contains an infinite loop, but we use an exiting executor service with a configured time of 100 milliseconds to wait for the tasks upon VM termination. Without the exitingExecutorService in place, this task would cause the VM to hang indefinitely:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); ExecutorService executorService = MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS); executorService.submit(() -> { while (true) { } });

4.4. Listening Decorators

Listening decorators allow you to wrap the ExecutorService and receive ListenableFuture instances upon task submission instead of simple Future instances. The ListenableFuture interface extends Future and has a single additional method addListener. This method allows adding a listener that is called upon future completion.

Raramente vorrai usare direttamente il metodo ListenableFuture.addListener () , ma è essenziale per la maggior parte dei metodi di supporto nella classe di utilità Futures . Ad esempio, con il metodo Futures.allAsList () puoi combinare diverse istanze di ListenableFuture in un unico ListenableFuture che si completa al completamento con successo di tutti i futures combinati:

ExecutorService executorService = Executors.newCachedThreadPool(); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService); ListenableFuture future1 = listeningExecutorService.submit(() -> "Hello"); ListenableFuture future2 = listeningExecutorService.submit(() -> "World"); String greeting = Futures.allAsList(future1, future2).get() .stream() .collect(Collectors.joining(" ")); assertEquals("Hello World", greeting);

5. conclusione

In questo articolo, abbiamo discusso il pattern Thread Pool e le sue implementazioni nella libreria Java standard e nella libreria Guava di Google.

Il codice sorgente dell'articolo è disponibile su GitHub.