Питання 13 · Розділ 19

Як вказати свій Executor для CompletableFuture?

Усі Async методи мають перевантажену версію, що приймає Executor:

Мовні версії: English Russian Ukrainian

🟢 Junior Level

Усі *Async методи мають перевантажену версію, що приймає Executor:

ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    return fetchData();
}, executor);  // свій Executor

cf.thenApplyAsync(s -> transform(s), executor);  // теж свій Executor

Навіщо:

  • Контроль над кількістю потоків
  • Ізоляція від інших задач
  • Можливість моніторингу

🟡 Middle Level

Стратегії вибору пула

I/O-Bound (HTTP, БД, Файли):

Executor ioExecutor = new ThreadPoolExecutor(
    10, 100, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),  // Обмеження черги обов'язкове!
    new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()  // Backpressure
);

CPU-Bound (Розрахунки, Маппінг):

Executor cpuExecutor = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors()
);

Virtual Threads (Java 21+):

Executor vtExecutor = Executors.newVirtualThreadPerTaskExecutor();
CompletableFuture.supplyAsync(task, vtExecutor);

Пастка передачі Executor

Executor потрібно передавати на кожному асинхронному етапі:

// ❌ Погано: тільки перший етап у своєму пулі, решта — в commonPool
CF.supplyAsync(task, myExecutor)
  .thenApplyAsync(transform);  // commonPool!

// ✅ Добре: ізоляція зберігається
CF.supplyAsync(task, myExecutor)
  .thenApplyAsync(transform, myExecutor);

Lifecycle Management

@PreDestroy
public void shutdown() {
    ((ExecutorService) executor).shutdown();
}

Без shutdown() — “зомбі-потоки”, що заважають коректному завершенню процесу.


🔴 Senior Level

Internal Implementation

CallerRunsPolicy як Backpressure:

Коли черга переповнена → задача виконується у потоці, що викликає.
Це природно сповільнює продюсера запитів.

Thread Naming: Завжди використовуйте ThreadFactory (Guava, Spring), щоб давати потокам імена. Без цього аналіз дампів пам’яті у разі інциденту буде неможливий.

Просунуті техніки: ManagedBlocker

Якщо ви використовуєте ForkJoinPool (в т.ч. commonPool) для задач, які можуть блокуватися, ви можете використати інтерфейс ManagedBlocker.

  • Under the hood: Це дозволяє пулу динамічно створювати новий потік, щоб компенсувати заблокований, зберігаючи рівень паралелізму. Це складніше в реалізації, ніж просто FixedThreadPool, але набагато ефективніше.
// Приклад реалізації ManagedBlocker для блокуючих операцій:
class BlockingAdapter<T> implements ForkJoinPool.ManagedBlocker {
    private T result;
    private final Supplier<T> task;
    private boolean isDone = false;

    public boolean block() {
        result = task.get();  // блокуюча операція
        isDone = true;
        return true;
    }
    public boolean isReleasable() { return isDone; }
    public T getResult() { return result; }
}

Архітектурні Trade-offs

Підхід Плюси Мінуси
FixedThreadPool Передбачуваний розмір Немає work-stealing
CachedThreadPool Авто-масштабування Необмежені потоки
Virtual Threads Ідеальний для I/O Тільки Java 21+
ManagedBlocker Компенсація блокувань Складна реалізація

Пограничні випадки

  • Spring @Async: Якщо ви використовуєте CompletableFuture разом з @Async, Spring може підставити свій TaskExecutor. Переконайтеся, що ви контролюєте його конфігурацію.

When NOT to use custom Executor

  • CPU-bound задачі — ForkJoinPool.commonPool() оптимальний (work-stealing)
  • Мало задач — overhead на створення пулу > вигоди

Best Practices

// ✅ Завжди свій Executor для production
CompletableFuture.supplyAsync(task, ioExecutor);

// ✅ CallerRunsPolicy для backpressure
new ThreadPoolExecutor.CallerRunsPolicy();

// ✅ Thread naming для діагностики
new ThreadFactoryBuilder().setNameFormat("pool-%d").build();

// ✅ Virtual Threads для I/O (Java 21+)
Executors.newVirtualThreadPerTaskExecutor();

// ❌ Без shutdown() при виході
// ❌ commonPool для I/O
// ❌ Необмежені черги

Резюме для Senior

  • Передача Executor — це стандарт для Production.
  • Розділяйте I/O і CPU навантаження в різні пули.
  • Використовуйте CallerRunsPolicy для захисту від перевантаження.
  • На Java 21 переходьте на VirtualThreadPerTaskExecutor для I/O задач.

🎯 Шпаргалка для співбесіди

Обов’язково знати:

  • Усі *Async методи мають перевантажену версію з Executor
  • I/O-Bound: ThreadPoolExecutor(core=10, max=100, CallerRunsPolicy)
  • CPU-Bound: FixedThreadPool(availableProcessors)
  • Virtual Threads (Java 21+): Executors.newVirtualThreadPerTaskExecutor() — ідеальний для I/O
  • Executor потрібно передавати на КОЖНОМУ async етапі ланцюжка

Часті уточнюючі питання:

  • Чому CallerRunsPolicy? — Backpressure: при переповненні черги задача виконується потоком, що викликає, сповільнюючи продюсера
  • Навіщо ThreadFactory з іменами? — Діагностика: без імен потоків аналіз jstack неможливий
  • Що якщо не передати Executor на thenApplyAsync? — Використовується commonPool — втрата ізоляції
  • ManagedBlocker навіщо? — Компенсація блокувань у ForkJoinPool: пул створює дод. потік замість заблокованого

Червоні прапорці (НЕ говорити):

  • «CachedThreadPool для production» — необмежені потоки → OOM
  • «Executor потрібен тільки на першому етапі» — кожен *Async без Executor йде в commonPool
  • «Без shutdown() нормально» — zombie-потоки заважають завершенню процесу

Пов’язані теми:

  • [[12. Який пул потоків використовується за замовчуванням для async методів]]
  • [[15. Чому важливо уникати блокуючих операцій в CompletableFuture]]
  • [[11. У чому різниця між thenApply() і thenApplyAsync()]]
  • [[16. Що робить метод supplyAsync() і коли його використовувати]]