Вопрос 16 · Раздел 19

Как правильно выполнить несколько параллельных запросов к микросервисам

Когда нужно получить данные от нескольких микросервисов, не делайте запросы последовательно — выполняйте их параллельно.

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Когда нужно получить данные от нескольких микросервисов, не делайте запросы последовательно — выполняйте их параллельно.

Последовательно (медленно)

// ❌ Каждый запрос ждёт завершения предыдущего
User user = userService.getUser(userId);       // 200ms
Order orders = orderService.getOrders(userId);  // 300ms
Notification notifs = notificationService.get(userId); // 150ms
// Общее время: 200 + 300 + 150 = 650ms

Параллельно (быстро)

// ✅ Все запросы запускаются одновременно
CompletableFuture<User> user = userService.getUserAsync(userId);
CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId);
CompletableFuture<List<Notification>> notifs = notificationService.getAsync(userId);

// Ждём все сразу
CompletableFuture.allOf(user, orders, notifs)
    .thenAccept(v -> {
        Dashboard dashboard = new Dashboard(
            user.join(),
            orders.join(),
            notifs.join()
        );
        // Общее время: max(200, 300, 150) = 300ms — в 2 раза быстрее!
    });

🟡 Middle Level

Паттерн: Gather и Combine

@Service
public class UserProfileService {

    private final RestTemplate restTemplate;
    private final Executor executor;

    public UserProfileDto getProfile(Long userId) {
        // 1. Запускаем все запросы параллельно
        CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(
            () -> restTemplate.getForObject("http://user-service/users/" + userId, User.class),
            executor
        );

        CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(
            () -> restTemplate.getForObject("http://order-service/orders?userId=" + userId, OrderList.class).orders(),
            executor
        );

        CompletableFuture<List<Review>> reviewsFuture = CompletableFuture.supplyAsync(
            () -> restTemplate.getForObject("http://review-service/reviews?authorId=" + userId, ReviewList.class).reviews(),
            executor
        );

        // 2. Ждём все
        // allOf().join() бросает CompletionException, если хотя бы один CF завершился
        // с ошибкой. После успешного allOf().join() повторный .join() на каждом CF
        // безопасен, но избыточен — они уже завершены.
        CompletableFuture.allOf(userFuture, ordersFuture, reviewsFuture).join();

        // 3. Комбинируем результаты
        return new UserProfileDto(
            userFuture.join(),
            ordersFuture.join(),
            reviewsFuture.join()
        );
    }
}

Обработка ошибок

// Если один сервис упал — всё не должно упасть
CompletableFuture<User> user = userService.getUserAsync(userId)
    .exceptionally(ex -> {
        log.error("User service unavailable", ex);
        return User.unknown(); // fallback
    });

CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId)
    .exceptionally(ex -> {
        log.error("Order service unavailable", ex);
        return List.of(); // пустой список
    });

CompletableFuture.allOf(user, orders).join();
// Работает даже при частичных ошибках

Timeout для каждого запроса

CompletableFuture<User> user = userService.getUserAsync(userId)
    .orTimeout(2, TimeUnit.SECONDS);

CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId)
    .orTimeout(3, TimeUnit.SECONDS);

// Если user-service не ответил за 2 секунды — таймаут
// Но order-service может продолжать работать (у него 3 секунды)

Настройка Executor

@Configuration
public class AsyncConfig {
    @Bean("microserviceExecutor")
    public Executor microserviceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("ms-call-");
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

Сравнение подходов

Подход Скорость Сложность Отказоустойчивость
Последовательный N × latency Низкая Низкая (один упал — всё)
CompletableFuture.allOf max(latency) Средняя Средняя (нужна обработка ошибок)
Reactive (WebClient) max(latency) Высокая Высокая (retry, timeout, circuit breaker)

🔴 Senior Level

Resilience4j: Circuit Breaker + Retry

@Service
public class ResilientMicroserviceClient {

    private final CircuitBreakerRegistry cbRegistry;
    private final RetryRegistry retryRegistry;
    private final WebClient webClient;

    public CompletableFuture<User> getUserWithResilience(Long userId) {
        CircuitBreaker cb = cbRegistry.circuitBreaker("user-service");
        Retry retry = retryRegistry.retry("user-service");

        return CompletableFuture.supplyAsync(
            () -> Retry.decorateCheckedSupplier(retry,
                () -> CircuitBreaker.decorateCheckedSupplier(cb,
                    () -> webClient.get()
                        .uri("/users/{id}", userId)
                        .retrieve()
                        .bodyToMono(User.class)
                        // ⚠️ Если вы уже используете WebClient, лучше работать с Mono напрямую
                        // и конвертировать в CompletableFuture через .toFuture(),
                        // чем блокировать внутри supplyAsync.
                        .block()
                ).apply()
            ).apply(),
            executor
        );
    }
}

Bulkhead — ограничение параллелизма

// Проблема: 1000 запросов одновременно → перегрузка микросервиса
// Решение: Bulkhead — ограничение числа одновременных вызовов

Bulkhead bulkhead = Bulkhead.of("microservice-calls",
    BulkheadConfig.custom()
        .maxConcurrentCalls(50)
        .maxWaitDuration(Duration.ofMillis(500))
        .build());

public CompletableFuture<List<User>> getUsers(List<Long> ids) {
    List<CompletableFuture<User>> futures = ids.stream()
        .map(id -> CompletableFuture.supplyAsync(
            Bulkhead.decorateSupplier(bulkhead,
                () -> callUserService(id)),
            executor
        ))
        .toList();

    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)
            .toList());
}

Partial Results — не ждать всех

// Сценарий: 5 микросервисов, один медленно отвечает
// Не хотим ждать 10 секунд ради одного сервиса

public CompletableFuture<Dashboard> getDashboardPartial(Long userId) {
    CompletableFuture<User> user = callUserAsync(userId).orTimeout(2, SECONDS);
    CompletableFuture<List<Order>> orders = callOrdersAsync(userId).orTimeout(2, SECONDS);
    CompletableFuture<List<Review>> reviews = callReviewsAsync(userId).orTimeout(2, SECONDS);

    // Ждём 2 секунды, потом возвращаем что есть
    CompletableFuture<Void> all = CompletableFuture.allOf(user, orders, reviews);

    // delayedExecutor возвращает Executor, не CompletionStage.
    // Нужно обернуть в CF:
    CompletableFuture<Void> timeout = CompletableFuture
        .runAsync(() -> {}, CompletableFuture.delayedExecutor(2, SECONDS));
    return all.applyToEither(timeout, v -> buildDashboard(
            getOrDefault(user, null),
            getOrDefault(orders, List.of()),
            getOrDefault(reviews, List.of())
        )
    );
}

private <T> T getOrDefault(CompletableFuture<T> future, T defaultValue) {
    try {
        return future.getNow(defaultValue);
    } catch (CompletionException e) {
        return defaultValue;
    }
}

Грейды сервиса (Service Degradation)

public CompletableFuture<Dashboard> getDashboardWithDegradation(Long userId) {
    CompletableFuture<User> user = callUserAsync(userId)
        .exceptionally(ex -> {
            metrics.counter("degraded.user-service").increment();
            return User.unknown(); // Level 1 degradation
        });

    CompletableFuture<List<Order>> orders = callOrdersAsync(userId)
        .thenCompose(orderList -> {
            if (orderList.isEmpty()) {
                // Level 2 degradation: нет заказов — пробуем из кеша
                return cacheService.getCachedOrders(userId)
                    .exceptionally(ex -> List.of());
            }
            return CompletableFuture.completedFuture(orderList);
        });

    return CompletableFuture.allOf(user, orders)
        .thenApply(v -> new Dashboard(user.join(), orders.join()));
}

Monitoring и метрики

public CompletableFuture<Dashboard> getDashboardInstrumented(Long userId) {
    Timer.Sample sample = Timer.start(metricsRegistry);

    CompletableFuture<User> user = callUserAsync(userId);
    CompletableFuture<List<Order>> orders = callOrdersAsync(userId);

    return CompletableFuture.allOf(user, orders)
        .thenApply(v -> new Dashboard(user.join(), orders.join()))
        .whenComplete((result, ex) -> {
            sample.stop(Timer.builder("dashboard.latency")
                .tag("userId", String.valueOf(userId))
                .tag("status", ex != null ? "error" : "success")
                .register(metricsRegistry));

            // Метрики для каждого вызова
            metrics.timer("call.user-service.latency", ...);
            metrics.timer("call.order-service.latency", ...);
        });
}

Production Architecture

Client Request
     │
     ├─→ API Gateway
           │
           ├─→ CompletableFuture.allOf(
           │     ├─→ User Service (with Circuit Breaker)
           │     ├─→ Order Service (with Circuit Breaker)
           │     ├─→ Review Service (with Circuit Breaker)
           │     └─→ Notification Service (with Circuit Breaker)
           │   )
           │
           ├─→ Timeout: 3s
           ├─→ Partial results after 3s
           └─→ Response with degradation flags

Best Practices

// ✅ Параллельные вызовы через CompletableFuture.allOf
// ✅ Timeout на каждый вызов (orTimeout)
// ✅ Circuit Breaker для каждого микросервиса
// ✅ Fallback для graceful degradation
// ✅ Свой Executor (не ForkJoinPool.commonPool)
// ✅ Метрики и трейсинг для каждого вызова
// ✅ Partial results при таймауте
// ✅ Bulkhead для ограничения нагрузки

// ❌ Последовательные вызовы (блокирующие)
// ❌ Без timeout (бесконечное ожидание)
// ❌ Без circuit breaker (каскадный отказ)
// ❌ Без fallback (полный отказ при ошибке)
// ❌ Общий ForkJoinPool (конкуренция с другими задачами)

🎯 Шпаргалка для интервью

Обязательно знать:

  • Параллельно: allOf(cf1, cf2, cf3), latency = max(все CF). Последовательно: latency = sum(все CF)
  • Timeout на каждый вызов: orTimeout(2, SECONDS)
  • Circuit Breaker (Resilience4j) для каждого микросервиса
  • Fallback для graceful degradation
  • Bulkhead для ограничения параллелизма (maxConcurrentCalls)
  • Свой Executor, не commonPool

Частые уточняющие вопросы:

  • allOf().join() бросит если один CF упадёт? — Да, CompletionException. Нужен handle на каждом CF
  • Partial results — как вернуть что есть? — orTimeout на каждый CF + applyToEither с delayedExecutor
  • Bulkhead зачем? — Ограничивает число одновременных вызовов, предотвращает перегрузку микросервиса
  • Почему свой Executor? — Изоляция: сбой одного модуля не влияет на другие

Красные флаги (НЕ говорить):

  • «Последовательные вызовы ок для 2-3 сервисов» — latency суммируется, пользователь ждёт дольше
  • «Без circuit breaker нормально» — каскадный отказ при недоступности сервиса
  • «commonPool подойдёт для HTTP запросов» — thread pool starvation

Связанные темы:

  • [[9. Что делает метод allOf() и когда его использовать]]
  • [[8. Как комбинировать результаты нескольких CompletableFuture]]
  • [[20. Как реализовать timeout для CompletableFuture]]
  • [[12. Какой пул потоков используется по умолчанию для async методов]]