Питання 8 · Розділ 19

Як комбінувати результати декількох CompletableFuture

Є три основні способи об'єднати декілька CompletableFuture:

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Є три основні способи об’єднати декілька CompletableFuture:

1. thenCombine — два CF, один результат

CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Ivan");
CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 30);

name.thenCombine(age, (n, a) -> n + " is " + a + " years old")
    .thenAccept(System.out::println);
// Ivan is 30 years old

2. allOf — чекати УСІ

CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "C");

CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2, cf3);

all.thenRun(() -> {
    System.out.println(cf1.join() + cf2.join() + cf3.join());
    // ABC
});

3. thenCompose — один за одним

CompletableFuture<Long> userId = getUserIdAsync();

userId.thenCompose(id -> getUserAsync(id))  // спочатку getUserId, потім getUser
      .thenAccept(user -> System.out.println(user.name()));

Порівняння

Метод Скільки CF Порядок Результат
thenCombine 2 Паралельно Комбінований
allOf N Паралельно Void (потрібен join)
thenCompose 2 Послідовно Другий CF

🟡 Middle Level

thenCombine — два незалежних CF

CompletableFuture<User> userFuture = userService.findByIdAsync(userId);
CompletableFuture<Order> orderFuture = orderService.findLastOrderAsync(userId);

// Обидва запити виконуються ПАРАЛЕЛЬНО
userFuture.thenCombine(orderFuture, (user, order) ->
        new UserProfile(user, order))
    .thenAccept(profile -> sendToClient(profile));

Latency: max(userTime, orderTime) — виконується за час найповільнішого.

allOf — N незалежних CF

List<Long> userIds = List.of(1L, 2L, 3L, 4L, 5L);

List<CompletableFuture<User>> futures = userIds.stream()
    .map(id -> userService.findByIdAsync(id))
    .toList();

CompletableFuture<Void> all = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0])
);

// Отримати всі результати
all.thenApply(v ->
    futures.stream()
        .map(CompletableFuture::join)  // НЕ блокує — всі вже готові
        .toList()
);

anyOf — перший завершений

CompletableFuture<String> server1 = callServerAsync("server1");
CompletableFuture<String> server2 = callServerAsync("server2");
CompletableFuture<String> server3 = callServerAsync("server3");

// Беремо перший, що відповів
CompletableFuture<Object> fastest = CompletableFuture.anyOf(server1, server2, server3);
fastest.thenAccept(result -> System.out.println("Fastest: " + result));

Комбінація методів — реальний приклад

public CompletableFuture<Dashboard> getDashboard(Long userId) {
    // Паралельні незалежні запити
    CompletableFuture<User> user = userService.findByIdAsync(userId);
    CompletableFuture<List<Order>> orders = orderService.findByUserAsync(userId);
    CompletableFuture<List<Notification>> notifications =
        notificationService.findByUserAsync(userId);

    // Комбінуємо через allOf
    return CompletableFuture.allOf(user, orders, notifications)
        .thenApply(v -> new Dashboard(
            user.join(),
            orders.join(),
            notifications.join()
        ));
}

thenCompose vs thenCombine — коли що

// ❌ thenCompose — послідовно (повільніше)
getUserAsync(userId)
    .thenCompose(user -> getOrderAsync(user.getId()))
    .thenAccept(order -> ...);
// Latency: getUser + getOrder

// ✅ thenCompose — коли ДРУГИЙ залежить від ПЕРШОГО
getUserAsync(userId)
    .thenCompose(user -> getOrdersByRegionAsync(user.getRegion()));
// Не можна зробити паралельно — потрібен user.region

// ✅ thenCombine — коли НЕЗАЛЕЖНІ (швидше)
CompletableFuture<User> user = getUserAsync(userId);
CompletableFuture<Order> order = getOrderAsync(userId);
user.thenCombine(order, UserProfile::new);
// Latency: max(getUser, getOrder)

🔴 Senior Level

Патерн: Collect All Results

public static <T> CompletableFuture<List<T>> allAsList(
        CompletableFuture<T>... futures) {
    return CompletableFuture.allOf(futures)
        .thenApply(v -> Arrays.stream(futures)
            .map(CompletableFuture::join)
            .toList());
}

// Використання
CompletableFuture<List<User>> allUsers = allAsList(
    findByIdAsync(1L),
    findByIdAsync(2L),
    findByIdAsync(3L)
);

Патерн: First Successful (з fallback)

public CompletableFuture<String> getDataWithFallback() {
    CompletableFuture<String> cache = cacheService.getAsync(key);
    CompletableFuture<String> db = databaseService.getAsync(key);

    return cache.applyToEither(db, Function.identity())
        .exceptionally(ex -> {
            // Обидва впали — використовуємо fallback
            return defaultService.get(key);
        });
}

Патерн: Timeout + Fallback

public CompletableFuture<User> getUserWithTimeout(Long id) {
    return userService.findByIdAsync(id)
        .orTimeout(2, TimeUnit.SECONDS)
        .exceptionally(ex -> {
            if (ex instanceof TimeoutException) {
                log.warn("UserService timeout for id={}", id);
                return cacheService.getCached(id); // fallback
            }
            throw new CompletionException(ex);
        });
}

Патерн: Batch з обмеженням паралелізму

// ПОГАНО: supplyAsync без executor → ForkJoinPool.commonPool()
// semaphore.acquireUninterruptibly() блокує потоки commonPool → thread starvation!

// ДОБРЕ: передаємо dedicated executor
ExecutorService ioExecutor = Executors.newFixedThreadPool(10);

public CompletableFuture<List<User>> getUsersBatch(
        List<Long> ids, int maxParallel) {
    Semaphore semaphore = new Semaphore(maxParallel);

    List<CompletableFuture<User>> futures = ids.stream()
        .map(id -> CompletableFuture.supplyAsync(() -> {
            semaphore.acquireUninterruptibly();
            try {
                return userRepository.findById(id);
            } finally {
                semaphore.release();
            }
        }, ioExecutor))  // ← dedicated executor, НЕ commonPool!
        .toList();

    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)
            .toList());
}

thenCombine під капотом

// thenCombine використовує BiRelay — очікує обидва CF
public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(null, other, fn);
}

// Якщо обидва CF вже завершені — виконується синхронно
// Якщо ні — підписується на completion обох

Обробка помилок при комбінуванні

// Проблема: якщо один з CF падає — весь chain падає
CompletableFuture.allOf(cf1, cf2, cf3)
    .exceptionally(ex -> {
        // Як дізнатися, який саме впав?
        // Потрібно перевіряти кожен
        if (cf1.isCompletedExceptionally()) { /* ... */ }
        if (cf2.isCompletedExceptionally()) { /* ... */ }
        return null;
    });

// ✅ Рішення: handleErrors на кожному CF
CompletableFuture<User> user = userService.findByIdAsync(id)
    .handle((result, ex) -> ex != null ? defaultUser : result);

CompletableFuture<Order> order = orderService.findAsync(id)
    .handle((result, ex) -> ex != null ? emptyList() : result);

// Тепер allOf не впаде — кожен CF обробив свою помилку

Production Experience

@Service
public class DashboardService {

    private final ExecutorService dashboardExecutor =
        Executors.newFixedThreadPool(10, new ThreadFactoryBuilder()
            .setNameFormat("dashboard-%d").build());

    public CompletableFuture<DashboardDto> getDashboard(Long userId) {
        long start = System.nanoTime();

        CompletableFuture<UserDto> user = CompletableFuture
            .supplyAsync(() -> userClient.getUser(userId), dashboardExecutor)
            .orTimeout(3, TimeUnit.SECONDS);

        CompletableFuture<List<OrderDto>> orders = CompletableFuture
            .supplyAsync(() -> orderClient.getOrders(userId), dashboardExecutor)
            .orTimeout(3, TimeUnit.SECONDS);

        CompletableFuture<StatsDto> stats = CompletableFuture
            .supplyAsync(() -> statsClient.getStats(userId), dashboardExecutor)
            .orTimeout(3, TimeUnit.SECONDS);

        return CompletableFuture.allOf(user, orders, stats)
            .thenApply(v -> new DashboardDto(
                user.join(),
                orders.join(),
                stats.join()
            ))
            .whenComplete((result, ex) -> {
                long duration = System.nanoTime() - start;
                metrics.histogram("dashboard.latency", duration);
                if (ex != null) {
                    metrics.counter("dashboard.errors").increment();
                }
            });
    }
}

Best Practices

// ✅ Незалежні CF — thenCombine / allOf
cf1.thenCombine(cf2, combiner);
CompletableFuture.allOf(cf1, cf2, cf3);

// ✅ Залежні CF — thenCompose
cf1.thenCompose(result -> cf2(result));

// ✅ Обробка помилок на кожному CF
cf.handle((r, ex) -> ex != null ? fallback : r);

// ✅ Timeout для кожного CF
cf.orTimeout(5, TimeUnit.SECONDS);

// ✅ Свій Executor для ізоляції
CompletableFuture.supplyAsync(task, dedicatedExecutor);

// ❌ Блокуючі join() в async ланцюжку
// ❌ allOf без обробки помилок
// ❌ Безлімітний паралелізм (Semaphore для batch)

Див. також: [[9. Що робить метод allOf() і коли його використовувати]], [[10. Що робить метод anyOf() і в яких випадках він корисний]]


🎯 Шпаргалка для співбесіди

Обов’язково знати:

  • thenCombine — два незалежних CF, паралельне виконання, latency = max(cf1, cf2)
  • allOf — N CF, повертає CompletableFuture, потрібен join() для результатів
  • anyOf — перший завершений CF, повертає CompletableFuture, потрібен cast
  • thenCompose — залежні CF, послідовне виконання, latency = cf1 + cf2
  • applyToEither — аналог anyOf для двох CF

Часті уточнюючі питання:

  • allOf повертає результати? — Ні, Void. Потрібен join() на кожному вихідному CF
  • thenCombine vs thenCompose — коли що? — thenCombine для незалежних (паралельно), thenCompose для залежних (послідовно)
  • Що якщо один з CF в allOf впаде? — allOf завершиться з помилкою. Потрібен handle на кожному CF
  • anyOf скасовує решту CF? — Ні, решта продовжують виконуватися

Червоні прапорці (НЕ говорити):

  • «allOf повертає список результатів» — він повертає Void, результати через join()
  • «anyOf скасовує решту завдань» — ні, потрібно вручну cancel()
  • «thenCombine виконує послідовно» — він паралельний, latency = max(cf1, cf2)

Пов’язані теми:

  • [[9. Що робить метод allOf() і коли його використовувати]]
  • [[10. Що робить метод anyOf() і в яких випадках він корисний]]
  • [[4. У чому різниця між thenApply() і thenCompose()]]
  • [[22. У чому різниця між thenCombine() і thenCompose()]]