Як комбінувати результати декількох CompletableFuture
Є три основні способи об'єднати декілька CompletableFuture:
🟢 Junior Level
Є три основні способи об’єднати декілька CompletableFuture:
1. thenCombine — два CF, один результат
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Ivan");
CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 30);
name.thenCombine(age, (n, a) -> n + " is " + a + " years old")
.thenAccept(System.out::println);
// Ivan is 30 years old
2. allOf — чекати УСІ
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "C");
CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2, cf3);
all.thenRun(() -> {
System.out.println(cf1.join() + cf2.join() + cf3.join());
// ABC
});
3. thenCompose — один за одним
CompletableFuture<Long> userId = getUserIdAsync();
userId.thenCompose(id -> getUserAsync(id)) // спочатку getUserId, потім getUser
.thenAccept(user -> System.out.println(user.name()));
Порівняння
| Метод | Скільки CF | Порядок | Результат |
|---|---|---|---|
thenCombine |
2 | Паралельно | Комбінований |
allOf |
N | Паралельно | Void (потрібен join) |
thenCompose |
2 | Послідовно | Другий CF |
🟡 Middle Level
thenCombine — два незалежних CF
CompletableFuture<User> userFuture = userService.findByIdAsync(userId);
CompletableFuture<Order> orderFuture = orderService.findLastOrderAsync(userId);
// Обидва запити виконуються ПАРАЛЕЛЬНО
userFuture.thenCombine(orderFuture, (user, order) ->
new UserProfile(user, order))
.thenAccept(profile -> sendToClient(profile));
Latency: max(userTime, orderTime) — виконується за час найповільнішого.
allOf — N незалежних CF
List<Long> userIds = List.of(1L, 2L, 3L, 4L, 5L);
List<CompletableFuture<User>> futures = userIds.stream()
.map(id -> userService.findByIdAsync(id))
.toList();
CompletableFuture<Void> all = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// Отримати всі результати
all.thenApply(v ->
futures.stream()
.map(CompletableFuture::join) // НЕ блокує — всі вже готові
.toList()
);
anyOf — перший завершений
CompletableFuture<String> server1 = callServerAsync("server1");
CompletableFuture<String> server2 = callServerAsync("server2");
CompletableFuture<String> server3 = callServerAsync("server3");
// Беремо перший, що відповів
CompletableFuture<Object> fastest = CompletableFuture.anyOf(server1, server2, server3);
fastest.thenAccept(result -> System.out.println("Fastest: " + result));
Комбінація методів — реальний приклад
public CompletableFuture<Dashboard> getDashboard(Long userId) {
// Паралельні незалежні запити
CompletableFuture<User> user = userService.findByIdAsync(userId);
CompletableFuture<List<Order>> orders = orderService.findByUserAsync(userId);
CompletableFuture<List<Notification>> notifications =
notificationService.findByUserAsync(userId);
// Комбінуємо через allOf
return CompletableFuture.allOf(user, orders, notifications)
.thenApply(v -> new Dashboard(
user.join(),
orders.join(),
notifications.join()
));
}
thenCompose vs thenCombine — коли що
// ❌ thenCompose — послідовно (повільніше)
getUserAsync(userId)
.thenCompose(user -> getOrderAsync(user.getId()))
.thenAccept(order -> ...);
// Latency: getUser + getOrder
// ✅ thenCompose — коли ДРУГИЙ залежить від ПЕРШОГО
getUserAsync(userId)
.thenCompose(user -> getOrdersByRegionAsync(user.getRegion()));
// Не можна зробити паралельно — потрібен user.region
// ✅ thenCombine — коли НЕЗАЛЕЖНІ (швидше)
CompletableFuture<User> user = getUserAsync(userId);
CompletableFuture<Order> order = getOrderAsync(userId);
user.thenCombine(order, UserProfile::new);
// Latency: max(getUser, getOrder)
🔴 Senior Level
Патерн: Collect All Results
public static <T> CompletableFuture<List<T>> allAsList(
CompletableFuture<T>... futures) {
return CompletableFuture.allOf(futures)
.thenApply(v -> Arrays.stream(futures)
.map(CompletableFuture::join)
.toList());
}
// Використання
CompletableFuture<List<User>> allUsers = allAsList(
findByIdAsync(1L),
findByIdAsync(2L),
findByIdAsync(3L)
);
Патерн: First Successful (з fallback)
public CompletableFuture<String> getDataWithFallback() {
CompletableFuture<String> cache = cacheService.getAsync(key);
CompletableFuture<String> db = databaseService.getAsync(key);
return cache.applyToEither(db, Function.identity())
.exceptionally(ex -> {
// Обидва впали — використовуємо fallback
return defaultService.get(key);
});
}
Патерн: Timeout + Fallback
public CompletableFuture<User> getUserWithTimeout(Long id) {
return userService.findByIdAsync(id)
.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
log.warn("UserService timeout for id={}", id);
return cacheService.getCached(id); // fallback
}
throw new CompletionException(ex);
});
}
Патерн: Batch з обмеженням паралелізму
// ПОГАНО: supplyAsync без executor → ForkJoinPool.commonPool()
// semaphore.acquireUninterruptibly() блокує потоки commonPool → thread starvation!
// ДОБРЕ: передаємо dedicated executor
ExecutorService ioExecutor = Executors.newFixedThreadPool(10);
public CompletableFuture<List<User>> getUsersBatch(
List<Long> ids, int maxParallel) {
Semaphore semaphore = new Semaphore(maxParallel);
List<CompletableFuture<User>> futures = ids.stream()
.map(id -> CompletableFuture.supplyAsync(() -> {
semaphore.acquireUninterruptibly();
try {
return userRepository.findById(id);
} finally {
semaphore.release();
}
}, ioExecutor)) // ← dedicated executor, НЕ commonPool!
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());
}
thenCombine під капотом
// thenCombine використовує BiRelay — очікує обидва CF
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
// Якщо обидва CF вже завершені — виконується синхронно
// Якщо ні — підписується на completion обох
Обробка помилок при комбінуванні
// Проблема: якщо один з CF падає — весь chain падає
CompletableFuture.allOf(cf1, cf2, cf3)
.exceptionally(ex -> {
// Як дізнатися, який саме впав?
// Потрібно перевіряти кожен
if (cf1.isCompletedExceptionally()) { /* ... */ }
if (cf2.isCompletedExceptionally()) { /* ... */ }
return null;
});
// ✅ Рішення: handleErrors на кожному CF
CompletableFuture<User> user = userService.findByIdAsync(id)
.handle((result, ex) -> ex != null ? defaultUser : result);
CompletableFuture<Order> order = orderService.findAsync(id)
.handle((result, ex) -> ex != null ? emptyList() : result);
// Тепер allOf не впаде — кожен CF обробив свою помилку
Production Experience
@Service
public class DashboardService {
private final ExecutorService dashboardExecutor =
Executors.newFixedThreadPool(10, new ThreadFactoryBuilder()
.setNameFormat("dashboard-%d").build());
public CompletableFuture<DashboardDto> getDashboard(Long userId) {
long start = System.nanoTime();
CompletableFuture<UserDto> user = CompletableFuture
.supplyAsync(() -> userClient.getUser(userId), dashboardExecutor)
.orTimeout(3, TimeUnit.SECONDS);
CompletableFuture<List<OrderDto>> orders = CompletableFuture
.supplyAsync(() -> orderClient.getOrders(userId), dashboardExecutor)
.orTimeout(3, TimeUnit.SECONDS);
CompletableFuture<StatsDto> stats = CompletableFuture
.supplyAsync(() -> statsClient.getStats(userId), dashboardExecutor)
.orTimeout(3, TimeUnit.SECONDS);
return CompletableFuture.allOf(user, orders, stats)
.thenApply(v -> new DashboardDto(
user.join(),
orders.join(),
stats.join()
))
.whenComplete((result, ex) -> {
long duration = System.nanoTime() - start;
metrics.histogram("dashboard.latency", duration);
if (ex != null) {
metrics.counter("dashboard.errors").increment();
}
});
}
}
Best Practices
// ✅ Незалежні CF — thenCombine / allOf
cf1.thenCombine(cf2, combiner);
CompletableFuture.allOf(cf1, cf2, cf3);
// ✅ Залежні CF — thenCompose
cf1.thenCompose(result -> cf2(result));
// ✅ Обробка помилок на кожному CF
cf.handle((r, ex) -> ex != null ? fallback : r);
// ✅ Timeout для кожного CF
cf.orTimeout(5, TimeUnit.SECONDS);
// ✅ Свій Executor для ізоляції
CompletableFuture.supplyAsync(task, dedicatedExecutor);
// ❌ Блокуючі join() в async ланцюжку
// ❌ allOf без обробки помилок
// ❌ Безлімітний паралелізм (Semaphore для batch)
Див. також: [[9. Що робить метод allOf() і коли його використовувати]], [[10. Що робить метод anyOf() і в яких випадках він корисний]]
🎯 Шпаргалка для співбесіди
Обов’язково знати:
- thenCombine — два незалежних CF, паралельне виконання, latency = max(cf1, cf2)
- allOf — N CF, повертає CompletableFuture
, потрібен join() для результатів - anyOf — перший завершений CF, повертає CompletableFuture
- thenCompose — залежні CF, послідовне виконання, latency = cf1 + cf2
- applyToEither — аналог anyOf для двох CF
Часті уточнюючі питання:
- allOf повертає результати? — Ні, Void. Потрібен join() на кожному вихідному CF
- thenCombine vs thenCompose — коли що? — thenCombine для незалежних (паралельно), thenCompose для залежних (послідовно)
- Що якщо один з CF в allOf впаде? — allOf завершиться з помилкою. Потрібен handle на кожному CF
- anyOf скасовує решту CF? — Ні, решта продовжують виконуватися
Червоні прапорці (НЕ говорити):
- «allOf повертає список результатів» — він повертає Void, результати через join()
- «anyOf скасовує решту завдань» — ні, потрібно вручну cancel()
- «thenCombine виконує послідовно» — він паралельний, latency = max(cf1, cf2)
Пов’язані теми:
- [[9. Що робить метод allOf() і коли його використовувати]]
- [[10. Що робить метод anyOf() і в яких випадках він корисний]]
- [[4. У чому різниця між thenApply() і thenCompose()]]
- [[22. У чому різниця між thenCombine() і thenCompose()]]