Вопрос 8 · Раздел 19

Как комбинировать результаты нескольких CompletableFuture

Есть три основных способа объединить несколько CompletableFuture:

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Есть три основных способа объединить несколько CompletableFuture:

1. thenCombine — два CF, один результат

CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Ivan");
CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 30);

name.thenCombine(age, (n, a) -> n + " is " + a + " years old")
    .thenAccept(System.out::println);
// Ivan is 30 years old

2. allOf — ждать ВСЕ

CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "C");

CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2, cf3);

all.thenRun(() -> {
    System.out.println(cf1.join() + cf2.join() + cf3.join());
    // ABC
});

3. thenCompose — один за другим

CompletableFuture<Long> userId = getUserIdAsync();

userId.thenCompose(id -> getUserAsync(id))  // сначала getUserId, потом getUser
      .thenAccept(user -> System.out.println(user.name()));

Сравнение

Метод Сколько CF Порядок Результат
thenCombine 2 Параллельно Комбинированный
allOf N Параллельно Void (нужен join)
thenCompose 2 Последовательно Второй CF

🟡 Middle Level

thenCombine — два независимых CF

CompletableFuture<User> userFuture = userService.findByIdAsync(userId);
CompletableFuture<Order> orderFuture = orderService.findLastOrderAsync(userId);

// Оба запроса выполняются ПАРАЛЛЕЛЬНО
userFuture.thenCombine(orderFuture, (user, order) ->
        new UserProfile(user, order))
    .thenAccept(profile -> sendToClient(profile));

Latency: max(userTime, orderTime) — выполняется за время самого медленного.

allOf — N независимых CF

List<Long> userIds = List.of(1L, 2L, 3L, 4L, 5L);

List<CompletableFuture<User>> futures = userIds.stream()
    .map(id -> userService.findByIdAsync(id))
    .toList();

CompletableFuture<Void> all = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0])
);

// Получить все результаты
all.thenApply(v ->
    futures.stream()
        .map(CompletableFuture::join)  // НЕ блокирует — все уже готовы
        .toList()
);

anyOf — первый завершённый

CompletableFuture<String> server1 = callServerAsync("server1");
CompletableFuture<String> server2 = callServerAsync("server2");
CompletableFuture<String> server3 = callServerAsync("server3");

// Берём первый ответивший
CompletableFuture<Object> fastest = CompletableFuture.anyOf(server1, server2, server3);
fastest.thenAccept(result -> System.out.println("Fastest: " + result));

Комбинация методов — реальный пример

public CompletableFuture<Dashboard> getDashboard(Long userId) {
    // Параллельные независимые запросы
    CompletableFuture<User> user = userService.findByIdAsync(userId);
    CompletableFuture<List<Order>> orders = orderService.findByUserAsync(userId);
    CompletableFuture<List<Notification>> notifications =
        notificationService.findByUserAsync(userId);

    // Комбинируем через allOf
    return CompletableFuture.allOf(user, orders, notifications)
        .thenApply(v -> new Dashboard(
            user.join(),
            orders.join(),
            notifications.join()
        ));
}

thenCompose vs thenCombine — когда что

// ❌ thenCompose — последовательно (медленнее)
getUserAsync(userId)
    .thenCompose(user -> getOrderAsync(user.getId()))
    .thenAccept(order -> ...);
// Latency: getUser + getOrder

// ✅ thenCompose — когда ВТОРОЙ зависит от ПЕРВОГО
getUserAsync(userId)
    .thenCompose(user -> getOrdersByRegionAsync(user.getRegion()));
// Нельзя сделать параллельно — нужен user.region

// ✅ thenCombine — когда НЕЗАВИСИМЫ (быстрее)
CompletableFuture<User> user = getUserAsync(userId);
CompletableFuture<Order> order = getOrderAsync(userId);
user.thenCombine(order, UserProfile::new);
// Latency: max(getUser, getOrder)

🔴 Senior Level

Паттерн: Collect All Results

public static <T> CompletableFuture<List<T>> allAsList(
        CompletableFuture<T>... futures) {
    return CompletableFuture.allOf(futures)
        .thenApply(v -> Arrays.stream(futures)
            .map(CompletableFuture::join)
            .toList());
}

// Использование
CompletableFuture<List<User>> allUsers = allAsList(
    findByIdAsync(1L),
    findByIdAsync(2L),
    findByIdAsync(3L)
);

Паттерн: First Successful (с fallback)

public CompletableFuture<String> getDataWithFallback() {
    CompletableFuture<String> cache = cacheService.getAsync(key);
    CompletableFuture<String> db = databaseService.getAsync(key);

    return cache.applyToEither(db, Function.identity())
        .exceptionally(ex -> {
            // Оба упали — используем fallback
            return defaultService.get(key);
        });
}

Паттерн: Timeout + Fallback

public CompletableFuture<User> getUserWithTimeout(Long id) {
    return userService.findByIdAsync(id)
        .orTimeout(2, TimeUnit.SECONDS)
        .exceptionally(ex -> {
            if (ex instanceof TimeoutException) {
                log.warn("UserService timeout for id={}", id);
                return cacheService.getCached(id); // fallback
            }
            throw new CompletionException(ex);
        });
}

Паттерн: Batch с ограничением параллелизма

// ПЛОХО: supplyAsync без executor → ForkJoinPool.commonPool()
// semaphore.acquireUninterruptibly() блокирует потоки commonPool → thread starvation!

// ХОРОШО: передаём dedicated executor
ExecutorService ioExecutor = Executors.newFixedThreadPool(10);

public CompletableFuture<List<User>> getUsersBatch(
        List<Long> ids, int maxParallel) {
    Semaphore semaphore = new Semaphore(maxParallel);

    List<CompletableFuture<User>> futures = ids.stream()
        .map(id -> CompletableFuture.supplyAsync(() -> {
            semaphore.acquireUninterruptibly();
            try {
                return userRepository.findById(id);
            } finally {
                semaphore.release();
            }
        }, ioExecutor))  // ← dedicated executor, НЕ commonPool!
        .toList();

    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)
            .toList());
}

thenCombine под капотом

// thenCombine использует BiRelay — ожидает оба CF
public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(null, other, fn);
}

// Если оба CF уже завершены — выполняется синхронно
// Если нет — подписывается на completion обоих

Обработка ошибок при комбинировании

// Проблема: если один из CF падает — весь chain падает
CompletableFuture.allOf(cf1, cf2, cf3)
    .exceptionally(ex -> {
        // Как узнать, какой именно упал?
        // Нужно проверять каждый
        if (cf1.isCompletedExceptionally()) { /* ... */ }
        if (cf2.isCompletedExceptionally()) { /* ... */ }
        return null;
    });

// ✅ Решение: handleErrors на каждом CF
CompletableFuture<User> user = userService.findByIdAsync(id)
    .handle((result, ex) -> ex != null ? defaultUser : result);

CompletableFuture<Order> order = orderService.findAsync(id)
    .handle((result, ex) -> ex != null ? emptyList() : result);

// Теперь allOf не упадет — каждый CF обработал свою ошибку

Production Experience

@Service
public class DashboardService {

    private final ExecutorService dashboardExecutor =
        Executors.newFixedThreadPool(10, new ThreadFactoryBuilder()
            .setNameFormat("dashboard-%d").build());

    public CompletableFuture<DashboardDto> getDashboard(Long userId) {
        long start = System.nanoTime();

        CompletableFuture<UserDto> user = CompletableFuture
            .supplyAsync(() -> userClient.getUser(userId), dashboardExecutor)
            .orTimeout(3, TimeUnit.SECONDS);

        CompletableFuture<List<OrderDto>> orders = CompletableFuture
            .supplyAsync(() -> orderClient.getOrders(userId), dashboardExecutor)
            .orTimeout(3, TimeUnit.SECONDS);

        CompletableFuture<StatsDto> stats = CompletableFuture
            .supplyAsync(() -> statsClient.getStats(userId), dashboardExecutor)
            .orTimeout(3, TimeUnit.SECONDS);

        return CompletableFuture.allOf(user, orders, stats)
            .thenApply(v -> new DashboardDto(
                user.join(),
                orders.join(),
                stats.join()
            ))
            .whenComplete((result, ex) -> {
                long duration = System.nanoTime() - start;
                metrics.histogram("dashboard.latency", duration);
                if (ex != null) {
                    metrics.counter("dashboard.errors").increment();
                }
            });
    }
}

Best Practices

// ✅ Независимые CF — thenCombine / allOf
cf1.thenCombine(cf2, combiner);
CompletableFuture.allOf(cf1, cf2, cf3);

// ✅ Зависимые CF — thenCompose
cf1.thenCompose(result -> cf2(result));

// ✅ Обработка ошибок на каждом CF
cf.handle((r, ex) -> ex != null ? fallback : r);

// ✅ Timeout для каждого CF
cf.orTimeout(5, TimeUnit.SECONDS);

// ✅ Свой Executor для изоляции
CompletableFuture.supplyAsync(task, dedicatedExecutor);

// ❌ Блокирующие join() в async цепочке
// ❌ allOf без обработки ошибок
// ❌ Безлимитный параллелизм (Semaphore для batch)

См. также: [[9. Что делает метод allOf()]], [[10. Что делает метод anyOf()]]


🎯 Шпаргалка для интервью

Обязательно знать:

  • thenCombine — два независимых CF, параллельное выполнение, latency = max(cf1, cf2)
  • allOf — N CF, возвращает CompletableFuture, нужен join() для результатов
  • anyOf — первый завершённый CF, возвращает CompletableFuture, нужен cast
  • thenCompose — зависимые CF, последовательное выполнение, latency = cf1 + cf2
  • applyToEither — аналог anyOf для двух CF

Частые уточняющие вопросы:

  • allOf возвращает результаты? — Нет, Void. Нужно join() на каждом исходном CF
  • thenCombine vs thenCompose — когда что? — thenCombine для независимых (параллельно), thenCompose для зависимых (последовательно)
  • Что если один из CF в allOf упадёт? — allOf завершится с ошибкой. Нужно handle на каждом CF
  • anyOf отменяет остальные CF? — Нет, остальные продолжают выполняться

Красные флаги (НЕ говорить):

  • «allOf возвращает список результатов» — он возвращает Void, результаты через join()
  • «anyOf отменяет остальные задачи» — нет, нужно вручную cancel()
  • «thenCombine выполняет последовательно» — он параллельный, latency = max(cf1, cf2)

Связанные темы:

  • [[9. Что делает метод allOf() и когда его использовать]]
  • [[10. Что делает метод anyOf() и в каких случаях он полезен]]
  • [[4. В чём разница между thenApply() и thenCompose()]]
  • [[22. В чём разница между thenCombine() и thenCompose()]]