Introduction

As mentioned previously, I decided to expose the async version of the pac4j API via the CompletableFuture class, using it to wrap values which will be determined in the future.

As this work progressed, I learned that some of my default assumptions regarding error propagation in CompletableFuture (strictly, CompletionStage) pipelines were incorrect. I decided to document these, along with some exception-handling patterns, but realised that to do so I first needed to write an introduction to CompletableFuture and the CompletionStage API it implements. This article is that basic (and quick) introduction. It will lead into subsequent articles on error propagation in CompletionStage pipelines and on approaches to handling exceptions raised during pipeline processing.

The CompletionStage interface and CompletableFuture implementation

CompletableFuture implements the CompletionStage interface. A CompletionStage represents a computation which can be a stage in a larger computation (hence the name). This means that, given a CompletionStage which will evaluate to a result, a computational pipeline can be assembled by applying further operations to that result.

To make this a little more concrete, let’s talk specifically about CompletableFutures. The CompletableFuture class is a CompletionStage implementation representing an asynchronous computation which will complete and provide a value at some unspecified time. For those who have encountered promises or Scala Futures, for example, this will sound quite familiar.

CompletionStages offer methods for taking one CompletionStage and transforming it into another CompletionStage. Where the CompletionStage is a CompletableFuture, the CompletionStage returned by the transformation will be another CompletableFuture. Therefore, while you may not know when a CompletableFuture will be completed with a value, you can use CompletionStage methods to define the subsequent processing to be performed on that value. Some of these methods are analogues (with different names) of methods available on the Java 8 Streams API or on classes such as Optional, and we will draw comparisons with some of these.

The thenApply Method

A convenient example of this is the thenApply method, which is analogous to map on the Streams API (or on the Optional class, for example). The contract of thenApply is that it returns a new CompletionStage whose result is the result of the original CompletionStage after the function supplied to thenApply has been applied to it. This new CompletionStage is completed once the function completes (either successfully or exceptionally). Initially we will restrict the discussion, for clarity, to successful completion only, and we will return to exceptional behaviours subsequently.

In the context of the CompletableFuture class you can say "given future A which will evaluate successfully to value a, I want to convert it to a future B which will evaluate to value b by transforming the result using thenApply to apply a function which converts value a to value b".

A simple demonstration of this is the following JUnit test method (we’ll reuse delayedValueSupplier in subsequent discussions):-

thenApply.java
// We'll allow a 2-sized thread pool for async execution, rather than using the fork/join pool for asynchronous
// value generation
final Executor executor = Executors.newFixedThreadPool(2);


@Test
public void demoThenApply() throws Exception {
    final CompletableFuture<Integer> future = CompletableFuture
        .supplyAsync(delayedValueSupplier(1), executor) // (1)
        .thenApply(i -> i + 3);
    assertThat(future.get(), is(4));
}

private Supplier<Integer> delayedValueSupplier(final int value) {
    return delayedValueSupplier(value, 1000);
}

private Supplier<Integer> delayedValueSupplier(final int value, final int delayMs) {
    return () -> {
        try {
            Thread.sleep(delayMs);
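        } catch (InterruptedException e) {
            // Restore the interrupt flag and keep the original exception as the cause
            Thread.currentThread().interrupt();
            throw new RuntimeException("Problem while waiting to return value", e);
        }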
        return value;
    };
}
(1) supplyAsync is a simple factory method for CompletableFutures and enables us to define a delayed result via the delayedValueSupplier methods

This is shown schematically below:-

CFthenApplySuccess
Figure 1. thenApply transforms similar to map

Figure 1 also lets me introduce some of the diagrammatic notation I’ll be using in this and subsequent articles.

The blue square boxes represent CompletionStages (or for the purposes of our discussion CompletableFutures).

A green box inside a CompletionStage containing a white letter means that that stage will complete successfully yielding a value represented by the letter.

A blue arrow represents an application of a function which will complete successfully, via thenApply, and the f: a → b represents that the function applied will convert the value a to value b.

We can therefore see that, for successful completion, thenApply behaves similarly to the map method on a non-empty Optional. If you simply change the meaning of the blue square box to "an Optional", green to "containing the specified value" and the blue arrow to "map", using the same function, assuming successful completion, the diagram is identical.
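
To make the comparison concrete, here is a minimal sketch that applies the same function through Optional.map and through thenApply. The class name MapVersusThenApply and the helper names are made up for this illustration, and for brevity it uses the default asynchronous execution facility rather than the two-thread executor from the test above.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical demo class: the names here are illustrative only.
public class MapVersusThenApply {

    // The same function f: a -> b is used for both containers.
    static final Function<Integer, Integer> ADD_THREE = i -> i + 3;

    // map transforms the value inside a non-empty Optional.
    static Optional<Integer> viaOptional(int a) {
        return Optional.of(a).map(ADD_THREE);
    }

    // thenApply transforms the value a future will (successfully) complete with.
    static CompletableFuture<Integer> viaFuture(int a) {
        return CompletableFuture.supplyAsync(() -> a).thenApply(ADD_THREE);
    }

    public static void main(String[] args) {
        System.out.println(viaOptional(1).get()); // prints 4
        System.out.println(viaFuture(1).join());  // prints 4
    }
}
```

The only difference visible to the caller is when the value becomes available: the Optional holds it immediately, whereas the future has to be waited on (here with join).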

It is worth noting at this point that the cyclops-react library offers the FutureW class which offers the more usual map/flatMap semantics for transformations.

Finally, thenApply (like most other then… methods) has asynchronous variants, which cause the transformation to be executed asynchronously, either on the default asynchronous execution facility (the fork/join common pool) or on an executor specified by the caller. While the specifics of the threading model vary (i.e. which thread runs the transformation), the essential semantics are the same.
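
As a rough sketch of those variants, the following uses thenApplyAsync twice: once with no executor argument and once with a caller-supplied executor. The class and method names are invented for this example, and the executor is created and shut down locally purely to keep the snippet self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: the names here are illustrative only.
public class ThenApplyAsyncSketch {

    static int addThreeAsync(int start) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture
                .supplyAsync(() -> start, executor)
                // No executor given: runs on the default asynchronous facility
                .thenApplyAsync(i -> i + 1)
                // Executor given: runs on a thread from the supplied pool
                .thenApplyAsync(i -> i + 2, executor)
                .join();
        } finally {
            // Release the pool threads so the JVM can exit
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(addThreeAsync(1)); // prints 4
    }
}
```

The result is the same as it would be with plain thenApply; only the thread on which each function runs may differ.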

The thenCompose Method

Now that we have explored thenApply, we can examine a similar method: thenCompose. This is rather like thenApply, except that the function applied returns a CompletionStage implementor. This is useful for the scenario where you want to initiate a new asynchronous computation which can only be defined once you have the result of the current computation, and is therefore analogous to the flatMap method on the Optional class.

The thenApply method covers many scenarios, but it might be that the work you want to do in the CompletionStage you are defining can only be accessed via an asynchronous API, which you will have to wrap in a CompletableFuture. Rather than leaving you to dig through nested CompletionStages, thenCompose provides a single CompletionStage which completes when the one generated by applying the function to the result completes. The programmer therefore only has to consider one CompletableFuture, which will complete with the final result.

A simple (though contrived) demonstration of this is the following JUnit test method :-

thenCompose.java
@Test
public void demoThenCompose() throws Exception {
    // We'll use one future to specify how long a subsequent future will take
    final CompletableFuture<Integer> future = CompletableFuture
        .supplyAsync(delayedValueSupplier(3), executor)
        .thenCompose(i -> CompletableFuture
            .supplyAsync(delayedValueSupplier(2, i * 100), executor)); // (1)
    assertThat(future.get(), is(2));
}
(1) By using supplyAsync within the thenCompose we ensure that we will generate another CompletableFuture which may complete on yet another thread.

A schematic view based on the diagrammatic conventions established previously is shown in Figure 2 below:-

CFthenComposeSuccess
Figure 2. thenCompose transforms similar to flatMap

We’ve represented thenCompose by a different coloured arrow and introduced the cf(x) notation in the function definition - what this means is that the function will return a CompletableFuture (could be any CompletionStage implementor really) which will complete successfully with the value x.

Again, if you replace the meaning of the blue square wrapping green square with "Optional containing specified value" and cf(x) with Optional.of(x) you can see that thenCompose is analogous to flatMap on the Optional class, and cyclops-react’s FutureW class has a flatMap method.
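
The difference from thenApply is easiest to see in the types. In the sketch below, doubleAsync is a hypothetical asynchronous API invented for this illustration. Passing it to thenApply yields a future nested inside a future, whereas passing it to thenCompose yields a single flat future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: the names here are illustrative only.
public class ApplyVersusCompose {

    // Stand-in for an asynchronous API that returns its result as a future.
    static CompletableFuture<Integer> doubleAsync(int i) {
        return CompletableFuture.supplyAsync(() -> i * 2);
    }

    // thenApply wraps whatever the function returns, so we get a nested future.
    static CompletableFuture<CompletableFuture<Integer>> nested(int start) {
        return CompletableFuture.supplyAsync(() -> start)
            .thenApply(ApplyVersusCompose::doubleAsync);
    }

    // thenCompose flattens the result, much as flatMap does on Optional.
    static CompletableFuture<Integer> flat(int start) {
        return CompletableFuture.supplyAsync(() -> start)
            .thenCompose(ApplyVersusCompose::doubleAsync);
    }

    public static void main(String[] args) {
        System.out.println(nested(3).join().join()); // prints 6, after unwrapping twice
        System.out.println(flat(3).join());          // prints 6
    }
}
```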

thenCompose also offers async variants, but again the essential semantics remain the same.

Pipelines

Since the thenApply/thenCompose methods of CompletionStage also return CompletionStage implementors, transformations can easily be chained together so that once one stage completes, the processing of the next will commence, and so on. Figure 3 shows an example pipeline using both thenApply and thenCompose for processing.

CFPipeline
Figure 3. Pipeline of CompletionStages (example)

We begin with a CompletionStage which will complete with value a but following the application of thenApply and thenCompose we are left with a CompletionStage which will complete with value c. In effect we combine multiple computations into one via this mechanism.

The ease of definition of such a pipeline given a CompletionStage/CompletableFuture can be seen in the following code snippet:-

multiStagePipeline.java
@Test
public void demoMultiStagePipeline() throws Exception {
    // We'll use one future to specify how long a subsequent future will take
    final CompletableFuture<Integer> future = CompletableFuture
        .supplyAsync(delayedValueSupplier(3), executor)
        .thenApply(i -> {
            System.out.println("First future completed, " + i);
            return i + 1;
        })
        .thenCompose(i -> CompletableFuture
            .supplyAsync(delayedValueSupplier(i + 2, i * 100), executor));
    assertThat(future.get(), is(6));
}

This can be particularly convenient when dealing with asynchronous processing using CompletableFutures. It may be necessary to call out to an asynchronous API, and then do some subsequent processing (which itself may be partially or wholly asynchronous) on the result.

You could call CompletableFuture::get() to obtain the value (see below), but that blocks your current thread until the result is ready.

You could alternatively use thenApply/thenCompose (or one of the async variations) to define the subsequent processing to be applied to generate a new result, and avoid using the current thread for the subsequent computation elements unless or until you absolutely cannot proceed without the result. You may even find that you don’t need to return the asynchronous processing to the original calling thread at all.
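
A small sketch of the second approach follows. The class name, the values and the "local work" are all invented for the example. The point is only that the calling thread defines the pipeline, carries on with something else, and blocks (if at all) only at the moment it cannot proceed without the result.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: the names here are illustrative only.
public class DeferredBlocking {

    static String describe(int input) {
        // Define the whole pipeline up front; nothing here blocks the caller.
        CompletableFuture<String> pipeline = CompletableFuture
            .supplyAsync(() -> input)
            .thenApply(i -> i + 1)
            .thenApply(i -> "result=" + (i * 2));

        // The calling thread is free to do unrelated work in the meantime.
        String local = "local work done; ";

        // Block only now, when the result is actually needed.
        return local + pipeline.join();
    }

    public static void main(String[] args) {
        System.out.println(describe(20)); // prints "local work done; result=42"
    }
}
```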

Consumption

So, once we have composed multiple computations together into one, to give our final CompletionStage, how do we use the final result?

There are multiple options for this. For this discussion (since I will be writing a subsequent article on exception handling in CompletionStage pipelines) we’ll restrict to the assumption that all computations are sufficiently straightforward as to guarantee success. This is an unrealistic assumption in most use-cases for CompletableFutures, but it provides us with a simple discussion point on which we can expand later.

The "simplest" approach to consuming the result of a CompletableFuture is to wait until the future is complete (possibly up to a specified timeout period) and then continue processing with the completion value. The get() and join() methods offer this functionality: calling either of them blocks the current thread until the future is complete, and then returns the completion value. The get method can optionally take a timeout, after which it throws a TimeoutException. The join method does not offer a timeout but, unlike get, will only throw unchecked exceptions, making it more convenient for some use cases. Note that get and join are not CompletionStage methods: get comes from the Future interface, and join is defined on CompletableFuture itself.

The above methods are useful if you actually want to block the current thread until processing is complete. However, there is not always a requirement to do this, and indeed sometimes it’s undesirable: for example, when using non-blocking event-driven frameworks such as Vert.x, blocking an event loop thread causes serious problems. Therefore there are additional sets of methods which simply lead to consumption of the result (or exception) when it is ready.
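
The following sketch contrasts the blocking options. The class and helper names are made up for the illustration, and the checked exceptions from get are wrapped in IllegalStateException purely to keep the helpers short; real code would decide how to handle them.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: the names here are illustrative only.
public class BlockingConsumption {

    // get() forces the caller to deal with checked exceptions.
    static int viaGet(CompletableFuture<Integer> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Computation failed", e.getCause());
        }
    }

    // join() throws only unchecked exceptions, so no try/catch is required.
    static int viaJoin(CompletableFuture<Integer> future) {
        return future.join();
    }

    // get(timeout, unit) gives up with a TimeoutException if the future is not done in time.
    static boolean timesOut(CompletableFuture<Integer> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Computation failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(4);
        CompletableFuture<Integer> never = new CompletableFuture<>(); // never completed

        System.out.println(viaGet(done));        // prints 4
        System.out.println(viaJoin(done));       // prints 4
        System.out.println(timesOut(never, 50)); // prints true
    }
}
```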

These include the thenAccept, handle and whenComplete methods, all of which have async variants should you wish to move subsequent processing to yet another thread. The thenAccept method takes a consumer for the result and returns a CompletableFuture<Void> (so you can continue the pipeline if you wish). The handle method is akin to thenApply, except that it permits you to handle exceptional as well as successful completion of the previous stage. The whenComplete method doesn’t transform the result or exception, but lets you act on them: it applies a BiConsumer to them and returns a CompletionStage with the same result or exception. There is one final method of interest: exceptionally permits you to handle an exception raised during processing and convert it to a known value, where this is useful. We’ll return to exception handling in a subsequent article.
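
As a brief sketch of these methods in use (the class name, helper names and the fallback value -1 are all invented for this illustration), the following records what whenComplete and thenAccept observe, and shows handle and exceptionally substituting a known value for a failed future. It uses join only so that the example can print its results.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: the names here are illustrative only.
public class NonBlockingConsumption {

    // whenComplete observes the value without changing it; thenAccept consumes it.
    static List<String> consume(CompletableFuture<Integer> source) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> done = source
            .whenComplete((value, error) -> log.add("whenComplete saw " + value))
            .thenApply(i -> i + 1)
            .thenAccept(value -> log.add("thenAccept got " + value));
        done.join(); // wait only so the log can be returned
        return log;
    }

    // handle sees both outcomes and may map either of them to a new value.
    static int handled(CompletableFuture<Integer> source) {
        return source.handle((value, error) -> error == null ? value : -1).join();
    }

    // exceptionally is invoked only on failure and supplies a replacement value.
    static int recovered(CompletableFuture<Integer> source) {
        return source.exceptionally(error -> -1).join();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(1);
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(consume(ok));       // prints [whenComplete saw 1, thenAccept got 2]
        System.out.println(handled(ok));       // prints 1
        System.out.println(handled(failed));   // prints -1
        System.out.println(recovered(failed)); // prints -1
    }
}
```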

In conclusion, it is possible to use the CompletionStage API to assemble processing pipelines for asynchronously generated results expressed as CompletableFutures, and the key elements of this API are semantically analogous to map and flatMap on classes such as Optional. These pipelines can be used to continue off-thread processing after a future is completed (and possibly even to perform all subsequent processing off the originating thread).