Як вказати свій Executor для CompletableFuture?
Усі Async методи мають перевантажену версію, що приймає Executor:
🟢 Junior Level
Усі *Async методи мають перевантажену версію, що приймає Executor:
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
return fetchData();
}, executor); // свій Executor
cf.thenApplyAsync(s -> transform(s), executor); // теж свій Executor
Навіщо:
- Контроль над кількістю потоків
- Ізоляція від інших задач
- Можливість моніторингу
🟡 Middle Level
Стратегії вибору пула
I/O-Bound (HTTP, БД, Файли):
Executor ioExecutor = new ThreadPoolExecutor(
10, 100, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // Обмеження черги обов'язкове!
new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy() // Backpressure
);
CPU-Bound (Розрахунки, Маппінг):
Executor cpuExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
Virtual Threads (Java 21+):
Executor vtExecutor = Executors.newVirtualThreadPerTaskExecutor();
CompletableFuture.supplyAsync(task, vtExecutor);
Пастка передачі Executor
Executor потрібно передавати на кожному асинхронному етапі:
// ❌ Погано: тільки перший етап у своєму пулі, решта — в commonPool
CF.supplyAsync(task, myExecutor)
.thenApplyAsync(transform); // commonPool!
// ✅ Добре: ізоляція зберігається
CF.supplyAsync(task, myExecutor)
.thenApplyAsync(transform, myExecutor);
Lifecycle Management
@PreDestroy
public void shutdown() {
((ExecutorService) executor).shutdown();
}
Без shutdown() — “зомбі-потоки”, що заважають коректному завершенню процесу.
🔴 Senior Level
Internal Implementation
CallerRunsPolicy як Backpressure:
Коли черга переповнена → задача виконується у потоці, що викликає.
Це природно сповільнює продюсера запитів.
Thread Naming:
Завжди використовуйте ThreadFactory (Guava, Spring), щоб давати потокам імена. Без цього аналіз дампів пам’яті у разі інциденту буде неможливий.
Просунуті техніки: ManagedBlocker
Якщо ви використовуєте ForkJoinPool (в т.ч. commonPool) для задач, які можуть блокуватися, ви можете використати інтерфейс ManagedBlocker.
- Under the hood: Це дозволяє пулу динамічно створювати новий потік, щоб компенсувати заблокований, зберігаючи рівень паралелізму. Це складніше в реалізації, ніж просто
FixedThreadPool, але набагато ефективніше.
// Приклад реалізації ManagedBlocker для блокуючих операцій:
class BlockingAdapter<T> implements ForkJoinPool.ManagedBlocker {
private T result;
private final Supplier<T> task;
private boolean isDone = false;
public boolean block() {
result = task.get(); // блокуюча операція
isDone = true;
return true;
}
public boolean isReleasable() { return isDone; }
public T getResult() { return result; }
}
Архітектурні Trade-offs
| Підхід | Плюси | Мінуси |
|---|---|---|
| FixedThreadPool | Передбачуваний розмір | Немає work-stealing |
| CachedThreadPool | Авто-масштабування | Необмежені потоки |
| Virtual Threads | Ідеальний для I/O | Тільки Java 21+ |
| ManagedBlocker | Компенсація блокувань | Складна реалізація |
Пограничні випадки
- Spring @Async: Якщо ви використовуєте
CompletableFutureразом з@Async, Spring може підставити свійTaskExecutor. Переконайтеся, що ви контролюєте його конфігурацію.
When NOT to use custom Executor
- CPU-bound задачі — ForkJoinPool.commonPool() оптимальний (work-stealing)
- Мало задач — overhead на створення пулу > вигоди
Best Practices
// ✅ Завжди свій Executor для production
CompletableFuture.supplyAsync(task, ioExecutor);
// ✅ CallerRunsPolicy для backpressure
new ThreadPoolExecutor.CallerRunsPolicy();
// ✅ Thread naming для діагностики
new ThreadFactoryBuilder().setNameFormat("pool-%d").build();
// ✅ Virtual Threads для I/O (Java 21+)
Executors.newVirtualThreadPerTaskExecutor();
// ❌ Без shutdown() при виході
// ❌ commonPool для I/O
// ❌ Необмежені черги
Резюме для Senior
- Передача
Executor— це стандарт для Production. - Розділяйте I/O і CPU навантаження в різні пули.
- Використовуйте
CallerRunsPolicyдля захисту від перевантаження. - На Java 21 переходьте на
VirtualThreadPerTaskExecutorдля I/O задач.
🎯 Шпаргалка для співбесіди
Обов’язково знати:
- Усі *Async методи мають перевантажену версію з Executor
- I/O-Bound: ThreadPoolExecutor(core=10, max=100, CallerRunsPolicy)
- CPU-Bound: FixedThreadPool(availableProcessors)
- Virtual Threads (Java 21+): Executors.newVirtualThreadPerTaskExecutor() — ідеальний для I/O
- Executor потрібно передавати на КОЖНОМУ async етапі ланцюжка
Часті уточнюючі питання:
- Чому CallerRunsPolicy? — Backpressure: при переповненні черги задача виконується потоком, що викликає, сповільнюючи продюсера
- Навіщо ThreadFactory з іменами? — Діагностика: без імен потоків аналіз jstack неможливий
- Що якщо не передати Executor на thenApplyAsync? — Використовується commonPool — втрата ізоляції
- ManagedBlocker навіщо? — Компенсація блокувань у ForkJoinPool: пул створює дод. потік замість заблокованого
Червоні прапорці (НЕ говорити):
- «CachedThreadPool для production» — необмежені потоки → OOM
- «Executor потрібен тільки на першому етапі» — кожен *Async без Executor йде в commonPool
- «Без shutdown() нормально» — zombie-потоки заважають завершенню процесу
Пов’язані теми:
- [[12. Який пул потоків використовується за замовчуванням для async методів]]
- [[15. Чому важливо уникати блокуючих операцій в CompletableFuture]]
- [[11. У чому різниця між thenApply() і thenApplyAsync()]]
- [[16. Що робить метод supplyAsync() і коли його використовувати]]