Part 13 of 16

CompletableFuture: Async Pipelines and Non-Blocking Composition

The Problem with Future<T>

Fetching a product page on an e-commerce site requires at least three data sources: product details, live pricing, and inventory availability. Done sequentially, a 200 ms call to each service adds up to 600 ms. Done in parallel with a properly composed CompletableFuture pipeline, the wall-clock time drops to roughly the slowest of the three — around 200 ms. This article shows exactly how to build that pipeline and everything else you need to write robust async code in Java 8.

Java 5 introduced Future<T> for async computation. The problem: you can only get the result by calling get(), which blocks:

// Java 7
ExecutorService pool = Executors.newFixedThreadPool(4);
Future<User> future = pool.submit(() -> fetchUser(userId));

// ...

User user = future.get(); // BLOCKS until done

If you have multiple async operations that depend on each other, you end up with a chain of blocking get() calls — effectively synchronous execution with extra overhead.

CompletableFuture solves this with a fluent, callback-based API: instead of blocking to get a result, you register what should happen when the result is ready.


Creating CompletableFutures

supplyAsync — async computation returning a value

// Runs in ForkJoinPool.commonPool() by default
CompletableFuture<User> future = CompletableFuture.supplyAsync(() -> fetchUser(42L));

// With custom executor
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletableFuture<User> future2 = CompletableFuture.supplyAsync(
    () -> fetchUser(42L), pool);

runAsync — async computation with no return value

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> sendEmail(user));

Completed futures

// Already-completed future — useful for testing and default values
CompletableFuture<String> done = CompletableFuture.completedFuture("Hello");

// Failed future
CompletableFuture<String> failed = new CompletableFuture<>();
failed.completeExceptionally(new RuntimeException("Something went wrong"));

Transforming Results

thenApply — transform the result (synchronous, same thread)

CompletableFuture<String> name = CompletableFuture
    .supplyAsync(() -> fetchUser(42L))
    .thenApply(User::getName);

thenApplyAsync — transform on a different thread

CompletableFuture<String> name = CompletableFuture
    .supplyAsync(() -> fetchUser(42L))
    .thenApplyAsync(User::getName, pool); // runs getName on pool thread

The Async suffix variants submit the callback to a thread pool (common pool or custom) rather than running on the completing thread. Use Async when the callback itself is expensive.

thenCompose — chain async operations that return CompletableFuture

// fetchUser returns CompletableFuture<User>
// fetchOrders(user) returns CompletableFuture<List<Order>>

// WRONG: thenApply gives CompletableFuture<CompletableFuture<List<Order>>>
CompletableFuture<CompletableFuture<List<Order>>> nested =
    CompletableFuture.supplyAsync(() -> fetchUser(42L))
        .thenApply(user -> fetchOrders(user.getId())); // wrong!

// RIGHT: thenCompose flattens
CompletableFuture<List<Order>> orders =
    CompletableFuture.supplyAsync(() -> fetchUser(42L))
        .thenCompose(user -> fetchOrders(user.getId())); // correct

thenCompose is to CompletableFuture what flatMap is to Optional and Stream.


Consuming Results

thenAccept — consume the result without returning a value

CompletableFuture.supplyAsync(() -> fetchUser(42L))
    .thenAccept(user -> System.out.println("Fetched: " + user.getName()));

thenRun — run a Runnable after completion (ignores the result)

CompletableFuture.supplyAsync(() -> processData())
    .thenRun(() -> System.out.println("Processing complete"));

Combining Multiple Futures

thenCombine — combine two independent futures

// Fetch user and orders in parallel, combine when both are done
CompletableFuture<User> userFuture   = CompletableFuture.supplyAsync(() -> fetchUser(id));
CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> fetchOrders(id));

CompletableFuture<UserWithOrders> combined = userFuture
    .thenCombine(orderFuture, (user, orders) -> new UserWithOrders(user, orders));

allOf — wait for all futures to complete

List<CompletableFuture<User>> futures = userIds.stream()
    .map(id -> CompletableFuture.supplyAsync(() -> fetchUser(id)))
    .collect(Collectors.toList());

// Wait for all to complete
CompletableFuture<Void> all = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0]));

// Collect results after all complete
CompletableFuture<List<User>> allUsers = all.thenApply(v ->
    futures.stream()
        .map(CompletableFuture::join) // join() doesn't throw checked exception
        .collect(Collectors.toList())
);

anyOf — return the result of the first future to complete

CompletableFuture<Object> fastest = CompletableFuture.anyOf(
    CompletableFuture.supplyAsync(() -> fetchFromPrimaryDB()),
    CompletableFuture.supplyAsync(() -> fetchFromReplicaDB()),
    CompletableFuture.supplyAsync(() -> fetchFromCache())
);

// Cast to the expected type
String result = (String) fastest.get();

Error Handling

exceptionally — provide a fallback on exception

CompletableFuture<User> safe = CompletableFuture
    .supplyAsync(() -> fetchUser(id))
    .exceptionally(ex -> {
        log.error("Failed to fetch user {}: {}", id, ex.getMessage());
        return User.anonymous();
    });

exceptionally only runs if the previous stage completed exceptionally. If successful, it passes the result through unchanged.

handle — handle both success and failure

CompletableFuture<String> result = CompletableFuture
    .supplyAsync(() -> fetchUser(id))
    .handle((user, ex) -> {
        if (ex != null) {
            log.error("Error: {}", ex.getMessage());
            return "fallback-user";
        }
        return user.getName();
    });

handle always runs — regardless of success or failure. The ex parameter is null on success and non-null on failure.

whenComplete — side effects on completion (doesn’t transform the result)

CompletableFuture<User> user = CompletableFuture
    .supplyAsync(() -> fetchUser(id))
    .whenComplete((result, ex) -> {
        if (ex != null) metrics.recordFailure();
        else metrics.recordSuccess();
    });
// The future's value/exception is still propagated unchanged

Blocking to Get the Result

When you need to block and get the final result (e.g., at the edge of an async system):

// get() — throws checked ExecutionException, InterruptedException
try {
    User user = future.get();
    User userWithTimeout = future.get(5, TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
    // handle
}

// join() — throws unchecked CompletionException (preferred in lambdas)
User user = future.join();

// getNow(fallback) — returns fallback if not yet complete
User user = future.getNow(User.anonymous());

Full Pipeline Example

Fetch a user, their orders, and the shipping status of each order — all in parallel where possible:

CompletableFuture<Report> report = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), ioPool)          // fetch user
    .thenCompose(user ->
        CompletableFuture.supplyAsync(() -> fetchOrders(user.getId()), ioPool)
            .thenApply(orders -> new UserWithOrders(user, orders))
    )
    .thenCompose(uwo -> {
        // Fetch shipping status for each order in parallel
        List<CompletableFuture<OrderStatus>> statusFutures = uwo.getOrders().stream()
            .map(order -> CompletableFuture.supplyAsync(
                () -> fetchShippingStatus(order.getId()), ioPool))
            .collect(Collectors.toList());

        return CompletableFuture.allOf(statusFutures.toArray(new CompletableFuture[0]))
            .thenApply(v -> {
                List<OrderStatus> statuses = statusFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
                return new Report(uwo.getUser(), uwo.getOrders(), statuses);
            });
    })
    .exceptionally(ex -> {
        log.error("Report generation failed", ex);
        return Report.empty();
    });

// Blocking get at the HTTP handler boundary
Report r = report.get(10, TimeUnit.SECONDS);

Real-World Async Pipeline: Product Page Load

This is the canonical real-world CompletableFuture problem: a product detail page needs data from three independent downstream services. Calling them sequentially wastes time — each call sits idle waiting for the network while the other two services are available. The right approach is to fire all three concurrently and combine the results when all three are ready.

// Domain types
record ProductDetails(String id, String name, String description) {}
record PricingInfo(String productId, BigDecimal price, BigDecimal discountedPrice) {}
record InventoryStatus(String productId, int quantityAvailable, boolean inStock) {}
record ProductPage(ProductDetails details, PricingInfo pricing, InventoryStatus inventory) {}

public class ProductPageService {

    // Dedicated I/O pool — never use commonPool for blocking I/O
    private static final ExecutorService IO_POOL =
        Executors.newFixedThreadPool(20, r -> {
            Thread t = new Thread(r, "io-pool");
            t.setDaemon(true);
            return t;
        });

    private final ProductClient productClient;
    private final PricingClient pricingClient;
    private final InventoryClient inventoryClient;

    /**
     * Builds a complete product page by fetching all three data sources in parallel.
     * Total latency ≈ max(product, pricing, inventory) rather than their sum.
     */
    public CompletableFuture<ProductPage> buildProductPage(String productId) {

        // Fire all three requests simultaneously
        CompletableFuture<ProductDetails> detailsFuture =
            CompletableFuture.supplyAsync(
                () -> productClient.fetchDetails(productId), IO_POOL);

        CompletableFuture<PricingInfo> pricingFuture =
            CompletableFuture.supplyAsync(
                () -> pricingClient.getPrice(productId), IO_POOL);

        CompletableFuture<InventoryStatus> inventoryFuture =
            CompletableFuture.supplyAsync(
                () -> inventoryClient.checkStock(productId), IO_POOL);

        // Combine all three when they are all complete
        // thenCombine joins two futures; chain twice to join all three
        return detailsFuture
            .thenCombine(pricingFuture,
                (details, pricing) -> new Object[]{details, pricing})   // intermediate pair
            .thenCombine(inventoryFuture,
                (pair, inventory) -> new ProductPage(
                    (ProductDetails) pair[0],
                    (PricingInfo)    pair[1],
                    inventory))
            .exceptionally(ex -> {
                log.error("Failed to build product page for {}: {}", productId, ex.getMessage());
                return ProductPage.unavailable(productId);
            });
    }
}

A cleaner alternative using allOf when you have more than two futures:

public CompletableFuture<ProductPage> buildProductPageAllOf(String productId) {

    CompletableFuture<ProductDetails>  detailsFuture   = supplyAsync(() -> productClient.fetchDetails(productId), IO_POOL);
    CompletableFuture<PricingInfo>     pricingFuture   = supplyAsync(() -> pricingClient.getPrice(productId),     IO_POOL);
    CompletableFuture<InventoryStatus> inventoryFuture = supplyAsync(() -> inventoryClient.checkStock(productId), IO_POOL);

    // allOf returns CompletableFuture<Void> — we then join individual futures (safe: all done)
    return CompletableFuture
        .allOf(detailsFuture, pricingFuture, inventoryFuture)
        .thenApply(v -> new ProductPage(
            detailsFuture.join(),
            pricingFuture.join(),
            inventoryFuture.join()))
        .exceptionally(ex -> {
            log.error("Product page load failed for {}", productId, ex);
            return ProductPage.unavailable(productId);
        });
}

Why join() is safe inside thenApply here: allOf completes only when all supplied futures have completed. By the time thenApply runs, detailsFuture.join() is guaranteed to return immediately — it will never block.

Observed latency improvement:

Approach3 × 200 ms services
Sequential calls~600 ms
Parallel with allOf~205 ms
Saving~66%

Real-World Production Examples

E-Commerce Checkout: Fan-Out Pattern

Fetch product details, inventory, and pricing in parallel — all three independent, combined when done:

ExecutorService ioPool = Executors.newCachedThreadPool();

public CompletableFuture<CheckoutData> buildCheckout(String productId) {
    // Launch all three requests at the same time
    CompletableFuture<ProductDetails> productFuture =
        CompletableFuture.supplyAsync(() -> fetchProductDetails(productId), ioPool);
    CompletableFuture<Integer> inventoryFuture =
        CompletableFuture.supplyAsync(() -> checkInventory(productId), ioPool);
    CompletableFuture<PricingInfo> priceFuture =
        CompletableFuture.supplyAsync(() -> calculatePrice(productId), ioPool);

    // Combine: wait for product + inventory, then combine with pricing
    return productFuture
        .thenCombine(inventoryFuture, ImmutablePair::new)
        .thenCombine(priceFuture,
            (pair, pricing) -> new CheckoutData(pair.getLeft(), pair.getRight(), pricing))
        .exceptionally(ex -> {
            log.error("Checkout build failed for product {}", productId, ex);
            return CheckoutData.unavailable();
        });
}
// Total time ≈ max(product, inventory, pricing) instead of their sum

Credit Approval: Fan-In (Sequential Dependent) Pipeline

Each step depends on the previous one — use thenCompose to chain without blocking:

public CompletableFuture<ApprovalDecision> approveLoan(Long customerId) {
    return fetchCustomerDetails(customerId)                     // Step 1
        .thenCompose(customer ->
            getCreditScore(customer.getId())                    // Step 2: needs customer
                .thenApply(score -> new CustomerScore(customer, score))
        )
        .thenCompose(cs ->
            getBankReports(cs.customer().getId())               // Step 3: needs customer
                .thenApply(reports -> new LoanContext(cs.customer(), cs.score(), reports))
        )
        .thenApply(ctx -> makeDecision(ctx.customer(), ctx.score(), ctx.reports())) // Step 4
        .exceptionally(ex -> {
            log.error("Loan approval failed for customer {}", customerId, ex);
            return ApprovalDecision.DENIED;
        });
}

Bulk Import: allOf with Result Collection

public CompletableFuture<List<User>> importUsersInParallel(List<UserDTO> dtos) {
    List<CompletableFuture<User>> futures = dtos.stream()
        .map(dto -> CompletableFuture.supplyAsync(() -> importUser(dto), ioPool))
        .collect(Collectors.toList());

    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)   // safe: all complete at this point
            .collect(Collectors.toList()));
}

Payment Gateway with Fallback

public CompletableFuture<PaymentResult> processPayment(Payment payment) {
    return callPrimaryGateway(payment)
        .exceptionally(ex -> {
            log.warn("Primary gateway failed, trying backup", ex);
            return null;  // signal retry
        })
        .thenCompose(result ->
            result != null
                ? CompletableFuture.completedFuture(result)
                : callBackupGateway(payment)
        )
        .handle((result, ex) -> {
            if (ex != null) return new PaymentResult("FAILED", ex.getMessage());
            return result;
        });
}

When to Use CompletableFuture

SituationUse CompletableFuture?Alternative
Multiple independent I/O calls that can run in parallelYes
Sequential async pipeline (step B depends on step A)Yes — thenCompose
Fire-and-forget background task (no result needed)Yes — runAsyncExecutorService.execute
Simple single async call, result needed immediatelyMaybe — get() blocks anywayDirect call on a thread pool
CPU-bound work, no I/OParallel streams often simpler
High-throughput reactive pipelineConsider Project Reactor / RxJavaCompletableFuture is not back-pressure-aware
Timeout on a single operationYes — get(timeout, unit)
Fan-out: call N services in parallel, collect all resultsYes — allOf
Fan-out: use the first result that arrivesYes — anyOf

CompletableFuture vs Thread + Future

ApproachBlocking?Composable?Exception handling
Future.get()YesNoChecked ExecutionException
CompletableFutureNo (callbacks)Yesexceptionally / handle
CompletableFuture.get()Yes (if needed)YesChecked / unchecked

Common Pitfalls

Forgetting to use a custom pool for I/O

// WRONG: blocks ForkJoinPool.commonPool() threads with I/O
CompletableFuture.supplyAsync(() -> httpClient.get("/api/users"));

// RIGHT: use a dedicated I/O thread pool
ExecutorService ioPool = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(() -> httpClient.get("/api/users"), ioPool);

Using get() inside async callbacks

// WRONG: deadlock risk — get() inside a ForkJoin callback can exhaust the pool
CompletableFuture<String> result = future.thenApply(user ->
    anotherFuture.get() // blocking inside callback!
);

// RIGHT: use thenCompose to chain async operations
CompletableFuture<String> result = future.thenCompose(user -> anotherFuture);

Not handling exceptions

// WRONG: unhandled exception — future fails silently
CompletableFuture.supplyAsync(() -> riskyOperation());

// RIGHT: always add error handling
CompletableFuture.supplyAsync(() -> riskyOperation())
    .exceptionally(ex -> { log.error("Failed", ex); return fallback; });

Summary

MethodPurpose
supplyAsync(Supplier)Start async computation returning a value
runAsync(Runnable)Start async computation with no return
thenApply(Function)Transform result (sync)
thenApplyAsync(Function)Transform result (async, new thread)
thenCompose(Function)Chain async operation (flatMap equivalent)
thenAccept(Consumer)Consume result, return void
thenRun(Runnable)Run after completion, ignore result
thenCombine(other, fn)Combine two independent futures
allOf(futures...)Wait for all to complete
anyOf(futures...)First to complete wins
exceptionally(fn)Fallback on exception
handle(fn)Handle both success and failure
whenComplete(fn)Side effects on completion
join()Block for result (unchecked)
get()Block for result (checked)

Next Step

New APIs: Base64, StampedLock, Nashorn JavaScript Engine →

Part of the DevOps Monk Java tutorial series: Java 8Java 11Java 17Java 21