Вопрос 14 · Раздел 9

Что делает ExecutorService?

ExecutorService — это основной интерфейс в Java для управления пулом потоков и выполнения асинхронных задач.

Версии по языкам: English Russian Ukrainian

Junior уровень

Базовое понимание

ExecutorService — это основной интерфейс в Java для управления пулом потоков и выполнения асинхронных задач.

Без ExecutorService: вы вручную создаёте потоки (new Thread().start()), контролируете их жизненный цикл и обрабатываете ошибки. С ExecutorService: вы только отправляете задачи и получаете результаты — управление потоками берёт на себя фреймворк. Он отделяет “что делать” (задачу) от “как и когда делать” (механизм выполнения).

Простой пример

// Создание ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(3);

// Отправка задачи
executor.submit(() -> {
    System.out.println("Задача выполнена!");
});

// Завершение
executor.shutdown();

Основные способы отправки задач

Метод Возвращает Исключения Когда использовать
execute(Runnable) void Вылетают в консоль “Выстрелил и забыл”
submit(Runnable) Future<?> Упакованы в ExecutionException — обёртка нужна, чтобы отличить ошибку задачи от ошибки самого ExecutorService. Оригинальное исключение доступно через e.getCause(). Нужно отследить завершение
submit(Callable<T>) Future Упакованы в ExecutionException — обёртка нужна, чтобы отличить ошибку задачи от ошибки самого ExecutorService. Оригинальное исключение доступно через e.getCause(). Нужен результат

execute() vs submit()

// execute() — "выстрелил и забыл"
executor.execute(() -> {
    System.out.println("Выполняюсь...");
    // Ошибка здесь — просто напечатается в System.err
});

// submit() — можно получить результат и ошибку
Future<?> future = executor.submit(() -> {
    System.out.println("Выполняюсь...");
    // Ошибка здесь — будет в future.get()
});

// Проверка завершения
if (future.isDone()) {
    System.out.println("Задача завершена");
}

Закрытие пула

// Способ 1: Простое завершение
executor.shutdown(); // Перестаем принимать новые задачи
executor.awaitTermination(60, TimeUnit.SECONDS); // Ждем до 60 секунд

// Способ 2: Принудительное завершение
executor.shutdownNow(); // Прерывает активные задачи

Middle уровень

Жизненный цикл ExecutorService

┌──────────┐    shutdown()    ┌───────────┐  все задачи  ┌─────────┐
│ RUNNING  │ ──────────────→ │ SHUTDOWN  │ ────────────→ │ TIDYING │
│          │                 │           │               │         │
│ прини-   │  shutdownNow()  │ не прини- │               │         │
│ мает     │ ──────────────→ │ мает      │               │         │
│ задачи   │                 │ задачи    │               │         │
│          │  ←───────────── │ дорабаты- │               │         │
│ обраба-  │   interrupted    │ вает      │               │         │
│ тывает   │                 │ очередь   │               │         │
└──────────┘                 └───────────┘               └────┬────┘
      │                                                       │
      │                                                       │ terminated()
      │                                                       ▼
      │                                                 ┌───────────┐
      └────────────────────────────────────────────────│ TERMINATED│
                                                       └───────────┘
Состояние Описание
RUNNING Принимает и обрабатывает задачи
SHUTDOWN Не принимает новые, дорабатывает очередь
STOP Не принимает, прерывает активные
TIDYING Все задачи завершены, потоки остановлены
TERMINATED terminated() выполнен

submit(): Future и обработка ошибок

// Callable — возвращает результат
Future<String> future = executor.submit(() -> {
    return "Результат";
});

// Получение результата (блокирует!)
String result = future.get();

// Получение с таймаутом
try {
    String result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(true); // Отменяем задачу
}
// cancel(true) посылает задаче interrupt().
// Задача должна сама проверить Thread.interrupted() и завершиться.
// Если задача не обрабатывает прерывания — она продолжит работать!

invokeAll() и invokeAny()

// invokeAll() — ждёт ВСЕ задачи
List<Callable<String>> tasks = List.of(
    () -> "Task 1",
    () -> "Task 2",
    () -> "Task 3"
);
List<Future<String>> futures = executor.invokeAll(tasks);
// Блокирует, пока все не завершатся

// invokeAny() — ждёт ПЕРВУЮ успешную задачу
String result = executor.invokeAny(tasks);
// Возвращает результат первой завершённой задачи, остальные отменяет

Правильное завершение пула

public void shutdownGracefully(ExecutorService executor) {
    executor.shutdown(); // 1. Перестаем принимать
    try {
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // 2. Ждем
            executor.shutdownNow(); // 3. Принудительно
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                System.err.println("Pool did not terminate");
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt(); // Восстанавливаем флаг
    }
}

submit() vs execute() — ловушка с исключениями

// execute() — исключение вылетает сразу
executor.execute(() -> {
    throw new RuntimeException("Ошибка!"); // → UncaughtExceptionHandler
});

// submit() — исключение "спрятано" в Future
Future<?> future = executor.submit(() -> {
    throw new RuntimeException("Ошибка!");
});
// Никакой ошибки пока вы не вызовете future.get()!
try {
    future.get(); // → ExecutionException
} catch (ExecutionException e) {
    System.out.println(e.getCause()); // → RuntimeException: Ошибка!
}

Senior уровень

Детали реализации ThreadPoolExecutor (ctl, очереди, ThreadFactory) — в файле [[12. Что такое пул потоков (Thread Pool)]].

Thread Pool Leaks

// ПЛОХО: создание пула внутри метода
public void process() {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    executor.submit(() -> doWork());
    executor.shutdown();
    // Если doWork() бросает исключение — shutdown() не вызывается
    // Потоки утекают!
}

// ХОРОШО: пул — синглтон или управляется контейнером
@Component
public class TaskProcessor {
    private final ExecutorService executor;

    public TaskProcessor() {
        this.executor = new ThreadPoolExecutor(...);
    }

    @PreDestroy
    public void shutdown() {
        shutdownGracefully(executor);
    }
}

Correlation ID / Trace Context

// Проблема: Trace ID не пробрасывается автоматически
executor.submit(() -> {
    // MDC.get("traceId") = null!
});

// Решение: обёртка
public class TracingExecutorService implements ExecutorService {
    private final ExecutorService delegate;

    public Future<?> submit(Runnable task) {
        Map<String, String> mdcContext = MDC.getCopyOfContextMap();
        return delegate.submit(() -> {
            if (mdcContext != null) MDC.setContextMap(mdcContext);
            try {
                task.run();
            } finally {
                MDC.clear();
            }
        });
    }
}

Диагностика

Метрики для мониторинга

// Выводим в Prometheus:
executor.getActiveCount();         // Активных потоков
executor.getQueue().size();        // Размер очереди
executor.getCompletedTaskCount();  // Завершённых задач
executor.getTaskCount();           // Всего задач
executor.getPoolSize();            // Размер пула

Future.isDone() / isCancelled()

Future<?> future = executor.submit(task);

// Без блокировки
if (future.isDone()) {
    System.out.println("Завершена");
}
if (future.isCancelled()) {
    System.out.println("Отменена");
}

Best Practices

  1. Не создавайте ExecutorService внутри метода — утечка потоков
  2. Используйте submit() — если нужна обработка ошибок
  3. Всегда вызывайте shutdown() — в finally или @PreDestroy
  4. ArrayBlockingQueue — защита от OOM
  5. CallerRunsPolicy — back-pressure при перегрузке
  6. Кастомный ThreadFactory — именование потоков
  7. Trace ID propagation — для микросервисов
  8. Мониторьте queue.size() — растущая очередь = деградация

🎯 Шпаргалка для интервью

Обязательно знать:

  • ExecutorService — интерфейс управления пулом потоков: отправка задач, получение результатов, graceful shutdown
  • execute(Runnable) = void, исключения вылетают в консоль; submit() = Future, исключения спрятаны в ExecutionException
  • submit(Callable) возвращает Future — можно получить результат и обработать ошибку через get()
  • Жизненный цикл: RUNNING → SHUTDOWN (shutdown()) → STOP (shutdownNow()) → TIDYING → TERMINATED
  • invokeAll() — ждёт ВСЕ задачи; invokeAny() — возвращает результат ПЕРВОЙ успешной, остальные отменяет
  • Graceful shutdown: shutdown() → awaitTermination(timeout) → shutdownNow() → awaitTermination
  • Future.get(timeout) бросает TimeoutException — при таймауте нужно вызвать future.cancel(true)
  • ExecutorService должен управляться контейнером (Spring @Bean) или синглтоном, НЕ создаваться внутри метода

Частые уточняющие вопросы:

  • В чём разница execute() vs submit()? — execute «выстрелил и забыл», submit позволяет отследить результат и ошибку через Future
  • Почему submit() не бросает исключение сразу? — Исключение упаковано в ExecutionException и доступно только через future.get().getCause()
  • Чем shutdown() отличается от shutdownNow()? — shutdown дорабатывает очередь, shutdownNow прерывает активные задачи через interrupt()
  • Как пробросить MDC/Trace ID в ExecutorService? — Обёртка: MDC.getCopyOfContextMap() перед submit, MDC.setContextMap() внутри задачи

Красные флаги (НЕ говорить):

  • “Создаю ExecutorService внутри метода” — утечка потоков, если shutdown() не вызовется
  • “submit() сразу покажет исключение” — нет, нужно вызвать future.get()
  • “shutdownNow() гарантированно останавливает все задачи” — нет, только посылает interrupt(), задачи должны сами обработать
  • “cancel(true) мгновенно убивает задачу” — нет, только выставляет interrupted flag, задача должна проверить

Связанные темы:

  • [[12. Что такое пул потоков (Thread Pool)]]
  • [[13. Какие типы Thread Pool существуют в Java]]
  • [[16. В чём разница между Executors.newFixedThreadPool() и newCachedThreadPool()]]
  • [[17. Что такое ForkJoinPool]]