Как правильно выполнить несколько параллельных запросов к микросервисам
Когда нужно получить данные от нескольких микросервисов, не делайте запросы последовательно — выполняйте их параллельно.
🟢 Junior Level
Когда нужно получить данные от нескольких микросервисов, не делайте запросы последовательно — выполняйте их параллельно.
Последовательно (медленно)
// ❌ Каждый запрос ждёт завершения предыдущего
User user = userService.getUser(userId); // 200ms
Order orders = orderService.getOrders(userId); // 300ms
Notification notifs = notificationService.get(userId); // 150ms
// Общее время: 200 + 300 + 150 = 650ms
Параллельно (быстро)
// ✅ Все запросы запускаются одновременно
CompletableFuture<User> user = userService.getUserAsync(userId);
CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId);
CompletableFuture<List<Notification>> notifs = notificationService.getAsync(userId);
// Ждём все сразу
CompletableFuture.allOf(user, orders, notifs)
.thenAccept(v -> {
Dashboard dashboard = new Dashboard(
user.join(),
orders.join(),
notifs.join()
);
// Общее время: max(200, 300, 150) = 300ms — в 2 раза быстрее!
});
🟡 Middle Level
Паттерн: Gather и Combine
@Service
public class UserProfileService {
private final RestTemplate restTemplate;
private final Executor executor;
public UserProfileDto getProfile(Long userId) {
// 1. Запускаем все запросы параллельно
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(
() -> restTemplate.getForObject("http://user-service/users/" + userId, User.class),
executor
);
CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(
() -> restTemplate.getForObject("http://order-service/orders?userId=" + userId, OrderList.class).orders(),
executor
);
CompletableFuture<List<Review>> reviewsFuture = CompletableFuture.supplyAsync(
() -> restTemplate.getForObject("http://review-service/reviews?authorId=" + userId, ReviewList.class).reviews(),
executor
);
// 2. Ждём все
// allOf().join() бросает CompletionException, если хотя бы один CF завершился
// с ошибкой. После успешного allOf().join() повторный .join() на каждом CF
// безопасен, но избыточен — они уже завершены.
CompletableFuture.allOf(userFuture, ordersFuture, reviewsFuture).join();
// 3. Комбинируем результаты
return new UserProfileDto(
userFuture.join(),
ordersFuture.join(),
reviewsFuture.join()
);
}
}
Обработка ошибок
// Если один сервис упал — всё не должно упасть
CompletableFuture<User> user = userService.getUserAsync(userId)
.exceptionally(ex -> {
log.error("User service unavailable", ex);
return User.unknown(); // fallback
});
CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId)
.exceptionally(ex -> {
log.error("Order service unavailable", ex);
return List.of(); // пустой список
});
CompletableFuture.allOf(user, orders).join();
// Работает даже при частичных ошибках
Timeout для каждого запроса
CompletableFuture<User> user = userService.getUserAsync(userId)
.orTimeout(2, TimeUnit.SECONDS);
CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId)
.orTimeout(3, TimeUnit.SECONDS);
// Если user-service не ответил за 2 секунды — таймаут
// Но order-service может продолжать работать (у него 3 секунды)
Настройка Executor
@Configuration
public class AsyncConfig {
@Bean("microserviceExecutor")
public Executor microserviceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("ms-call-");
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
executor.initialize();
return executor;
}
}
Сравнение подходов
| Подход | Скорость | Сложность | Отказоустойчивость |
|---|---|---|---|
| Последовательный | N × latency | Низкая | Низкая (один упал — всё) |
| CompletableFuture.allOf | max(latency) | Средняя | Средняя (нужна обработка ошибок) |
| Reactive (WebClient) | max(latency) | Высокая | Высокая (retry, timeout, circuit breaker) |
🔴 Senior Level
Resilience4j: Circuit Breaker + Retry
@Service
public class ResilientMicroserviceClient {
private final CircuitBreakerRegistry cbRegistry;
private final RetryRegistry retryRegistry;
private final WebClient webClient;
public CompletableFuture<User> getUserWithResilience(Long userId) {
CircuitBreaker cb = cbRegistry.circuitBreaker("user-service");
Retry retry = retryRegistry.retry("user-service");
return CompletableFuture.supplyAsync(
() -> Retry.decorateCheckedSupplier(retry,
() -> CircuitBreaker.decorateCheckedSupplier(cb,
() -> webClient.get()
.uri("/users/{id}", userId)
.retrieve()
.bodyToMono(User.class)
// ⚠️ Если вы уже используете WebClient, лучше работать с Mono напрямую
// и конвертировать в CompletableFuture через .toFuture(),
// чем блокировать внутри supplyAsync.
.block()
).apply()
).apply(),
executor
);
}
}
Bulkhead — ограничение параллелизма
// Проблема: 1000 запросов одновременно → перегрузка микросервиса
// Решение: Bulkhead — ограничение числа одновременных вызовов
Bulkhead bulkhead = Bulkhead.of("microservice-calls",
BulkheadConfig.custom()
.maxConcurrentCalls(50)
.maxWaitDuration(Duration.ofMillis(500))
.build());
public CompletableFuture<List<User>> getUsers(List<Long> ids) {
List<CompletableFuture<User>> futures = ids.stream()
.map(id -> CompletableFuture.supplyAsync(
Bulkhead.decorateSupplier(bulkhead,
() -> callUserService(id)),
executor
))
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());
}
Partial Results — не ждать всех
// Сценарий: 5 микросервисов, один медленно отвечает
// Не хотим ждать 10 секунд ради одного сервиса
public CompletableFuture<Dashboard> getDashboardPartial(Long userId) {
CompletableFuture<User> user = callUserAsync(userId).orTimeout(2, SECONDS);
CompletableFuture<List<Order>> orders = callOrdersAsync(userId).orTimeout(2, SECONDS);
CompletableFuture<List<Review>> reviews = callReviewsAsync(userId).orTimeout(2, SECONDS);
// Ждём 2 секунды, потом возвращаем что есть
CompletableFuture<Void> all = CompletableFuture.allOf(user, orders, reviews);
// delayedExecutor возвращает Executor, не CompletionStage.
// Нужно обернуть в CF:
CompletableFuture<Void> timeout = CompletableFuture
.runAsync(() -> {}, CompletableFuture.delayedExecutor(2, SECONDS));
return all.applyToEither(timeout, v -> buildDashboard(
getOrDefault(user, null),
getOrDefault(orders, List.of()),
getOrDefault(reviews, List.of())
)
);
}
private <T> T getOrDefault(CompletableFuture<T> future, T defaultValue) {
try {
return future.getNow(defaultValue);
} catch (CompletionException e) {
return defaultValue;
}
}
Грейды сервиса (Service Degradation)
public CompletableFuture<Dashboard> getDashboardWithDegradation(Long userId) {
CompletableFuture<User> user = callUserAsync(userId)
.exceptionally(ex -> {
metrics.counter("degraded.user-service").increment();
return User.unknown(); // Level 1 degradation
});
CompletableFuture<List<Order>> orders = callOrdersAsync(userId)
.thenCompose(orderList -> {
if (orderList.isEmpty()) {
// Level 2 degradation: нет заказов — пробуем из кеша
return cacheService.getCachedOrders(userId)
.exceptionally(ex -> List.of());
}
return CompletableFuture.completedFuture(orderList);
});
return CompletableFuture.allOf(user, orders)
.thenApply(v -> new Dashboard(user.join(), orders.join()));
}
Monitoring и метрики
public CompletableFuture<Dashboard> getDashboardInstrumented(Long userId) {
Timer.Sample sample = Timer.start(metricsRegistry);
CompletableFuture<User> user = callUserAsync(userId);
CompletableFuture<List<Order>> orders = callOrdersAsync(userId);
return CompletableFuture.allOf(user, orders)
.thenApply(v -> new Dashboard(user.join(), orders.join()))
.whenComplete((result, ex) -> {
sample.stop(Timer.builder("dashboard.latency")
.tag("userId", String.valueOf(userId))
.tag("status", ex != null ? "error" : "success")
.register(metricsRegistry));
// Метрики для каждого вызова
metrics.timer("call.user-service.latency", ...);
metrics.timer("call.order-service.latency", ...);
});
}
Production Architecture
Client Request
│
├─→ API Gateway
│
├─→ CompletableFuture.allOf(
│ ├─→ User Service (with Circuit Breaker)
│ ├─→ Order Service (with Circuit Breaker)
│ ├─→ Review Service (with Circuit Breaker)
│ └─→ Notification Service (with Circuit Breaker)
│ )
│
├─→ Timeout: 3s
├─→ Partial results after 3s
└─→ Response with degradation flags
Best Practices
// ✅ Параллельные вызовы через CompletableFuture.allOf
// ✅ Timeout на каждый вызов (orTimeout)
// ✅ Circuit Breaker для каждого микросервиса
// ✅ Fallback для graceful degradation
// ✅ Свой Executor (не ForkJoinPool.commonPool)
// ✅ Метрики и трейсинг для каждого вызова
// ✅ Partial results при таймауте
// ✅ Bulkhead для ограничения нагрузки
// ❌ Последовательные вызовы (блокирующие)
// ❌ Без timeout (бесконечное ожидание)
// ❌ Без circuit breaker (каскадный отказ)
// ❌ Без fallback (полный отказ при ошибке)
// ❌ Общий ForkJoinPool (конкуренция с другими задачами)
🎯 Шпаргалка для интервью
Обязательно знать:
- Параллельно: allOf(cf1, cf2, cf3), latency = max(все CF). Последовательно: latency = sum(все CF)
- Timeout на каждый вызов: orTimeout(2, SECONDS)
- Circuit Breaker (Resilience4j) для каждого микросервиса
- Fallback для graceful degradation
- Bulkhead для ограничения параллелизма (maxConcurrentCalls)
- Свой Executor, не commonPool
Частые уточняющие вопросы:
- allOf().join() бросит если один CF упадёт? — Да, CompletionException. Нужен handle на каждом CF
- Partial results — как вернуть что есть? — orTimeout на каждый CF + applyToEither с delayedExecutor
- Bulkhead зачем? — Ограничивает число одновременных вызовов, предотвращает перегрузку микросервиса
- Почему свой Executor? — Изоляция: сбой одного модуля не влияет на другие
Красные флаги (НЕ говорить):
- «Последовательные вызовы ок для 2-3 сервисов» — latency суммируется, пользователь ждёт дольше
- «Без circuit breaker нормально» — каскадный отказ при недоступности сервиса
- «commonPool подойдёт для HTTP запросов» — thread pool starvation
Связанные темы:
- [[9. Что делает метод allOf() и когда его использовать]]
- [[8. Как комбинировать результаты нескольких CompletableFuture]]
- [[20. Как реализовать timeout для CompletableFuture]]
- [[12. Какой пул потоков используется по умолчанию для async методов]]