Вопрос 12 · Раздел 9

Что такое пул потоков (Thread Pool)?

Представьте такси:

Версии по языкам: English Russian Ukrainian

Junior уровень

Базовое понимание

Thread Pool (пул потоков) — это набор заранее созданных потоков, которые переиспользуются для выполнения задач.

Зачем: создание потока стоит ~1MB памяти (стек) и 1-10ms времени (вызов ОС + регистрация в JVM). Если у вас 10 000 коротких задач — создание 10 000 потоков убьёт сервер. Thread Pool решает это, переиспользуя ограниченное число потоков. Вместо создания нового потока для каждой задачи, мы берём готовый поток из пула.

Аналогия

Представьте такси:

  • Без пула: Для каждого пассажира покупаем новую машину, после поездки выбрасываем
  • С пулом: 10 такси ездят и обслуживают всех пассажиров по очереди

Зачем нужен пул потоков?

// ПЛОХО: создание потока для каждой задачи
for (int i = 0; i < 1000; i++) {
    new Thread(() -> process(i)).start();
    // Создание потока = ~1MB памяти + время на создание
    // 1000 потоков = ~1GB памяти!
}

// ХОРОШО: переиспользование потоков
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
    pool.submit(() -> process(i));
    // 10 потоков обрабатывают 1000 задач
}

Преимущества

Преимущество Описание
Производительность Не тратим время на создание потока (~1-10ms = вызов ОС для создания thread + выделение стека + регистрация в JVM)
Память Каждый поток = ~1MB стека, пул экономит память
Управление Ограничиваем макс. число потоков
Мониторинг Легче отслеживать активность

Простой пример

// Создание пула из 5 потоков
ExecutorService executor = Executors.newFixedThreadPool(5);

// Отправка задач
for (int i = 0; i < 20; i++) {
    executor.submit(() -> {
        System.out.println("Задача выполнена потоком: " +
            Thread.currentThread().getName());
    });
}

// Завершение пула
executor.shutdown(); // Перестаем принимать задачи
executor.awaitTermination(60, TimeUnit.SECONDS); // Ждем завершения

Создание пула вручную

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5,              // corePoolSize — всегда живые потоки
    10,             // maximumPoolSize — макс. потоков
    60,             // keepAliveTime — время жизни лишних
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100) // очередь задач
);

Middle уровень

Анатомия ThreadPoolExecutor

public ThreadPoolExecutor(
    int corePoolSize,              // 1. Минимальное число потоков
    int maximumPoolSize,           // 2. Максимальное число потоков
    long keepAliveTime,            // 3. Время жизни "лишних" потоков
    TimeUnit unit,                 // 4. Единица измерения времени
    BlockingQueue<Runnable> workQueue, // 5. Очередь задач
    ThreadFactory threadFactory,   // 6. Фабрика для создания потоков
    RejectedExecutionHandler handler // 7. Обработчик при переполнении
)

Алгоритм добавления задачи (КРИТИЧНО!)

Новая задача
    │
    ▼
Потоков < corePoolSize?
    │ да                    │ нет
    ▼                       ▼
Создать новый        Очередь полна?
поток                    │ да            │ нет
    │                    ▼               ▼
    ▼            Потоков < max?    Добавить в очередь
Выполняем            │ да     │ нет
                     ▼        ▼
              Новый поток   RejectedExecutionHandler

Важно: Поток создаётся сверх corePoolSize ТОЛЬКО если очередь ПОЛНА.

Типы очередей

Очередь Особенность Когда использовать
LinkedBlockingQueue Безразмерная (по умолчанию) Когда знаете что задач мало
ArrayBlockingQueue Фиксированный размер Production — защита от OOM
SynchronousQueue Ёмкость = 0 Каждый task → новый поток
PriorityBlockingQueue Приоритетная очередь Важные задачи вперёд

RejectedExecutionHandler стратегии

// 1. AbortPolicy (по умолчанию) — бросает исключение
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// → RejectedExecutionException

// 2. CallerRunsPolicy — выполняет в потоке-отправителе
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Создаёт **back-pressure** (обратное давление) — поскольку задача выполняется в потоке-отправителе, он не может отправлять новые, пока не освободится пул. Это автоматически замедляет источник задач.

// 3. DiscardPolicy — молча игнорирует
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

// 4. DiscardOldestPolicy — удаляет oldest задачу из очереди
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

ThreadFactory — именование потоков

ThreadFactory factory = new ThreadFactory() {
    private int count = 1;
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("my-pool-worker-" + count++);
        t.setDaemon(true); // Демоны — не мешают завершению JVM
        return t;
    }
};

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    factory // Кастомная фабрика
);

Senior уровень

Under the Hood: Внутреннее состояние

ThreadPoolExecutor хранит состояние в одном AtomicInteger ctl:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Верхние 3 бита — состояние пула
// Нижние 29 бит — количество потоков

Жизненный цикл пула

Состояние Принимает задачи? Выполняет задачи? Описание
RUNNING Да Да Нормальная работа
SHUTDOWN Нет Да shutdown() — дорабатывает очередь
STOP Нет Нет shutdownNow() — прерывает активные
TIDYING Нет Нет Все задачи завершены, потоки остановлены
TERMINATED Нет Нет terminated() выполнен

Выбор размера пула

CPU-bound задачи (вычисления)

Threads = Number of Cores + 1 (эмпирическое правило для CPU-bound задач; «+1» компенсирует редкие page faults. На CPU с hyperthreading учитывайте логические ядра. Всегда измеряйте на своём железе).
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService cpuPool = new ThreadPoolExecutor(
    cores + 1, cores + 1,
    0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(1000)
);

I/O-bound задачи (БД, API, файлы)

Threads = Number of Cores * (1 + Wait time / Service time)
int cores = Runtime.getRuntime().availableProcessors();
// Если 90% времени ждём (БД, API):
int ioThreads = cores * (1 + 9) = cores * 10;
// 90% времени ждём I/O, 10% считаем → W/S = 90/10 = 9
// W/S оценивается профайлером: если запрос к БД занимает 100ms, из них 90ms — ожидание → W/S = 9
// Для 8-ядерного CPU: ~80 потоков

Проблема: ThreadLocal Leaks

// ПЛОХО: ThreadLocal не очищается
ThreadLocal<UserContext> context = new ThreadLocal<>();

executor.submit(() -> {
    context.set(new UserContext("user123"));
    // Задача завершена, но поток вернулся в пул!
    // Следующая задача увидит user123!
});

// ХОРОШО: всегда очищайте
executor.submit(() -> {
    try {
        context.set(new UserContext("user123"));
        process();
    } finally {
        context.remove(); // ОБЯЗАТЕЛЬНО!
    }
});

Graceful Shutdown

public void shutdown(ExecutorService executor) {
    executor.shutdown(); // 1. Перестаем принимать
    try {
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // 2. Ждем
            executor.shutdownNow(); // 3. Принудительно
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                System.err.println("Pool did not terminate");
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

Диагностика

Мониторинг через метрики

// Выводим в Prometheus/Grafana:
executor.getPoolSize();        // Текущий размер пула
executor.getActiveCount();     // Активных потоков
executor.getQueue().size();    // Размер очереди
executor.getCompletedTaskCount(); // Завершённых задач
executor.getTaskCount();       // Всего задач

jstack для анализа

jstack <pid> | grep "my-pool-worker"

Java Flight Recorder

java -XX:StartFlightRecording=filename=rec.jfr MyApp

События:

  • jdk.ThreadPoolSubmit — отправка задач
  • jdk.ThreadPoolTerminate — завершение пула

Best Practices

  1. Всегда ограничивайте очередьArrayBlockingQueue, не LinkedBlockingQueue
  2. Используйте CallerRunsPolicy — для back-pressure при перегрузке
  3. Именуйте потоки — кастомный ThreadFactory для отладки
  4. Очищайте ThreadLocal — в finally блоке
  5. Не создавайте пул внутри метода — это утечка потоков
  6. Мониторьте queue.size() — растущая очередь = деградация
  7. Правильно shutdown — shutdown → awaitTermination → shutdownNow
  8. Размер пула — CPU-bound: N+1, I/O-bound: N * (1 + W/S)

🎯 Шпаргалка для интервью

Обязательно знать:

  • Thread Pool — набор переиспользуемых потоков; создание нового потока = ~1MB стека + 1-10ms времени
  • Алгоритм добавления задачи: сначала corePoolSize → потом очередь → потом до maximumPoolSize → потом rejection
  • Поток создаётся сверх corePoolSize ТОЛЬКО если очередь ПОЛНА (это частый вопрос на собеседованиях)
  • 4 RejectedExecutionHandler: AbortPolicy (default), CallerRunsPolicy (back-pressure), DiscardPolicy, DiscardOldestPolicy
  • CPU-bound пул: N+1 потоков; I/O-bound: N * (1 + Wait/Service time) — формула Литтла
  • ThreadPoolExecutor хранит состояние в одном AtomicInteger ctl (3 бита — статус, 29 — кол-во потоков)
  • Жизненный цикл: RUNNING → SHUTDOWN → STOP → TIDYING → TERMINATED
  • ThreadLocal обязательно очищать в finally — иначе утечка между задачами в пуле

Частые уточняющие вопросы:

  • Почему очередь полна → создаётся поток, а не наоборот? — Чтобы сначала использовать дешёвую очередь, и только при перегрузке масштабировать потоки
  • Чем LinkedBlockingQueue отличается от ArrayBlockingQueue? — Первая безразмерная (риск OOM), вторая с лимитом (production-safe)
  • Что делает CallerRunsPolicy? — Выполняет задачу в потоке-отправителе, создавая back-pressure (отправитель тормозит)
  • Почему ThreadLocal утекает в пуле? — Поток возвращается в пул с непроочищенным ThreadLocal, следующая задача увидит чужие данные

Красные флаги (НЕ говорить):

  • “Создаю пул внутри метода для каждой операции” — утечка потоков, пул должен быть синглтоном
  • “Использую LinkedBlockingQueue в production” — безлимитная очередь = риск OOM
  • “shutdownNow() сразу останавливает все потоки” — он лишь посылает interrupt(), задачи должны сами обработать

Связанные темы:

  • [[13. Какие типы Thread Pool существуют в Java]]
  • [[15. Что делает ExecutorService]]
  • [[16. В чём разница между Executors.newFixedThreadPool() и newCachedThreadPool()]]
  • [[17. Что такое ForkJoinPool]]