CompletableFuture: Async Pipelines and Non-Blocking Composition
The Problem with Future<T>
Fetching a product page on an e-commerce site requires at least three data sources: product details, live pricing, and inventory availability. Done sequentially, a 200 ms call to each service adds up to 600 ms. Done in parallel with a properly composed CompletableFuture pipeline, the wall-clock time drops to roughly the slowest of the three — around 200 ms. This article shows exactly how to build that pipeline and everything else you need to write robust async code in Java 8.
Java 5 introduced Future<T> for async computation. The problem: you can only get the result by calling get(), which blocks:
// Future-based style: Future is from Java 5; the lambda below needs Java 8
ExecutorService pool = Executors.newFixedThreadPool(4);
Future<User> future = pool.submit(() -> fetchUser(userId));
// ...
User user = future.get(); // BLOCKS until done
If you have multiple async operations that depend on each other, you end up with a chain of blocking get() calls — effectively synchronous execution with extra overhead.
CompletableFuture solves this with a fluent, callback-based API: instead of blocking to get a result, you register what should happen when the result is ready.
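The contrast between the two styles can be sketched as follows. This is a minimal illustration, not the article's own code: fetchUserName is a hypothetical stand-in for a remote call, and the class name is made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingVsCallback {

    // Hypothetical lookup standing in for a remote call (an assumption for this sketch).
    static String fetchUserName(long id) {
        return "user-" + id;
    }

    // Future<T>: the only way to reach the value is get(), which parks the caller.
    static String blockingStyle(long id) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> fetchUserName(id));
            return future.get().toUpperCase(); // the caller waits here
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // CompletableFuture: register the follow-up step and return without waiting.
    static CompletableFuture<String> callbackStyle(long id) {
        return CompletableFuture
                .supplyAsync(() -> fetchUserName(id))
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle(42L));
        // join() is used here only so the demo can print the value.
        System.out.println(callbackStyle(42L).join());
    }
}
```

In callbackStyle nothing waits for the lookup: the method returns a future immediately, and the caller decides whether to attach further stages or to block at the edge of the system.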
Creating CompletableFutures
supplyAsync — async computation returning a value
// Runs in ForkJoinPool.commonPool() by default
CompletableFuture<User> future = CompletableFuture.supplyAsync(() -> fetchUser(42L));
// With custom executor
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletableFuture<User> future2 = CompletableFuture.supplyAsync(
() -> fetchUser(42L), pool);
runAsync — async computation with no return value
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> sendEmail(user));
Completed futures
// Already-completed future — useful for testing and default values
CompletableFuture<String> done = CompletableFuture.completedFuture("Hello");
// Failed future
CompletableFuture<String> failed = new CompletableFuture<>();
failed.completeExceptionally(new RuntimeException("Something went wrong"));
Transforming Results
thenApply — transform the result (synchronous, same thread)
CompletableFuture<String> name = CompletableFuture
.supplyAsync(() -> fetchUser(42L))
.thenApply(User::getName);
thenApplyAsync — transform on a different thread
CompletableFuture<String> name = CompletableFuture
.supplyAsync(() -> fetchUser(42L))
.thenApplyAsync(User::getName, pool); // runs getName on pool thread
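The Async-suffixed variants submit the callback to an executor (the common pool by default, or the one you pass in) rather than running it on the thread that completes the previous stage. The non-Async variants run on that completing thread, or on the calling thread if the stage is already complete when the callback is registered. Use Async when the callback itself is expensive.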
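A small sketch of where an Async callback runs, assuming a single-thread executor whose thread is named transform-pool purely so that it is easy to recognise (the name and the class are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncVariantThreads {

    // Returns the name of the thread that ran the thenApplyAsync callback.
    static String callbackThreadName() {
        // One named thread, so the callback's thread is unambiguous.
        ExecutorService transformPool = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "transform-pool"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> 21) // runs on the default async executor
                    .thenApplyAsync(n -> Thread.currentThread().getName(), transformPool)
                    .join();
        } finally {
            transformPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName());
    }
}
```

Because the executor is passed explicitly, the callback runs on that executor's thread regardless of which thread completed the supplier stage.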
thenCompose — chain async operations that return CompletableFuture
// fetchUser returns CompletableFuture<User>
// fetchOrders(user) returns CompletableFuture<List<Order>>
// WRONG: thenApply gives CompletableFuture<CompletableFuture<List<Order>>>
CompletableFuture<CompletableFuture<List<Order>>> nested =
CompletableFuture.supplyAsync(() -> fetchUser(42L))
.thenApply(user -> fetchOrders(user.getId())); // wrong!
// RIGHT: thenCompose flattens
CompletableFuture<List<Order>> orders =
CompletableFuture.supplyAsync(() -> fetchUser(42L))
.thenCompose(user -> fetchOrders(user.getId())); // correct
thenCompose is to CompletableFuture what flatMap is to Optional and Stream.
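The analogy can be seen side by side in a short sketch. findEmail and findEmailAsync are hypothetical lookups invented for this example:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class FlatMapAnalogy {

    // Hypothetical synchronous lookup that may find nothing.
    static Optional<String> findEmail(String user) {
        return "alice".equals(user)
                ? Optional.of("alice@example.com")
                : Optional.empty();
    }

    // Hypothetical asynchronous lookup that itself returns a future.
    static CompletableFuture<String> findEmailAsync(String user) {
        return CompletableFuture.supplyAsync(() -> user + "@example.com");
    }

    // flatMap avoids Optional<Optional<String>>.
    static Optional<String> viaFlatMap() {
        return Optional.of("alice").flatMap(FlatMapAnalogy::findEmail);
    }

    // thenCompose avoids CompletableFuture<CompletableFuture<String>>.
    static CompletableFuture<String> viaThenCompose() {
        return CompletableFuture
                .supplyAsync(() -> "alice")
                .thenCompose(FlatMapAnalogy::findEmailAsync);
    }

    public static void main(String[] args) {
        System.out.println(viaFlatMap().get());
        System.out.println(viaThenCompose().join());
    }
}
```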
Consuming Results
thenAccept — consume the result without returning a value
CompletableFuture.supplyAsync(() -> fetchUser(42L))
.thenAccept(user -> System.out.println("Fetched: " + user.getName()));
thenRun — run a Runnable after completion (ignores the result)
CompletableFuture.supplyAsync(() -> processData())
.thenRun(() -> System.out.println("Processing complete"));
Combining Multiple Futures
thenCombine — combine two independent futures
// Fetch user and orders in parallel, combine when both are done
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> fetchUser(id));
CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> fetchOrders(id));
CompletableFuture<UserWithOrders> combined = userFuture
.thenCombine(orderFuture, (user, orders) -> new UserWithOrders(user, orders));
allOf — wait for all futures to complete
List<CompletableFuture<User>> futures = userIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> fetchUser(id)))
.collect(Collectors.toList());
// Wait for all to complete
CompletableFuture<Void> all = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
// Collect results after all complete
CompletableFuture<List<User>> allUsers = all.thenApply(v ->
futures.stream()
.map(CompletableFuture::join) // join() doesn't throw checked exception
.collect(Collectors.toList())
);
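Since the allOf-then-join pattern recurs, it can be wrapped in a small generic helper. The sketch below is one possible shape; sequence is a hypothetical name, not a JDK method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FutureSequence {

    // Turns a list of futures into one future of a list, preserving input order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // all are complete at this point
                        .collect(Collectors.toList()));
    }

    static List<Integer> demo() {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        return sequence(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The result order follows the order of the input list, not the order in which the futures happen to complete.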
anyOf — return the result of the first future to complete
CompletableFuture<Object> fastest = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> fetchFromPrimaryDB()),
CompletableFuture.supplyAsync(() -> fetchFromReplicaDB()),
CompletableFuture.supplyAsync(() -> fetchFromCache())
);
// Cast to the expected type
String result = (String) fastest.join(); // join() avoids get()'s checked exceptions
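The cast is needed because anyOf returns CompletableFuture<Object>. When all candidates share one type, an alternative is a small typed helper. The sketch below is a hand-rolled substitute rather than a JDK API, and firstOf is a hypothetical name. Like anyOf, it settles on the first completion, whether that is a value or a failure.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class TypedAnyOf {

    // Completes with the outcome of whichever candidate finishes first.
    static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> candidates) {
        CompletableFuture<T> winner = new CompletableFuture<>();
        for (CompletableFuture<T> candidate : candidates) {
            candidate.whenComplete((value, ex) -> {
                // complete/completeExceptionally are no-ops once winner is settled
                if (ex != null) {
                    winner.completeExceptionally(ex);
                } else {
                    winner.complete(value);
                }
            });
        }
        return winner;
    }

    static String demo() {
        CompletableFuture<String> replica = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> cache = CompletableFuture.completedFuture("from-cache");
        return firstOf(Arrays.asList(replica, cache)).join(); // no cast needed
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```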
Error Handling
exceptionally — provide a fallback on exception
CompletableFuture<User> safe = CompletableFuture
.supplyAsync(() -> fetchUser(id))
.exceptionally(ex -> {
log.error("Failed to fetch user {}: {}", id, ex.getMessage());
return User.anonymous();
});
exceptionally only runs if the previous stage completed exceptionally. If successful, it passes the result through unchanged.
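One detail worth seeing in code: when the failing stage was started with supplyAsync, the exception handed to exceptionally is a CompletionException wrapping the original one. A minimal sketch, with unwrap as a hypothetical helper:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExceptionallyDemo {

    // Peels off the CompletionException wrapper, if there is one.
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null)
                ? ex.getCause()
                : ex;
    }

    static String recover(boolean fail) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("boom");
                    }
                    return "ok";
                })
                // Skipped on success; on failure it supplies the replacement value.
                .exceptionally(ex -> "fallback: " + unwrap(ex).getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(recover(false));
        System.out.println(recover(true));
    }
}
```

Unwrapping before inspecting the type or message keeps the handler correct whether the stage failed inside supplyAsync or was failed explicitly with completeExceptionally.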
handle — handle both success and failure
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> fetchUser(id))
.handle((user, ex) -> {
if (ex != null) {
log.error("Error: {}", ex.getMessage());
return "fallback-user";
}
return user.getName();
});
handle always runs — regardless of success or failure. The ex parameter is null on success and non-null on failure.
whenComplete — side effects on completion (doesn’t transform the result)
CompletableFuture<User> user = CompletableFuture
.supplyAsync(() -> fetchUser(id))
.whenComplete((result, ex) -> {
if (ex != null) metrics.recordFailure();
else metrics.recordSuccess();
});
// The future's value/exception is still propagated unchanged
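The propagation can be checked with a small sketch. The counter below stands in for the metrics object above and is an assumption of this example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

public class WhenCompleteDemo {

    static String demo(boolean fail) {
        AtomicInteger failures = new AtomicInteger(); // stand-in for a metrics counter
        CompletableFuture<String> source = new CompletableFuture<>();
        if (fail) {
            source.completeExceptionally(new IllegalStateException("down"));
        } else {
            source.complete("alice");
        }

        // Side effect only: the callback cannot replace the value or swallow the error.
        CompletableFuture<String> observed = source.whenComplete((value, ex) -> {
            if (ex != null) {
                failures.incrementAndGet();
            }
        });

        String outcome;
        try {
            outcome = observed.join();
        } catch (CompletionException e) {
            outcome = "still failed: " + e.getCause().getMessage();
        }
        return outcome + " (failures=" + failures.get() + ")";
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```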
Blocking to Get the Result
When you need to block and get the final result (e.g., at the edge of an async system):
// get() — throws checked ExecutionException, InterruptedException
try {
User user = future.get();
User userWithTimeout = future.get(5, TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
// handle
}
// join() — throws unchecked CompletionException (preferred in lambdas)
User user = future.join();
// getNow(fallback) — returns fallback if not yet complete
User userOrFallback = future.getNow(User.anonymous());
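The difference between the three accessors shows up most clearly on a failed or incomplete future. A minimal sketch, with illustrative class and method names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class GetVsJoin {

    static CompletableFuture<String> failed() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    // get(): checked exceptions, original failure available as the cause.
    static String viaGet() {
        try {
            return failed().get();
        } catch (ExecutionException e) {
            return e.getClass().getSimpleName() + ": " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join(): unchecked CompletionException, original failure available as the cause.
    static String viaJoin() {
        try {
            return failed().join();
        } catch (CompletionException e) {
            return e.getClass().getSimpleName() + ": " + e.getCause().getMessage();
        }
    }

    // getNow(): never blocks; an incomplete future yields the supplied fallback.
    static String viaGetNow() {
        return new CompletableFuture<String>().getNow("fallback");
    }

    public static void main(String[] args) {
        System.out.println(viaGet());
        System.out.println(viaJoin());
        System.out.println(viaGetNow());
    }
}
```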
Full Pipeline Example
Fetch a user, their orders, and the shipping status of each order — all in parallel where possible:
CompletableFuture<Report> report = CompletableFuture
.supplyAsync(() -> fetchUser(userId), ioPool) // fetch user
.thenCompose(user ->
CompletableFuture.supplyAsync(() -> fetchOrders(user.getId()), ioPool)
.thenApply(orders -> new UserWithOrders(user, orders))
)
.thenCompose(uwo -> {
// Fetch shipping status for each order in parallel
List<CompletableFuture<OrderStatus>> statusFutures = uwo.getOrders().stream()
.map(order -> CompletableFuture.supplyAsync(
() -> fetchShippingStatus(order.getId()), ioPool))
.collect(Collectors.toList());
return CompletableFuture.allOf(statusFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
List<OrderStatus> statuses = statusFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
return new Report(uwo.getUser(), uwo.getOrders(), statuses);
});
})
.exceptionally(ex -> {
log.error("Report generation failed", ex);
return Report.empty();
});
// Blocking get at the HTTP handler boundary
Report r = report.get(10, TimeUnit.SECONDS);
Real-World Async Pipeline: Product Page Load
This is the canonical real-world CompletableFuture problem: a product detail page needs data from three independent downstream services. Calling them sequentially wastes time — each call sits idle waiting for the network while the other two services are available. The right approach is to fire all three concurrently and combine the results when all three are ready.
// Domain types (records require Java 16+; on Java 8 use plain final classes instead)
record ProductDetails(String id, String name, String description) {}
record PricingInfo(String productId, BigDecimal price, BigDecimal discountedPrice) {}
record InventoryStatus(String productId, int quantityAvailable, boolean inStock) {}
record ProductPage(ProductDetails details, PricingInfo pricing, InventoryStatus inventory) {}
public class ProductPageService {
// Dedicated I/O pool — never use commonPool for blocking I/O
private static final ExecutorService IO_POOL =
Executors.newFixedThreadPool(20, r -> {
Thread t = new Thread(r, "io-pool");
t.setDaemon(true);
return t;
});
private final ProductClient productClient;
private final PricingClient pricingClient;
private final InventoryClient inventoryClient;
/**
* Builds a complete product page by fetching all three data sources in parallel.
* Total latency ≈ max(product, pricing, inventory) rather than their sum.
*/
public CompletableFuture<ProductPage> buildProductPage(String productId) {
// Fire all three requests simultaneously
CompletableFuture<ProductDetails> detailsFuture =
CompletableFuture.supplyAsync(
() -> productClient.fetchDetails(productId), IO_POOL);
CompletableFuture<PricingInfo> pricingFuture =
CompletableFuture.supplyAsync(
() -> pricingClient.getPrice(productId), IO_POOL);
CompletableFuture<InventoryStatus> inventoryFuture =
CompletableFuture.supplyAsync(
() -> inventoryClient.checkStock(productId), IO_POOL);
// Combine all three when they are all complete
// thenCombine joins two futures; chain twice to join all three
return detailsFuture
.thenCombine(pricingFuture,
(details, pricing) -> new Object[]{details, pricing}) // intermediate pair
.thenCombine(inventoryFuture,
(pair, inventory) -> new ProductPage(
(ProductDetails) pair[0],
(PricingInfo) pair[1],
inventory))
.exceptionally(ex -> {
log.error("Failed to build product page for {}: {}", productId, ex.getMessage());
return ProductPage.unavailable(productId);
});
}
}
A cleaner alternative using allOf when you have more than two futures:
public CompletableFuture<ProductPage> buildProductPageAllOf(String productId) {
CompletableFuture<ProductDetails> detailsFuture = supplyAsync(() -> productClient.fetchDetails(productId), IO_POOL);
CompletableFuture<PricingInfo> pricingFuture = supplyAsync(() -> pricingClient.getPrice(productId), IO_POOL);
CompletableFuture<InventoryStatus> inventoryFuture = supplyAsync(() -> inventoryClient.checkStock(productId), IO_POOL);
// allOf returns CompletableFuture<Void> — we then join individual futures (safe: all done)
return CompletableFuture
.allOf(detailsFuture, pricingFuture, inventoryFuture)
.thenApply(v -> new ProductPage(
detailsFuture.join(),
pricingFuture.join(),
inventoryFuture.join()))
.exceptionally(ex -> {
log.error("Product page load failed for {}", productId, ex);
return ProductPage.unavailable(productId);
});
}
Why join() is safe inside thenApply here: allOf completes only when all supplied futures have completed. By the time thenApply runs, detailsFuture.join() is guaranteed to return immediately — it will never block.
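The failure path deserves the same attention: if any input fails, the future returned by allOf completes exceptionally, the thenApply stage is skipped, and the exceptionally stage supplies the fallback. A reduced sketch with two string-valued futures (the names are illustrative, not the service above):

```java
import java.util.concurrent.CompletableFuture;

public class AllOfFailure {

    static String combine(boolean pricingFails) {
        CompletableFuture<String> details = CompletableFuture.completedFuture("details");
        CompletableFuture<String> pricing = new CompletableFuture<>();
        if (pricingFails) {
            pricing.completeExceptionally(new IllegalStateException("pricing down"));
        } else {
            pricing.complete("pricing");
        }

        return CompletableFuture
                .allOf(details, pricing)
                // Runs only if every input succeeded, so join() neither blocks nor throws.
                .thenApply(ignored -> details.join() + "+" + pricing.join())
                // Runs only if some input failed.
                .exceptionally(ex -> "unavailable")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(combine(false));
        System.out.println(combine(true));
    }
}
```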
Observed latency improvement:
| Approach | 3 × 200 ms services |
|---|---|
| Sequential calls | ~600 ms |
| Parallel with allOf | ~205 ms |
| Saving | ~66% |
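Figures like these are easy to reproduce approximately. The sketch below simulates three services with Thread.sleep and times both approaches. The 50 ms latency and the class name are assumptions of the example, and actual timings will vary by machine.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatencyComparison {

    static final long CALL_MILLIS = 50; // simulated per-service latency (assumption)

    // Simulated remote call: sleeps, then returns its label.
    static String slowCall(String label) {
        try {
            Thread.sleep(CALL_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return label;
    }

    // One call after another: latency is roughly the sum.
    static long sequentialMillis() {
        long start = System.nanoTime();
        slowCall("details");
        slowCall("pricing");
        slowCall("inventory");
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // All three at once: latency is roughly the slowest call plus scheduling overhead.
    static long parallelMillis() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            long start = System.nanoTime();
            CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> slowCall("details"), pool);
            CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> slowCall("pricing"), pool);
            CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> slowCall("inventory"), pool);
            CompletableFuture.allOf(a, b, c).join();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sequential ms: " + sequentialMillis());
        System.out.println("parallel ms:   " + parallelMillis());
    }
}
```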
Real-World Production Examples
E-Commerce Checkout: Fan-Out Pattern
Fetch product details, inventory, and pricing in parallel — all three independent, combined when done:
ExecutorService ioPool = Executors.newCachedThreadPool();
public CompletableFuture<CheckoutData> buildCheckout(String productId) {
// Launch all three requests at the same time
CompletableFuture<ProductDetails> productFuture =
CompletableFuture.supplyAsync(() -> fetchProductDetails(productId), ioPool);
CompletableFuture<Integer> inventoryFuture =
CompletableFuture.supplyAsync(() -> checkInventory(productId), ioPool);
CompletableFuture<PricingInfo> priceFuture =
CompletableFuture.supplyAsync(() -> calculatePrice(productId), ioPool);
// Combine: wait for product + inventory, then combine with pricing
return productFuture
.thenCombine(inventoryFuture, ImmutablePair::new)
.thenCombine(priceFuture,
(pair, pricing) -> new CheckoutData(pair.getLeft(), pair.getRight(), pricing))
.exceptionally(ex -> {
log.error("Checkout build failed for product {}", productId, ex);
return CheckoutData.unavailable();
});
}
// Total time ≈ max(product, inventory, pricing) instead of their sum
Credit Approval: Sequential (Dependent) Pipeline
Each step depends on the previous one — use thenCompose to chain without blocking:
public CompletableFuture<ApprovalDecision> approveLoan(Long customerId) {
return fetchCustomerDetails(customerId) // Step 1
.thenCompose(customer ->
getCreditScore(customer.getId()) // Step 2: needs customer
.thenApply(score -> new CustomerScore(customer, score))
)
.thenCompose(cs ->
getBankReports(cs.customer().getId()) // Step 3: needs customer
.thenApply(reports -> new LoanContext(cs.customer(), cs.score(), reports))
)
.thenApply(ctx -> makeDecision(ctx.customer(), ctx.score(), ctx.reports())) // Step 4
.exceptionally(ex -> {
log.error("Loan approval failed for customer {}", customerId, ex);
return ApprovalDecision.DENIED;
});
}
Bulk Import: allOf with Result Collection
public CompletableFuture<List<User>> importUsersInParallel(List<UserDTO> dtos) {
List<CompletableFuture<User>> futures = dtos.stream()
.map(dto -> CompletableFuture.supplyAsync(() -> importUser(dto), ioPool))
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join) // safe: all complete at this point
.collect(Collectors.toList()));
}
Payment Gateway with Fallback
public CompletableFuture<PaymentResult> processPayment(Payment payment) {
return callPrimaryGateway(payment)
.exceptionally(ex -> {
log.warn("Primary gateway failed, trying backup", ex);
return null; // signal retry
})
.thenCompose(result ->
result != null
? CompletableFuture.completedFuture(result)
: callBackupGateway(payment)
)
.handle((result, ex) -> {
if (ex != null) return new PaymentResult("FAILED", ex.getMessage());
return result;
});
}
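The null sentinel works, but it relies on a successful payment never being null. One alternative, sketched below under that concern, lets handle return the next future directly and flattens it with thenCompose. withFallback is a hypothetical helper, and the string results stand in for PaymentResult.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class GatewayFallback {

    // Uses the primary result if it succeeded; otherwise starts the backup lazily.
    static <T> CompletableFuture<T> withFallback(
            CompletableFuture<T> primary, Supplier<CompletableFuture<T>> backup) {
        return primary
                .<CompletableFuture<T>>handle((value, ex) -> {
                    if (ex == null) {
                        return CompletableFuture.completedFuture(value);
                    }
                    return backup.get(); // only invoked when the primary failed
                })
                .thenCompose(next -> next); // flatten the nested future
    }

    static String pay(boolean primaryFails) {
        AtomicInteger backupCalls = new AtomicInteger();
        CompletableFuture<String> primary = new CompletableFuture<>();
        if (primaryFails) {
            primary.completeExceptionally(new IllegalStateException("primary down"));
        } else {
            primary.complete("primary-ok");
        }
        String result = withFallback(primary, () -> {
            backupCalls.incrementAndGet();
            return CompletableFuture.completedFuture("backup-ok");
        }).join();
        return result + " (backup calls: " + backupCalls.get() + ")";
    }

    public static void main(String[] args) {
        System.out.println(pay(false));
        System.out.println(pay(true));
    }
}
```

Taking the backup as a Supplier matters: passing an already-started future would call the backup gateway even when the primary succeeds.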
When to Use CompletableFuture
| Situation | Use CompletableFuture? | Alternative |
|---|---|---|
| Multiple independent I/O calls that can run in parallel | Yes | — |
| Sequential async pipeline (step B depends on step A) | Yes — thenCompose | — |
| Fire-and-forget background task (no result needed) | Yes — runAsync | ExecutorService.execute |
| Simple single async call, result needed immediately | Maybe — get() blocks anyway | Direct call on a thread pool |
| CPU-bound work, no I/O | Parallel streams often simpler | — |
| High-throughput reactive pipeline | Consider Project Reactor / RxJava | CompletableFuture is not back-pressure-aware |
| Timeout on a single operation | Yes — get(timeout, unit) | — |
| Fan-out: call N services in parallel, collect all results | Yes — allOf | — |
| Fan-out: use the first result that arrives | Yes — anyOf | — |
CompletableFuture vs Thread + Future
| Approach | Blocking? | Composable? | Exception handling |
|---|---|---|---|
| Future.get() | Yes | No | Checked ExecutionException |
| CompletableFuture | No (callbacks) | Yes | exceptionally / handle |
| CompletableFuture.get() | Yes (if needed) | Yes | Checked / unchecked |
Common Pitfalls
Forgetting to use a custom pool for I/O
// WRONG: blocks ForkJoinPool.commonPool() threads with I/O
CompletableFuture.supplyAsync(() -> httpClient.get("/api/users"));
// RIGHT: use a dedicated, bounded I/O thread pool
ExecutorService ioPool = Executors.newFixedThreadPool(20);
CompletableFuture.supplyAsync(() -> httpClient.get("/api/users"), ioPool);
Using get() inside async callbacks
// WRONG: deadlock risk — get() inside a ForkJoin callback can exhaust the pool
CompletableFuture<String> result = future.thenApply(user ->
anotherFuture.get() // blocking inside callback!
);
// RIGHT: use thenCompose to chain async operations
CompletableFuture<String> result = future.thenCompose(user -> anotherFuture);
Not handling exceptions
// WRONG: unhandled exception — future fails silently
CompletableFuture.supplyAsync(() -> riskyOperation());
// RIGHT: always add error handling
CompletableFuture.supplyAsync(() -> riskyOperation())
.exceptionally(ex -> { log.error("Failed", ex); return fallback; });
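What "fails silently" means can be made concrete. In the sketch below (illustrative names), the exception thrown inside the supplier is stored in the future rather than propagated to the submitting code, so it only surfaces if something asks the future for its outcome.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class SilentFailure {

    // Waits for completion without letting the failure escape.
    static void awaitQuietly(CompletableFuture<?> future) {
        try {
            future.join();
        } catch (CompletionException ignored) {
            // deliberately empty: we only want to wait
        }
    }

    // The supplier throws, yet the failure is only recorded inside the future.
    static boolean failureIsOnlyStored() {
        CompletableFuture<String> forgotten = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        awaitQuietly(forgotten);
        return forgotten.isCompletedExceptionally();
    }

    // With a handler attached, the failure becomes an explicit, visible outcome.
    static String withHandler() {
        return CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .exceptionally(ex -> "fallback")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(failureIsOnlyStored());
        System.out.println(withHandler());
    }
}
```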
Summary
| Method | Purpose |
|---|---|
| supplyAsync(Supplier) | Start async computation returning a value |
| runAsync(Runnable) | Start async computation with no return |
| thenApply(Function) | Transform result (sync) |
| thenApplyAsync(Function) | Transform result (async, on an executor thread) |
| thenCompose(Function) | Chain async operation (flatMap equivalent) |
| thenAccept(Consumer) | Consume result, return void |
| thenRun(Runnable) | Run after completion, ignore result |
| thenCombine(other, fn) | Combine two independent futures |
| allOf(futures...) | Wait for all to complete |
| anyOf(futures...) | First to complete wins |
| exceptionally(fn) | Fallback on exception |
| handle(fn) | Handle both success and failure |
| whenComplete(fn) | Side effects on completion |
| join() | Block for result (unchecked) |
| get() | Block for result (checked) |
Part of the DevOps Monk Java tutorial series: Java 8 → Java 11 → Java 17 → Java 21