Вопрос 13 · Раздел 19

Как указать свой Executor для CompletableFuture?

Все Async методы имеют перегруженную версию, принимающую Executor:

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Все *Async методы имеют перегруженную версию, принимающую Executor:

ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    return fetchData();
}, executor);  // свой Executor

cf.thenApplyAsync(s -> transform(s), executor);  // тоже свой Executor

Зачем:

  • Контроль над количеством потоков
  • Изоляция от других задач
  • Возможность мониторин

🟡 Middle Level

Стратегии выбора пула

I/O-Bound (HTTP, БД, Файлы):

Executor ioExecutor = new ThreadPoolExecutor(
    10, 100, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),  // Ограничение очереди обязательно!
    new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()  // Backpressure
);

CPU-Bound (Расчёты, Маппинг):

Executor cpuExecutor = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors()
);

Virtual Threads (Java 21+):

Executor vtExecutor = Executors.newVirtualThreadPerTaskExecutor();
CompletableFuture.supplyAsync(task, vtExecutor);

Ловушка передачи Executor

Executor нужно передавать на каждом асинхронном этапе:

// ❌ Плохо: только первый этап в своём пуле, остальные — в commonPool
CF.supplyAsync(task, myExecutor)
  .thenApplyAsync(transform);  // commonPool!

// ✅ Хорошо: изоляция сохраняется
CF.supplyAsync(task, myExecutor)
  .thenApplyAsync(transform, myExecutor);

Lifecycle Management

@PreDestroy
public void shutdown() {
    ((ExecutorService) executor).shutdown();
}

Без shutdown() — “зомби-потоки”, мешающие корректному завершению процесса.


🔴 Senior Level

Internal Implementation

CallerRunsPolicy как Backpressure:

Когда очередь переполнена → задача выполняется в вызывающем потоке.
Это естественно замедляет продюсера запросов.

Thread Naming: Всегда используйте ThreadFactory (Guava, Spring), чтобы давать потокам имена. Без этого анализ дампов памяти в случае инцидента будет невозможен.

Продвинутые техники: ManagedBlocker

Если вы используете ForkJoinPool (в т.ч. commonPool) для задач, которые могут блокироваться, вы можете использовать интерфейс ManagedBlocker.

  • Under the hood: Это позволяет пулу динамически создавать новый поток, чтобы компенсировать заблокированный, сохраняя уровень параллелизма. Это сложнее в реализации, чем просто FixedThreadPool, но гораздо эффективнее.
// Пример реализации ManagedBlocker для блокирующих операций:
class BlockingAdapter<T> implements ForkJoinPool.ManagedBlocker {
    private T result;
    private final Supplier<T> task;
    private boolean isDone = false;

    public boolean block() {
        result = task.get();  // блокирующая операция
        isDone = true;
        return true;
    }
    public boolean isReleasable() { return isDone; }
    public T getResult() { return result; }
}

Архитектурные Trade-offs

Подход Плюсы Минусы
FixedThreadPool Предсказуемый размер Нет work-stealing
CachedThreadPool Авто-масштабирование Неограниченные потоки
Virtual Threads Идеален для I/O Только Java 21+
ManagedBlocker Компенсация блокировок Сложная реализация

Пограничные случаи

  • Spring @Async: Если вы используете CompletableFuture совместно с @Async, Spring может подставить свой TaskExecutor. Убедитесь, что вы контролируете его конфигурацию.

When NOT to use custom Executor

  • CPU-bound задачи — ForkJoinPool.commonPool() оптимален (work-stealing)
  • Мало задач — overhead на создание пула > выгоды

Best Practices

// ✅ Всегда свой Executor для production
CompletableFuture.supplyAsync(task, ioExecutor);

// ✅ CallerRunsPolicy для backpressure
new ThreadPoolExecutor.CallerRunsPolicy();

// ✅ Thread naming для диагностики
new ThreadFactoryBuilder().setNameFormat("pool-%d").build();

// ✅ Virtual Threads для I/O (Java 21+)
Executors.newVirtualThreadPerTaskExecutor();

// ❌ Без shutdown() при выходе
// ❌ commonPool для I/O
// ❌ Неограниченные очереди

Резюме для Senior

  • Передача Executor — это стандарт для Production.
  • Разделяйте I/O и CPU нагрузки в разные пулы.
  • Используйте CallerRunsPolicy для защиты от перегрузки.
  • На Java 21 переходите на VirtualThreadPerTaskExecutor для I/O задач.

🎯 Шпаргалка для интервью

Обязательно знать:

  • Все *Async методы имеют перегруженную версию с Executor
  • I/O-Bound: ThreadPoolExecutor(core=10, max=100, CallerRunsPolicy)
  • CPU-Bound: FixedThreadPool(availableProcessors)
  • Virtual Threads (Java 21+): Executors.newVirtualThreadPerTaskExecutor() — идеален для I/O
  • Executor нужно передавать на КАЖДОМ async этапе цепочки

Частые уточняющие вопросы:

  • Почему CallerRunsPolicy? — Backpressure: при переполнении очереди задача выполняется вызывающим потоком, замедляя продюсера
  • Зачем ThreadFactory с именами? — Диагностика: без имён потоков анализ jstack невозможен
  • Что если не передать Executor на thenApplyAsync? — Используется commonPool — потеря изоляции
  • ManagedBlocker зачем? — Компенсация блокировок в ForkJoinPool: пул создаёт доп. поток взамен заблокированного

Красные флаги (НЕ говорить):

  • «CachedThreadPool для production» — неограниченные потоки → OOM
  • «Executor нужен только на первом этапе» — каждый *Async без Executor идёт в commonPool
  • «Без shutdown() нормально» — zombie-потоки мешают завершению процесса

Связанные темы:

  • [[12. Какой пул потоков используется по умолчанию для async методов]]
  • [[15. Почему важно избегать блокирующих операций в CompletableFuture]]
  • [[11. В чём разница между thenApply() и thenApplyAsync()]]
  • [[16. Что делает метод supplyAsync() и когда его использовать]]