Питання 17 · Розділ 19

Як правильно виконати декілька паралельних запитів до мікросервісів

Коли потрібно отримати дані від декількох мікросервісів, не робіть запити послідовно — виконуйте їх паралельно.

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Коли потрібно отримати дані від декількох мікросервісів, не робіть запити послідовно — виконуйте їх паралельно.

Послідовно (повільно)

// ❌ Кожен запит чекає завершення попереднього
User user = userService.getUser(userId);       // 200ms
Order orders = orderService.getOrders(userId);  // 300ms
Notification notifs = notificationService.get(userId); // 150ms
// Загальний час: 200 + 300 + 150 = 650ms

Паралельно (швидко)

// ✅ Всі запити запускаються одночасно
CompletableFuture<User> user = userService.getUserAsync(userId);
CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId);
CompletableFuture<List<Notification>> notifs = notificationService.getAsync(userId);

// Чекаємо всі одразу
CompletableFuture.allOf(user, orders, notifs)
    .thenAccept(v -> {
        Dashboard dashboard = new Dashboard(
            user.join(),
            orders.join(),
            notifs.join()
        );
        // Загальний час: max(200, 300, 150) = 300ms — у 2 рази швидше!
    });

🟡 Middle Level

Патерн: Gather і Combine

@Service
public class UserProfileService {

    private final RestTemplate restTemplate;
    private final Executor executor;

    public UserProfileDto getProfile(Long userId) {
        // 1. Запускаємо всі запити паралельно
        CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(
            () -> restTemplate.getForObject("http://user-service/users/" + userId, User.class),
            executor
        );

        CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(
            () -> restTemplate.getForObject("http://order-service/orders?userId=" + userId, OrderList.class).orders(),
            executor
        );

        CompletableFuture<List<Review>> reviewsFuture = CompletableFuture.supplyAsync(
            () -> restTemplate.getForObject("http://review-service/reviews?authorId=" + userId, ReviewList.class).reviews(),
            executor
        );

        // 2. Чекаємо всі
        // allOf().join() кидає CompletionException, якщо хоча б один CF завершився
        // з помилкою. Після успішного allOf().join() повторний .join() на кожному CF
        // безпечний, але надмірний — вони вже завершені.
        CompletableFuture.allOf(userFuture, ordersFuture, reviewsFuture).join();

        // 3. Комбінуємо результати
        return new UserProfileDto(
            userFuture.join(),
            ordersFuture.join(),
            reviewsFuture.join()
        );
    }
}

Обробка помилок

// Якщо один сервіс впав — все не повинно впасти
CompletableFuture<User> user = userService.getUserAsync(userId)
    .exceptionally(ex -> {
        log.error("User service unavailable", ex);
        return User.unknown(); // fallback
    });

CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId)
    .exceptionally(ex -> {
        log.error("Order service unavailable", ex);
        return List.of(); // порожній список
    });

CompletableFuture.allOf(user, orders).join();
// Працює навіть при часткових помилках

Timeout для кожного запиту

CompletableFuture<User> user = userService.getUserAsync(userId)
    .orTimeout(2, TimeUnit.SECONDS);

CompletableFuture<List<Order>> orders = orderService.getOrdersAsync(userId)
    .orTimeout(3, TimeUnit.SECONDS);

// Якщо user-service не відповів за 2 секунди — таймаут
// Але order-service може продовжувати працювати (у нього 3 секунди)

Налаштування Executor

@Configuration
public class AsyncConfig {
    @Bean("microserviceExecutor")
    public Executor microserviceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("ms-call-");
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

Порівняння підходів

Підхід Швидкість Складність Відмовостійкість
Послідовний N × latency Низька Низька (один впав — все)
CompletableFuture.allOf max(latency) Середня Середня (потрібна обробка помилок)
Reactive (WebClient) max(latency) Висока Висока (retry, timeout, circuit breaker)

🔴 Senior Level

Resilience4j: Circuit Breaker + Retry

@Service
public class ResilientMicroserviceClient {

    private final CircuitBreakerRegistry cbRegistry;
    private final RetryRegistry retryRegistry;
    private final WebClient webClient;

    public CompletableFuture<User> getUserWithResilience(Long userId) {
        CircuitBreaker cb = cbRegistry.circuitBreaker("user-service");
        Retry retry = retryRegistry.retry("user-service");

        return CompletableFuture.supplyAsync(
            () -> Retry.decorateCheckedSupplier(retry,
                () -> CircuitBreaker.decorateCheckedSupplier(cb,
                    () -> webClient.get()
                        .uri("/users/{id}", userId)
                        .retrieve()
                        .bodyToMono(User.class)
                        // ⚠️ Якщо ви вже використовуєте WebClient, краще працювати з Mono безпосередньо
                        // і конвертувати в CompletableFuture через .toFuture(),
                        // ніж блокувати всередині supplyAsync.
                        .block()
                ).apply()
            ).apply(),
            executor
        );
    }
}

Bulkhead — обмеження паралелізму

// Проблема: 1000 запитів одночасно → перевантаження мікросервісу
// Рішення: Bulkhead — обмеження числа одночасних викликів

Bulkhead bulkhead = Bulkhead.of("microservice-calls",
    BulkheadConfig.custom()
        .maxConcurrentCalls(50)
        .maxWaitDuration(Duration.ofMillis(500))
        .build());

public CompletableFuture<List<User>> getUsers(List<Long> ids) {
    List<CompletableFuture<User>> futures = ids.stream()
        .map(id -> CompletableFuture.supplyAsync(
            Bulkhead.decorateSupplier(bulkhead,
                () -> callUserService(id)),
            executor
        ))
        .toList();

    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)
            .toList());
}

Partial Results — не чекати всіх

// Сценарій: 5 мікросервісів, один повільно відповідає
// Не хочемо чекати 10 секунд заради одного сервісу

public CompletableFuture<Dashboard> getDashboardPartial(Long userId) {
    CompletableFuture<User> user = callUserAsync(userId).orTimeout(2, SECONDS);
    CompletableFuture<List<Order>> orders = callOrdersAsync(userId).orTimeout(2, SECONDS);
    CompletableFuture<List<Review>> reviews = callReviewsAsync(userId).orTimeout(2, SECONDS);

    // Чекаємо 2 секунди, потім повертаємо що є
    CompletableFuture<Void> all = CompletableFuture.allOf(user, orders, reviews);

    // delayedExecutor повертає Executor, не CompletionStage.
    // Потрібно обгорнути в CF:
    CompletableFuture<Void> timeout = CompletableFuture
        .runAsync(() -> {}, CompletableFuture.delayedExecutor(2, SECONDS));
    return all.applyToEither(timeout, v -> buildDashboard(
            getOrDefault(user, null),
            getOrDefault(orders, List.of()),
            getOrDefault(reviews, List.of())
        )
    );
}

private <T> T getOrDefault(CompletableFuture<T> future, T defaultValue) {
    try {
        return future.getNow(defaultValue);
    } catch (CompletionException e) {
        return defaultValue;
    }
}

Грейди сервісу (Service Degradation)

public CompletableFuture<Dashboard> getDashboardWithDegradation(Long userId) {
    CompletableFuture<User> user = callUserAsync(userId)
        .exceptionally(ex -> {
            metrics.counter("degraded.user-service").increment();
            return User.unknown(); // Level 1 degradation
        });

    CompletableFuture<List<Order>> orders = callOrdersAsync(userId)
        .thenCompose(orderList -> {
            if (orderList.isEmpty()) {
                // Level 2 degradation: немає замовлень — пробуємо з кешу
                return cacheService.getCachedOrders(userId)
                    .exceptionally(ex -> List.of());
            }
            return CompletableFuture.completedFuture(orderList);
        });

    return CompletableFuture.allOf(user, orders)
        .thenApply(v -> new Dashboard(user.join(), orders.join()));
}

Monitoring і метрики

public CompletableFuture<Dashboard> getDashboardInstrumented(Long userId) {
    Timer.Sample sample = Timer.start(metricsRegistry);

    CompletableFuture<User> user = callUserAsync(userId);
    CompletableFuture<List<Order>> orders = callOrdersAsync(userId);

    return CompletableFuture.allOf(user, orders)
        .thenApply(v -> new Dashboard(user.join(), orders.join()))
        .whenComplete((result, ex) -> {
            sample.stop(Timer.builder("dashboard.latency")
                .tag("userId", String.valueOf(userId))
                .tag("status", ex != null ? "error" : "success")
                .register(metricsRegistry));

            // Метрики для кожного виклику
            metrics.timer("call.user-service.latency", ...);
            metrics.timer("call.order-service.latency", ...);
        });
}

Production Architecture

Client Request
     │
     ├─→ API Gateway
           │
           ├─→ CompletableFuture.allOf(
           │     ├─→ User Service (with Circuit Breaker)
           │     ├─→ Order Service (with Circuit Breaker)
           │     ├─→ Review Service (with Circuit Breaker)
           │     └─→ Notification Service (with Circuit Breaker)
           │   )
           │
           ├─→ Timeout: 3s
           ├─→ Partial results after 3s
           └─→ Response with degradation flags

Best Practices

// ✅ Паралельні виклики через CompletableFuture.allOf
// ✅ Timeout на кожен виклик (orTimeout)
// ✅ Circuit Breaker для кожного мікросервісу
// ✅ Fallback для graceful degradation
// ✅ Свій Executor (не ForkJoinPool.commonPool)
// ✅ Метрики і трейсинг для кожного виклику
// ✅ Partial results при таймауті
// ✅ Bulkhead для обмеження навантаження

// ❌ Послідовні виклики (блокуючі)
// ❌ Без timeout (нескінченне очікування)
// ❌ Без circuit breaker (каскадний отказ)
// ❌ Без fallback (повний отказ при помилці)
// ❌ Загальний ForkJoinPool (конкуренція з іншими задачами)

🎯 Шпаргалка для співбесіди

Обов’язково знати:

  • Паралельно: allOf(cf1, cf2, cf3), latency = max(усі CF). Послідовно: latency = sum(усі CF)
  • Timeout на кожен виклик: orTimeout(2, SECONDS)
  • Circuit Breaker (Resilience4j) для кожного мікросервісу
  • Fallback для graceful degradation
  • Bulkhead для обмеження паралелізму (maxConcurrentCalls)
  • Свій Executor, не commonPool

Часті уточнюючі питання:

  • allOf().join() кине якщо один CF впаде? — Так, CompletionException. Потрібен handle на кожному CF
  • Partial results — як повернути що є? — orTimeout на кожен CF + applyToEither з delayedExecutor
  • Bulkhead навіщо? — Обмежує число одночасних викликів, запобігає перевантаженню мікросервісу
  • Чому свій Executor? — Ізоляція: збій одного модуля не впливає на інші

Червоні прапорці (НЕ говорити):

  • «Послідовні виклики ок для 2-3 сервісів» — latency сумується, користувач чекає довше
  • «Без circuit breaker нормально» — каскадний отказ при недоступності сервісу
  • «commonPool підійде для HTTP запитів» — thread pool starvation

Пов’язані теми:

  • [[9. Що робить метод allOf() і коли його використовувати]]
  • [[8. Як комбінувати результати декількох CompletableFuture]]
  • [[20. Як реалізувати timeout для CompletableFuture]]
  • [[12. Який пул потоків використовується за замовчуванням для async методів]]