Вопрос 15 · Раздел 19

Почему важно избегать блокирующих операций в CompletableFuture

CompletableFuture обычно выполняется в ForkJoinPool.commonPool(), который имеет очень мало потоков (обычно = число ядер CPU - 1).

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

CompletableFuture обычно выполняется в ForkJoinPool.commonPool(), который имеет очень мало потоков (обычно = число ядер CPU - 1).

Если один из потоков заблокируется, другие задачи не смогут выполняться — это называется thread pool starvation.

// ❌ Опасно — блокирует поток ForkJoinPool
CompletableFuture.supplyAsync(() -> {
    return httpClient.get(url);  // блокирующий I/O
});

// ✅ Безопасно — свой Executor для I/O
ExecutorService ioExecutor = Executors.newFixedThreadPool(20);
CompletableFuture.supplyAsync(() -> {
    return httpClient.get(url);
}, ioExecutor);

Простая аналогия:

  • ForkJoinPool — как 4 кассы в магазине
  • Если одна кассирка уснёт — очередь не двигается
  • Нужно не давать кассирам спать!

🟡 Middle Level

Thread pool starvation

// commonPool имеет 7 потоков (на 8-ядерном CPU)
// Если 7 задач заблокируются — все новые задачи ждут

for (int i = 0; i < 100; i++) {
    CompletableFuture.supplyAsync(() -> {
        Thread.sleep(5000);  // блокирует поток
        return "done";
    });
}
// Только первые 7 выполняются
// Остальные 93 ждут освобождения потока

Последствия

  1. Deadlock: ```java CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> { return cf2.join(); // ждёт cf2 });

CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> { return cf1.join(); // ждёт cf1 — deadlock! });


2. **Performance degradation:**
```java
// Все потоки заняты блокирующими операциями
// Latency растёт, throughput падает

Типичные ошибки

  1. Blocking в thenApply: ```java // ❌ thenApply выполняется в ForkJoinPool cf.thenApply(s -> { return httpClient.sendBlocking(url); // блокирует! });

// ✅ thenApplyAsync с IO Executor cf.thenApplyAsync(s -> httpClient.sendBlocking(url), ioExecutor);


---

## 🔴 Senior Level

### Internal Implementation

**ForkJoinPool.commonPool():**
```java
// Размер = Runtime.getRuntime().availableProcessors() - 1
// Для 8 ядер = 7 потоков
// Для CPU-bound задач — идеально
// По умолчанию размер пула = availableProcessors() - 1 (можно изменить через
// `java.util.concurrent.ForkJoinPool.common.parallelism`). Для CPU-bound задач
// подходит хорошо, для I/O — может стать узким местом при большом количестве
// блокирующих операций.

Work-stealing:

// ForkJoinPool использует work-stealing
// Но если все потоки blocked — stealing не помогает

// Thread 1: [BLOCKED on I/O]
// Thread 2: [BLOCKED on I/O]
// Thread 3: [BLOCKED on I/O]
// ...
// Queue: [waiting tasks...]  // никто не обрабатывает

Архитектурные Trade-offs

Подход Плюсы Минусы
commonPool Не нужно создавать Только CPU-bound
Свой Executor Контроль Нужно управлять
Virtual Threads Best of both Java 21+

Edge Cases

1. Cascading blocking:

// Один blocking вызов тянет другие
cf1.thenApply(s -> blockingCall1(s))
   .thenCompose(r -> blockingCall2(r))
   .thenAccept(result -> blockingCall3(result));
// Один CF = 3 blocking операции = 3x время блокировки

2. Mixed workload:

// CPU-bound + I/O в одном пуле
CompletableFuture.supplyAsync(() -> heavyCalculation());  // CPU
CompletableFuture.supplyAsync(() -> httpClient.get(url)); // I/O

// I/O блокирует поток — CPU задачи ждут

Производительность

Thread pool starvation эффект:
- В худшем случае, когда пул полностью насыщен задачами, одна блокирующая задача
  снижает throughput примерно на 1/N, где N — размер пула. Фактическая потеря
  зависит от характера задач и работы work-stealing.
- 7 blocking tasks = 100% потеря (полный deadlock)

Virtual Threads решение:
- 100000+ блокирующих задач = без проблем
- Каждый blocking — suspend, не блокирует OS thread

Production Experience

Detecting starvation:

# Thread dump
jstack <pid> | grep -A 10 "ForkJoinPool"

# Метрики
- Active threads < pool size
- Queue size растёт
- Latency растёт

Prevention:

// ✅ Разделение пулов
ExecutorService cpuExecutor = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors());
ExecutorService ioExecutor = Executors.newFixedThreadPool(50);

// CPU задачи
CompletableFuture.supplyAsync(cpuTask, cpuExecutor);

// I/O задачи
CompletableFuture.supplyAsync(ioTask, ioExecutor);

// ✅ Virtual Threads (Java 21+)
ExecutorService vThreads = Executors.newVirtualThreadPerTaskExecutor();
CompletableFuture.supplyAsync(anyTask, vThreads);

Best Practices

// ✅ Свой Executor для I/O
CompletableFuture.supplyAsync(ioTask, ioExecutor);

// ✅ Virtual Threads
Executors.newVirtualThreadPerTaskExecutor();

// ✅ Мониторинг thread pool
metrics.recordQueueSize(executor.getQueue().size());

// ❌ Blocking в commonPool
// ❌ join()/get() без таймаута
// ❌ Игнорирование thread starvation

🎯 Шпаргалка для интервью

Обязательно знать:

  • ForkJoinPool.commonPool() имеет мало потоков (availableProcessors - 1)
  • Thread pool starvation: все потоки заблокированы, новые задачи ждут
  • Blocking в commonPool снижает throughput для ВСЕХ задач (Parallel Streams, другие CF)
  • Cascading blocking: один blocking вызов тянет другие в цепочке
  • Решение: свой Executor для I/O, Virtual Threads (Java 21+), разделение пулов

Частые уточняющие вопросы:

  • Что будет если 7 задач заблокируют commonPool? — Все новые задачи ждут, полный deadlock
  • Work-stealing помогает при blocking? — Нет, если все потоки blocked — stealing не работает
  • Как detect starvation? — jstack: ForkJoinPool-worker-X в BLOCKED/WAITING, queue size растёт
  • Virtual Threads решают проблему? — Да, 100000+ blocking задач без problems (Java 21+)

Красные флаги (НЕ говорить):

  • «Blocking в thenApply нормально — он же лёгкий» — thenApply в том же потоке, blocking убивает пул
  • «commonPool масштабируется автоматически» — ограничен availableProcessors - 1
  • «join() без таймаута в production — ок» — бесконечное ожидание, cascading failure

Связанные темы:

  • [[14. Что такое блокирующий код и как его отличить от неблокирующего]]
  • [[12. Какой пул потоков используется по умолчанию для async методов]]
  • [[13. Как указать свой Executor для CompletableFuture]]
  • [[16. Что делает метод supplyAsync() и когда его использовать]]