Question 8 Β· Section 19

How to combine results of multiple CompletableFutures

There are three main ways to combine multiple CompletableFutures:

Language versions: English Russian Ukrainian

🟒 Junior Level

There are three main ways to combine multiple CompletableFutures:

1. thenCombine β€” two CFs, one result

CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Ivan");
CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 30);

name.thenCombine(age, (n, a) -> n + " is " + a + " years old")
    .thenAccept(System.out::println);
// Ivan is 30 years old

2. allOf β€” wait for ALL

CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "C");

CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2, cf3);

all.thenRun(() -> {
    System.out.println(cf1.join() + cf2.join() + cf3.join());
    // ABC
});

3. thenCompose β€” one after another

CompletableFuture<Long> userId = getUserIdAsync();

userId.thenCompose(id -> getUserAsync(id))  // first getUserId, then getUser
      .thenAccept(user -> System.out.println(user.name()));

Comparison

Method How many CFs Order Result
thenCombine 2 Parallel Combined
allOf N Parallel Void (needs join)
thenCompose 2 Sequential Second CF

🟑 Middle Level

thenCombine β€” two independent CFs

CompletableFuture<User> userFuture = userService.findByIdAsync(userId);
CompletableFuture<Order> orderFuture = orderService.findLastOrderAsync(userId);

// Both requests execute IN PARALLEL
userFuture.thenCombine(orderFuture, (user, order) ->
        new UserProfile(user, order))
    .thenAccept(profile -> sendToClient(profile));

Latency: max(userTime, orderTime) β€” completes in the time of the slowest one.

allOf β€” N independent CFs

List<Long> userIds = List.of(1L, 2L, 3L, 4L, 5L);

List<CompletableFuture<User>> futures = userIds.stream()
    .map(id -> userService.findByIdAsync(id))
    .toList();

CompletableFuture<Void> all = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0])
);

// Get all results
all.thenApply(v ->
    futures.stream()
        .map(CompletableFuture::join)  // Does NOT block β€” all are already ready
        .toList()
);

anyOf β€” first completed

CompletableFuture<String> server1 = callServerAsync("server1");
CompletableFuture<String> server2 = callServerAsync("server2");
CompletableFuture<String> server3 = callServerAsync("server3");

// Take the first response
CompletableFuture<Object> fastest = CompletableFuture.anyOf(server1, server2, server3);
fastest.thenAccept(result -> System.out.println("Fastest: " + result));

Combining methods β€” real example

public CompletableFuture<Dashboard> getDashboard(Long userId) {
    // Parallel independent requests
    CompletableFuture<User> user = userService.findByIdAsync(userId);
    CompletableFuture<List<Order>> orders = orderService.findByUserAsync(userId);
    CompletableFuture<List<Notification>> notifications =
        notificationService.findByUserAsync(userId);

    // Combine via allOf
    return CompletableFuture.allOf(user, orders, notifications)
        .thenApply(v -> new Dashboard(
            user.join(),
            orders.join(),
            notifications.join()
        ));
}

thenCompose vs thenCombine β€” when to use which

// ❌ thenCompose β€” sequential (slower)
getUserAsync(userId)
    .thenCompose(user -> getOrderAsync(user.getId()))
    .thenAccept(order -> ...);
// Latency: getUser + getOrder

// βœ… thenCompose β€” when the SECOND depends on the FIRST
getUserAsync(userId)
    .thenCompose(user -> getOrdersByRegionAsync(user.getRegion()));
// Cannot be parallel β€” need user.region

// βœ… thenCombine β€” when INDEPENDENT (faster)
CompletableFuture<User> user = getUserAsync(userId);
CompletableFuture<Order> order = getOrderAsync(userId);
user.thenCombine(order, UserProfile::new);
// Latency: max(getUser, getOrder)

πŸ”΄ Senior Level

Pattern: Collect All Results

public static <T> CompletableFuture<List<T>> allAsList(
        CompletableFuture<T>... futures) {
    return CompletableFuture.allOf(futures)
        .thenApply(v -> Arrays.stream(futures)
            .map(CompletableFuture::join)
            .toList());
}

// Usage
CompletableFuture<List<User>> allUsers = allAsList(
    findByIdAsync(1L),
    findByIdAsync(2L),
    findByIdAsync(3L)
);

Pattern: First Successful (with fallback)

public CompletableFuture<String> getDataWithFallback() {
    CompletableFuture<String> cache = cacheService.getAsync(key);
    CompletableFuture<String> db = databaseService.getAsync(key);

    return cache.applyToEither(db, Function.identity())
        .exceptionally(ex -> {
            // Both failed β€” use fallback
            return defaultService.get(key);
        });
}

Pattern: Timeout + Fallback

public CompletableFuture<User> getUserWithTimeout(Long id) {
    return userService.findByIdAsync(id)
        .orTimeout(2, TimeUnit.SECONDS)
        .exceptionally(ex -> {
            if (ex instanceof TimeoutException) {
                log.warn("UserService timeout for id={}", id);
                return cacheService.getCached(id); // fallback
            }
            throw new CompletionException(ex);
        });
}

Pattern: Batch with concurrency limit

// BAD: supplyAsync without executor β†’ ForkJoinPool.commonPool()
// semaphore.acquireUninterruptibly() blocks commonPool threads β†’ thread starvation!

// GOOD: pass a dedicated executor
ExecutorService ioExecutor = Executors.newFixedThreadPool(10);

public CompletableFuture<List<User>> getUsersBatch(
        List<Long> ids, int maxParallel) {
    Semaphore semaphore = new Semaphore(maxParallel);

    List<CompletableFuture<User>> futures = ids.stream()
        .map(id -> CompletableFuture.supplyAsync(() -> {
            semaphore.acquireUninterruptibly();
            try {
                return userRepository.findById(id);
            } finally {
                semaphore.release();
            }
        }, ioExecutor))  // ← dedicated executor, NOT commonPool!
        .toList();

    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)
            .toList());
}

thenCombine under the hood

// thenCombine uses BiRelay β€” waits for both CFs
public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(null, other, fn);
}

// If both CFs are already completed β€” executes synchronously
// If not β€” subscribes to completion of both

Error handling when combining

// Problem: if one of the CFs fails β€” the entire chain fails
CompletableFuture.allOf(cf1, cf2, cf3)
    .exceptionally(ex -> {
        // How to know which one failed?
        // Need to check each one
        if (cf1.isCompletedExceptionally()) { /* ... */ }
        if (cf2.isCompletedExceptionally()) { /* ... */ }
        return null;
    });

// βœ… Solution: handleErrors on each CF
CompletableFuture<User> user = userService.findByIdAsync(id)
    .handle((result, ex) -> ex != null ? defaultUser : result);

CompletableFuture<Order> order = orderService.findAsync(id)
    .handle((result, ex) -> ex != null ? emptyList() : result);

// Now allOf won't fail β€” each CF handled its own error

Production Experience

@Service
public class DashboardService {

    private final ExecutorService dashboardExecutor =
        Executors.newFixedThreadPool(10, new ThreadFactoryBuilder()
            .setNameFormat("dashboard-%d").build());

    public CompletableFuture<DashboardDto> getDashboard(Long userId) {
        long start = System.nanoTime();

        CompletableFuture<UserDto> user = CompletableFuture
            .supplyAsync(() -> userClient.getUser(userId), dashboardExecutor)
            .orTimeout(3, TimeUnit.SECONDS);

        CompletableFuture<List<OrderDto>> orders = CompletableFuture
            .supplyAsync(() -> orderClient.getOrders(userId), dashboardExecutor)
            .orTimeout(3, TimeUnit.SECONDS);

        CompletableFuture<StatsDto> stats = CompletableFuture
            .supplyAsync(() -> statsClient.getStats(userId), dashboardExecutor)
            .orTimeout(3, TimeUnit.SECONDS);

        return CompletableFuture.allOf(user, orders, stats)
            .thenApply(v -> new DashboardDto(
                user.join(),
                orders.join(),
                stats.join()
            ))
            .whenComplete((result, ex) -> {
                long duration = System.nanoTime() - start;
                metrics.histogram("dashboard.latency", duration);
                if (ex != null) {
                    metrics.counter("dashboard.errors").increment();
                }
            });
    }
}

Best Practices

// βœ… Independent CFs β€” thenCombine / allOf
cf1.thenCombine(cf2, combiner);
CompletableFuture.allOf(cf1, cf2, cf3);

// βœ… Dependent CFs β€” thenCompose
cf1.thenCompose(result -> cf2(result));

// βœ… Error handling on each CF
cf.handle((r, ex) -> ex != null ? fallback : r);

// βœ… Timeout for each CF
cf.orTimeout(5, TimeUnit.SECONDS);

// βœ… Custom Executor for isolation
CompletableFuture.supplyAsync(task, dedicatedExecutor);

// ❌ Blocking join() in async chains
// ❌ allOf without error handling
// ❌ Unlimited parallelism (use Semaphore for batches)

See also: [[9. What does allOf() method do and when to use it]], [[10. What does anyOf() method do and when is it useful]]


🎯 Interview Cheat Sheet

Must know:

  • thenCombine β€” two independent CFs, parallel execution, latency = max(cf1, cf2)
  • allOf β€” N CFs, returns CompletableFuture, need join() for results
  • anyOf β€” first completed CF, returns CompletableFuture, need cast
  • thenCompose β€” dependent CFs, sequential execution, latency = cf1 + cf2
  • applyToEither β€” analog of anyOf for two CFs

Frequent follow-up questions:

  • Does allOf return results? β€” No, Void. Need join() on each original CF
  • thenCombine vs thenCompose β€” when which? β€” thenCombine for independent (parallel), thenCompose for dependent (sequential)
  • What if one of the CFs in allOf fails? β€” allOf completes with an error. Need handle on each CF
  • Does anyOf cancel the remaining CFs? β€” No, the rest continue executing

Red flags (DO NOT say):

  • β€œallOf returns a list of results” β€” it returns Void, results via join()
  • β€œanyOf cancels the remaining tasks” β€” no, need manual cancel()
  • β€œthenCombine executes sequentially” β€” it is parallel, latency = max(cf1, cf2)

Related topics:

  • [[9. What does allOf() method do and when to use it]]
  • [[10. What does anyOf() method do and when is it useful]]
  • [[4. What is the difference between thenApply() and thenCompose()]]
  • [[22. What is the difference between thenCombine() and thenCompose()]]