Guida a CompletableFuture

1. Introduzione

Questo tutorial è una guida alla funzionalità e ai casi d'uso della classe CompletableFuture che è stata introdotta come miglioramento dell'API di concorrenza di Java 8.

2. Calcolo asincrono in Java

Il calcolo asincrono è difficile da ragionare. Di solito vogliamo pensare a qualsiasi calcolo come una serie di passaggi, ma nel caso del calcolo asincrono, le azioni rappresentate come callback tendono a essere sparse nel codice o profondamente annidate l'una nell'altra . Le cose peggiorano ancora quando dobbiamo gestire gli errori che potrebbero verificarsi durante uno dei passaggi.

L' interfaccia Future è stata aggiunta in Java 5 come risultato di un calcolo asincrono, ma non disponeva di metodi per combinare questi calcoli o gestire possibili errori.

Java 8 ha introdotto la classe CompletableFuture . Insieme all'interfaccia Future , ha anche implementato l' interfaccia CompletionStage . Questa interfaccia definisce il contratto per un passaggio di calcolo asincrono che possiamo combinare con altri passaggi.

CompletableFuture è allo stesso tempo un elemento costitutivo e un framework, con circa 50 metodi diversi per la composizione, la combinazione e l'esecuzione di passaggi di calcolo asincroni e la gestione degli errori .

Un'API così grande può essere schiacciante, ma per lo più ricade in diversi casi d'uso chiari e distinti.

3. Usare CompletableFuture come un futuro semplice

Prima di tutto, la classe CompletableFuture implementa l' interfaccia Future , quindi possiamo usarla come un'implementazione Future , ma con logica di completamento aggiuntiva .

Ad esempio, possiamo creare un'istanza di questa classe con un costruttore senza argomenti per rappresentare un risultato futuro, distribuirlo ai consumatori e completarlo in un momento futuro utilizzando il metodo completo . I consumatori possono utilizzare il metodo get per bloccare il thread corrente finché non viene fornito questo risultato.

Nell'esempio riportato di seguito, abbiamo un metodo che crea un'istanza CompletableFuture , quindi esegue una rotazione di alcuni calcoli in un altro thread e restituisce immediatamente Future .

Quando il calcolo è terminato, il metodo completa il futuro fornendo il risultato al metodo completo :

public Future calculateAsync() throws InterruptedException { CompletableFuture completableFuture = new CompletableFuture(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture; }

Per scorporare il calcolo, utilizziamo l' API Executor . Questo metodo di creazione e completamento di un CompletableFuture può essere utilizzato insieme a qualsiasi meccanismo di concorrenza o API, inclusi i thread non elaborati.

Si noti che il calculateAsync metodo restituisce un futuro istanza .

Chiamiamo semplicemente il metodo, riceviamo l' istanza Future e chiamiamo il metodo get su di essa quando siamo pronti per bloccare il risultato.

Osserva anche che il metodo get genera alcune eccezioni controllate, vale a dire ExecutionException (che incapsula un'eccezione che si è verificata durante un calcolo) e InterructedException (un'eccezione che indica che un thread che esegue un metodo è stato interrotto):

Future completableFuture = calculateAsync(); // ... String result = completableFuture.get(); assertEquals("Hello", result);

Se conosciamo già il risultato di un calcolo , possiamo utilizzare il metodo static completedFuture con un argomento che rappresenta un risultato di questo calcolo. Di conseguenza, il metodo get del Future non si bloccherà mai, restituendo invece immediatamente questo risultato:

Future completableFuture = CompletableFuture.completedFuture("Hello"); // ... String result = completableFuture.get(); assertEquals("Hello", result);

In alternativa, potremmo voler annullare l'esecuzione di un Future .

4. CompletableFuture con logica di calcolo incapsulata

Il codice sopra ci consente di scegliere qualsiasi meccanismo di esecuzione simultanea, ma cosa succede se vogliamo saltare questo boilerplate ed eseguire semplicemente del codice in modo asincrono?

I metodi statici runAsync e supplyAsync ci consentono di creare un'istanza CompletableFuture dai tipi funzionali Runnable e Supplier corrispondentemente.

Sia Runnable che Supplier sono interfacce funzionali che consentono di passare le loro istanze come espressioni lambda grazie alla nuova funzionalità Java 8.

L' interfaccia Runnable è la stessa vecchia interfaccia utilizzata nei thread e non consente di restituire un valore.

L' interfaccia del fornitore è un'interfaccia funzionale generica con un unico metodo che non ha argomenti e restituisce un valore di un tipo parametrizzato.

Ciò ci consente di fornire un'istanza del fornitore come espressione lambda che esegue il calcolo e restituisce il risultato . È semplice come:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello"); // ... assertEquals("Hello", future.get());

5. Elaborazione dei risultati di calcoli asincroni

Il modo più generico per elaborare il risultato di un calcolo è fornirlo a una funzione. Il metodo thenApply fa esattamente questo; accetta un'istanza di Function , la utilizza per elaborare il risultato e restituisce un Future che contiene un valore restituito da una funzione:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApply(s -> s + " World"); assertEquals("Hello World", future.get());

Se non è necessario restituire un valore lungo la catena del futuro , è possibile utilizzare un'istanza dell'interfaccia funzionale del consumatore . Il suo unico metodo accetta un parametro e restituisce void .

C'è un metodo per questo caso d'uso nel CompletableFuture. Il metodo thenAccept riceve un Consumer e gli passa il risultato del calcolo. Quindi l'ultima chiamata future.get () restituisce un'istanza di tipo Void :

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();

Infine, se non abbiamo bisogno del valore del calcolo, né vogliamo restituire un valore alla fine della catena, possiamo passare un Lambda Runnable al metodo thenRun . Nell'esempio seguente, stampiamo semplicemente una riga nella console dopo aver chiamato future.get ():

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenRun(() -> System.out.println("Computation finished.")); future.get();

6. Combinazione di futures

The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.

The result of this chaining is itself a CompletableFuture that allows further chaining and combining. This approach is ubiquitous in functional languages and is often referred to as a monadic design pattern.

In the following example we use the thenCompose method to chain two Futures sequentially.

Notice that this method takes a function that returns a CompletableFuture instance. The argument of this function is the result of the previous computation step. This allows us to use this value inside the next CompletableFuture‘s lambda:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", completableFuture.get());

The thenCompose method, together with thenApply, implement basic building blocks of the monadic pattern. They closely relate to the map and flatMap methods of Stream and Optional classes also available in Java 8.

Both methods receive a function and apply it to the computation result, but the thenCompose (flatMap) method receives a function that returns another object of the same type. This functional structure allows composing the instances of these classes as building blocks.

If we want to execute two independent Futures and do something with their results, we can use the thenCombine method that accepts a Future and a Function with two arguments to process both results:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync( () -> " World"), (s1, s2) -> s1 + s2)); assertEquals("Hello World", completableFuture.get());

A simpler case is when we want to do something with two Futures‘ results, but don't need to pass any resulting value down a Future chain. The thenAcceptBoth method is there to help:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));

7. Difference Between thenApply() and thenCompose()

In our previous sections, we've shown examples regarding thenApply() and thenCompose(). Both APIs help chain different CompletableFuture calls, but the usage of these 2 functions is different.

7.1. thenApply()

We can use this method to work with a result of the previous call. However, a key point to remember is that the return type will be combined of all calls.

So this method is useful when we want to transform the result of a CompletableFuture call:

CompletableFuture finalResult = compute().thenApply(s-> s + 1);

7.2. thenCompose()

The thenCompose() method is similar to thenApply() in that both return a new Completion Stage. However, thenCompose() uses the previous stage as the argument. It will flatten and return a Future with the result directly, rather than a nested future as we observed in thenApply():

CompletableFuture computeAnother(Integer i){ return CompletableFuture.supplyAsync(() -> 10 + i); } CompletableFuture finalResult = compute().thenCompose(this::computeAnother);

So if the idea is to chain CompletableFuture methods then it’s better to use thenCompose().

Also, note that the difference between these two methods is analogous to the difference between map() and flatMap().

8. Running Multiple Futures in Parallel

When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.

The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:

CompletableFuture future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2, future3); // ... combinedFuture.get(); assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone());

Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture. The limitation of this method is that it does not return the combined results of all Futures. Instead, we have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:

String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); assertEquals("Hello Beautiful World", combined);

The CompletableFuture.join() method is similar to the get method, but it throws an unchecked exception in case the Future does not complete normally. This makes it possible to use it as a method reference in the Stream.map() method.

9. Handling Errors

For error handling in a chain of asynchronous computation steps, we have to adapt the throw/catch idiom in a similar fashion.

Instead of catching an exception in a syntactic block, the CompletableFuture class allows us to handle it in a special handle method. This method receives two parameters: a result of a computation (if it finished successfully), and the exception thrown (if some computation step did not complete normally).

In the following example, we use the handle method to provide a default value when the asynchronous computation of a greeting was finished with an error because no name was provided:

String name = null; // ... CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!"); assertEquals("Hello, Stranger!", completableFuture.get());

As an alternative scenario, suppose we want to manually complete the Future with a value, as in the first example, but also have the ability to complete it with an exception. The completeExceptionally method is intended for just that. The completableFuture.get() method in the following example throws an ExecutionException with a RuntimeException as its cause:

CompletableFuture completableFuture = new CompletableFuture(); // ... completableFuture.completeExceptionally( new RuntimeException("Calculation failed!")); // ... completableFuture.get(); // ExecutionException

In the example above, we could have handled the exception with the handle method asynchronously, but with the get method we can use the more typical approach of a synchronous exception processing.

10. Async Methods

Most methods of the fluent API in CompletableFuture class have two additional variants with the Async postfix. These methods are usually intended for running a corresponding step of execution in another thread.

The methods without the Async postfix run the next execution stage using a calling thread. In contrast, the Async method without the Executor argument runs a step using the common fork/join pool implementation of Executor that is accessed with the ForkJoinPool.commonPool() method. Finally, the Async method with an Executor argument runs a step using the passed Executor.

Here's a modified example that processes the result of a computation with a Function instance. The only visible difference is the thenApplyAsync method, but under the hood the application of a function is wrapped into a ForkJoinTask instance (for more information on the fork/join framework, see the article “Guide to the Fork/Join Framework in Java”). This allows us to parallelize our computation even more and use system resources more efficiently:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApplyAsync(s -> s + " World"); assertEquals("Hello World", future.get());

11. JDK 9 CompletableFuture API

Java 9 enhances the CompletableFuture API with the following changes:

  • New factory methods added
  • Support for delays and timeouts
  • Improved support for subclassing

and new instance APIs:

  • Executor defaultExecutor()
  • CompletableFuture newIncompleteFuture()
  • CompletableFuture copy()
  • CompletionStage minimalCompletionStage()
  • CompletableFuture completeAsync(Supplier supplier, Executor executor)
  • CompletableFuture completeAsync(Supplier supplier)
  • CompletableFuture orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)

We also now have a few static utility methods:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • CompletionStage completedStage(U value)
  • CompletionStage failedStage(Throwable ex)
  • CompletableFuture failedFuture(Throwable ex)

Finally, to address timeout, Java 9 has introduced two more new functions:

  • orTimeout()
  • completeOnTimeout()

Here's the detailed article for further reading: Java 9 CompletableFuture API Improvements.

12. Conclusion

In questo articolo, abbiamo descritto i metodi e i casi d'uso tipici della classe CompletableFuture .

Il codice sorgente dell'articolo è disponibile su GitHub.