Як правильно виконати декілька паралельних запитів до мікросервісів
Коли потрібно отримати дані від декількох мікросервісів, не робіть запити послідовно — виконуйте їх паралельно.
🟢 Junior Level
Коли потрібно отримати дані від декількох мікросервісів, не робіть запити послідовно — виконуйте їх паралельно.
Послідовно (повільно)
// ❌ Кожен запит чекає завершення попереднього
User user = userService.getUser(userId); // 200ms
Order orders = orderService.getOrders(userId); // 300ms
Notification notifs = notificationService.get(userId); // 150ms
// Загальний час: 200 + 300 + 150 = 650ms
Паралельно (швидко)
// ✅ Всі запити запускаються одночасно
CompletableFuture<User> user = userService.getUserAsync(userId);
CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId);
CompletableFuture<List<Notification>> notifs = notificationService.getAsync(userId);
// Чекаємо всі одразу
CompletableFuture.allOf(user, orders, notifs)
.thenAccept(v -> {
Dashboard dashboard = new Dashboard(
user.join(),
orders.join(),
notifs.join()
);
// Загальний час: max(200, 300, 150) = 300ms — у 2 рази швидше!
});
🟡 Middle Level
Патерн: Gather і Combine
@Service
public class UserProfileService {
private final RestTemplate restTemplate;
private final Executor executor;
public UserProfileDto getProfile(Long userId) {
// 1. Запускаємо всі запити паралельно
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(
() -> restTemplate.getForObject("http://user-service/users/" + userId, User.class),
executor
);
CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(
() -> restTemplate.getForObject("http://order-service/orders?userId=" + userId, OrderList.class).orders(),
executor
);
CompletableFuture<List<Review>> reviewsFuture = CompletableFuture.supplyAsync(
() -> restTemplate.getForObject("http://review-service/reviews?authorId=" + userId, ReviewList.class).reviews(),
executor
);
// 2. Чекаємо всі
// allOf().join() кидає CompletionException, якщо хоча б один CF завершився
// з помилкою. Після успішного allOf().join() повторний .join() на кожному CF
// безпечний, але надмірний — вони вже завершені.
CompletableFuture.allOf(userFuture, ordersFuture, reviewsFuture).join();
// 3. Комбінуємо результати
return new UserProfileDto(
userFuture.join(),
ordersFuture.join(),
reviewsFuture.join()
);
}
}
Обробка помилок
// Якщо один сервіс впав — все не повинно впасти
CompletableFuture<User> user = userService.getUserAsync(userId)
.exceptionally(ex -> {
log.error("User service unavailable", ex);
return User.unknown(); // fallback
});
CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId)
.exceptionally(ex -> {
log.error("Order service unavailable", ex);
return List.of(); // порожній список
});
CompletableFuture.allOf(user, orders).join();
// Працює навіть при часткових помилках
Timeout для кожного запиту
CompletableFuture<User> user = userService.getUserAsync(userId)
.orTimeout(2, TimeUnit.SECONDS);
CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId)
.orTimeout(3, TimeUnit.SECONDS);
// Якщо user-service не відповів за 2 секунди — таймаут
// Але order-service може продовжувати працювати (у нього 3 секунди)
Налаштування Executor
@Configuration
public class AsyncConfig {
@Bean("microserviceExecutor")
public Executor microserviceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("ms-call-");
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
executor.initialize();
return executor;
}
}
Порівняння підходів
| Підхід | Швидкість | Складність | Відмовостійкість |
|---|---|---|---|
| Послідовний | N × latency | Низька | Низька (один впав — все) |
| CompletableFuture.allOf | max(latency) | Середня | Середня (потрібна обробка помилок) |
| Reactive (WebClient) | max(latency) | Висока | Висока (retry, timeout, circuit breaker) |
🔴 Senior Level
Resilience4j: Circuit Breaker + Retry
@Service
public class ResilientMicroserviceClient {
private final CircuitBreakerRegistry cbRegistry;
private final RetryRegistry retryRegistry;
private final WebClient webClient;
public CompletableFuture<User> getUserWithResilience(Long userId) {
CircuitBreaker cb = cbRegistry.circuitBreaker("user-service");
Retry retry = retryRegistry.retry("user-service");
return CompletableFuture.supplyAsync(
() -> Retry.decorateCheckedSupplier(retry,
() -> CircuitBreaker.decorateCheckedSupplier(cb,
() -> webClient.get()
.uri("/users/{id}", userId)
.retrieve()
.bodyToMono(User.class)
// ⚠️ Якщо ви вже використовуєте WebClient, краще працювати з Mono безпосередньо
// і конвертувати в CompletableFuture через .toFuture(),
// ніж блокувати всередині supplyAsync.
.block()
).apply()
).apply(),
executor
);
}
}
Bulkhead — обмеження паралелізму
// Проблема: 1000 запитів одночасно → перевантаження мікросервісу
// Рішення: Bulkhead — обмеження числа одночасних викликів
Bulkhead bulkhead = Bulkhead.of("microservice-calls",
BulkheadConfig.custom()
.maxConcurrentCalls(50)
.maxWaitDuration(Duration.ofMillis(500))
.build());
public CompletableFuture<List<User>> getUsers(List<Long> ids) {
List<CompletableFuture<User>> futures = ids.stream()
.map(id -> CompletableFuture.supplyAsync(
Bulkhead.decorateSupplier(bulkhead,
() -> callUserService(id)),
executor
))
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());
}
Partial Results — не чекати всіх
// Сценарій: 5 мікросервісів, один повільно відповідає
// Не хочемо чекати 10 секунд заради одного сервісу
public CompletableFuture<Dashboard> getDashboardPartial(Long userId) {
CompletableFuture<User> user = callUserAsync(userId).orTimeout(2, SECONDS);
CompletableFuture<List<Order>> orders = callOrdersAsync(userId).orTimeout(2, SECONDS);
CompletableFuture<List<Review>> reviews = callReviewsAsync(userId).orTimeout(2, SECONDS);
// Чекаємо 2 секунди, потім повертаємо що є
CompletableFuture<Void> all = CompletableFuture.allOf(user, orders, reviews);
// delayedExecutor повертає Executor, не CompletionStage.
// Потрібно обгорнути в CF:
CompletableFuture<Void> timeout = CompletableFuture
.runAsync(() -> {}, CompletableFuture.delayedExecutor(2, SECONDS));
return all.applyToEither(timeout, v -> buildDashboard(
getOrDefault(user, null),
getOrDefault(orders, List.of()),
getOrDefault(reviews, List.of())
)
);
}
private <T> T getOrDefault(CompletableFuture<T> future, T defaultValue) {
try {
return future.getNow(defaultValue);
} catch (CompletionException e) {
return defaultValue;
}
}
Грейди сервісу (Service Degradation)
public CompletableFuture<Dashboard> getDashboardWithDegradation(Long userId) {
CompletableFuture<User> user = callUserAsync(userId)
.exceptionally(ex -> {
metrics.counter("degraded.user-service").increment();
return User.unknown(); // Level 1 degradation
});
CompletableFuture<List<Order>> orders = callOrdersAsync(userId)
.thenCompose(orderList -> {
if (orderList.isEmpty()) {
// Level 2 degradation: немає замовлень — пробуємо з кешу
return cacheService.getCachedOrders(userId)
.exceptionally(ex -> List.of());
}
return CompletableFuture.completedFuture(orderList);
});
return CompletableFuture.allOf(user, orders)
.thenApply(v -> new Dashboard(user.join(), orders.join()));
}
Monitoring і метрики
public CompletableFuture<Dashboard> getDashboardInstrumented(Long userId) {
Timer.Sample sample = Timer.start(metricsRegistry);
CompletableFuture<User> user = callUserAsync(userId);
CompletableFuture<List<Order>> orders = callOrdersAsync(userId);
return CompletableFuture.allOf(user, orders)
.thenApply(v -> new Dashboard(user.join(), orders.join()))
.whenComplete((result, ex) -> {
sample.stop(Timer.builder("dashboard.latency")
.tag("userId", String.valueOf(userId))
.tag("status", ex != null ? "error" : "success")
.register(metricsRegistry));
// Метрики для кожного виклику
metrics.timer("call.user-service.latency", ...);
metrics.timer("call.order-service.latency", ...);
});
}
Production Architecture
Client Request
│
├─→ API Gateway
│
├─→ CompletableFuture.allOf(
│ ├─→ User Service (with Circuit Breaker)
│ ├─→ Order Service (with Circuit Breaker)
│ ├─→ Review Service (with Circuit Breaker)
│ └─→ Notification Service (with Circuit Breaker)
│ )
│
├─→ Timeout: 3s
├─→ Partial results after 3s
└─→ Response with degradation flags
Best Practices
// ✅ Паралельні виклики через CompletableFuture.allOf
// ✅ Timeout на кожен виклик (orTimeout)
// ✅ Circuit Breaker для кожного мікросервісу
// ✅ Fallback для graceful degradation
// ✅ Свій Executor (не ForkJoinPool.commonPool)
// ✅ Метрики і трейсинг для кожного виклику
// ✅ Partial results при таймауті
// ✅ Bulkhead для обмеження навантаження
// ❌ Послідовні виклики (блокуючі)
// ❌ Без timeout (нескінченне очікування)
// ❌ Без circuit breaker (каскадний отказ)
// ❌ Без fallback (повний отказ при помилці)
// ❌ Загальний ForkJoinPool (конкуренція з іншими задачами)
🎯 Шпаргалка для співбесіди
Обов’язково знати:
- Паралельно: allOf(cf1, cf2, cf3), latency = max(усі CF). Послідовно: latency = sum(усі CF)
- Timeout на кожен виклик: orTimeout(2, SECONDS)
- Circuit Breaker (Resilience4j) для кожного мікросервісу
- Fallback для graceful degradation
- Bulkhead для обмеження паралелізму (maxConcurrentCalls)
- Свій Executor, не commonPool
Часті уточнюючі питання:
- allOf().join() кине якщо один CF впаде? — Так, CompletionException. Потрібен handle на кожному CF
- Partial results — як повернути що є? — orTimeout на кожен CF + applyToEither з delayedExecutor
- Bulkhead навіщо? — Обмежує число одночасних викликів, запобігає перевантаженню мікросервісу
- Чому свій Executor? — Ізоляція: збій одного модуля не впливає на інші
Червоні прапорці (НЕ говорити):
- «Послідовні виклики ок для 2-3 сервісів» — latency сумується, користувач чекає довше
- «Без circuit breaker нормально» — каскадний отказ при недоступності сервісу
- «commonPool підійде для HTTP запитів» — thread pool starvation
Пов’язані теми:
- [[9. Що робить метод allOf() і коли його використовувати]]
- [[8. Як комбінувати результати декількох CompletableFuture]]
- [[20. Як реалізувати timeout для CompletableFuture]]
- [[12. Який пул потоків використовується за замовчуванням для async методів]]