Что такое пул потоков (Thread Pool)?
Представьте такси:
Junior уровень
Базовое понимание
Thread Pool (пул потоков) — это набор заранее созданных потоков, которые переиспользуются для выполнения задач.
Зачем: создание потока стоит ~1MB памяти (стек) и 1-10ms времени (вызов ОС + регистрация в JVM). Если у вас 10 000 коротких задач — создание 10 000 потоков убьёт сервер. Thread Pool решает это, переиспользуя ограниченное число потоков. Вместо создания нового потока для каждой задачи, мы берём готовый поток из пула.
Аналогия
Представьте такси:
- Без пула: Для каждого пассажира покупаем новую машину, после поездки выбрасываем
- С пулом: 10 такси ездят и обслуживают всех пассажиров по очереди
Зачем нужен пул потоков?
// ПЛОХО: создание потока для каждой задачи
for (int i = 0; i < 1000; i++) {
new Thread(() -> process(i)).start();
// Создание потока = ~1MB памяти + время на создание
// 1000 потоков = ~1GB памяти!
}
// ХОРОШО: переиспользование потоков
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
pool.submit(() -> process(i));
// 10 потоков обрабатывают 1000 задач
}
Преимущества
| Преимущество | Описание |
|---|---|
| Производительность | Не тратим время на создание потока (~1-10ms = вызов ОС для создания thread + выделение стека + регистрация в JVM) |
| Память | Каждый поток = ~1MB стека, пул экономит память |
| Управление | Ограничиваем макс. число потоков |
| Мониторинг | Легче отслеживать активность |
Простой пример
// Создание пула из 5 потоков
ExecutorService executor = Executors.newFixedThreadPool(5);
// Отправка задач
for (int i = 0; i < 20; i++) {
executor.submit(() -> {
System.out.println("Задача выполнена потоком: " +
Thread.currentThread().getName());
});
}
// Завершение пула
executor.shutdown(); // Перестаем принимать задачи
executor.awaitTermination(60, TimeUnit.SECONDS); // Ждем завершения
Создание пула вручную
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize — всегда живые потоки
10, // maximumPoolSize — макс. потоков
60, // keepAliveTime — время жизни лишних
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100) // очередь задач
);
Middle уровень
Анатомия ThreadPoolExecutor
public ThreadPoolExecutor(
int corePoolSize, // 1. Минимальное число потоков
int maximumPoolSize, // 2. Максимальное число потоков
long keepAliveTime, // 3. Время жизни "лишних" потоков
TimeUnit unit, // 4. Единица измерения времени
BlockingQueue<Runnable> workQueue, // 5. Очередь задач
ThreadFactory threadFactory, // 6. Фабрика для создания потоков
RejectedExecutionHandler handler // 7. Обработчик при переполнении
)
Алгоритм добавления задачи (КРИТИЧНО!)
Новая задача
│
▼
Потоков < corePoolSize?
│ да │ нет
▼ ▼
Создать новый Очередь полна?
поток │ да │ нет
│ ▼ ▼
▼ Потоков < max? Добавить в очередь
Выполняем │ да │ нет
▼ ▼
Новый поток RejectedExecutionHandler
Важно: Поток создаётся сверх corePoolSize ТОЛЬКО если очередь ПОЛНА.
Типы очередей
| Очередь | Особенность | Когда использовать |
|---|---|---|
LinkedBlockingQueue |
Безразмерная (по умолчанию) | Когда знаете что задач мало |
ArrayBlockingQueue |
Фиксированный размер | Production — защита от OOM |
SynchronousQueue |
Ёмкость = 0 | Каждый task → новый поток |
PriorityBlockingQueue |
Приоритетная очередь | Важные задачи вперёд |
RejectedExecutionHandler стратегии
// 1. AbortPolicy (по умолчанию) — бросает исключение
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// → RejectedExecutionException
// 2. CallerRunsPolicy — выполняет в потоке-отправителе
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Создаёт **back-pressure** (обратное давление) — поскольку задача выполняется в потоке-отправителе, он не может отправлять новые, пока не освободится пул. Это автоматически замедляет источник задач.
// 3. DiscardPolicy — молча игнорирует
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 4. DiscardOldestPolicy — удаляет oldest задачу из очереди
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
ThreadFactory — именование потоков
ThreadFactory factory = new ThreadFactory() {
private int count = 1;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("my-pool-worker-" + count++);
t.setDaemon(true); // Демоны — не мешают завершению JVM
return t;
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
factory // Кастомная фабрика
);
Senior уровень
Under the Hood: Внутреннее состояние
ThreadPoolExecutor хранит состояние в одном AtomicInteger ctl:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Верхние 3 бита — состояние пула
// Нижние 29 бит — количество потоков
Жизненный цикл пула
| Состояние | Принимает задачи? | Выполняет задачи? | Описание |
|---|---|---|---|
| RUNNING | Да | Да | Нормальная работа |
| SHUTDOWN | Нет | Да | shutdown() — дорабатывает очередь |
| STOP | Нет | Нет | shutdownNow() — прерывает активные |
| TIDYING | Нет | Нет | Все задачи завершены, потоки остановлены |
| TERMINATED | Нет | Нет | terminated() выполнен |
Выбор размера пула
CPU-bound задачи (вычисления)
Threads = Number of Cores + 1 (эмпирическое правило для CPU-bound задач; «+1» компенсирует редкие page faults. На CPU с hyperthreading учитывайте логические ядра. Всегда измеряйте на своём железе).
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService cpuPool = new ThreadPoolExecutor(
cores + 1, cores + 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000)
);
I/O-bound задачи (БД, API, файлы)
Threads = Number of Cores * (1 + Wait time / Service time)
int cores = Runtime.getRuntime().availableProcessors();
// Если 90% времени ждём (БД, API):
int ioThreads = cores * (1 + 9) = cores * 10;
// 90% времени ждём I/O, 10% считаем → W/S = 90/10 = 9
// W/S оценивается профайлером: если запрос к БД занимает 100ms, из них 90ms — ожидание → W/S = 9
// Для 8-ядерного CPU: ~80 потоков
Проблема: ThreadLocal Leaks
// ПЛОХО: ThreadLocal не очищается
ThreadLocal<UserContext> context = new ThreadLocal<>();
executor.submit(() -> {
context.set(new UserContext("user123"));
// Задача завершена, но поток вернулся в пул!
// Следующая задача увидит user123!
});
// ХОРОШО: всегда очищайте
executor.submit(() -> {
try {
context.set(new UserContext("user123"));
process();
} finally {
context.remove(); // ОБЯЗАТЕЛЬНО!
}
});
Graceful Shutdown
public void shutdown(ExecutorService executor) {
executor.shutdown(); // 1. Перестаем принимать
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // 2. Ждем
executor.shutdownNow(); // 3. Принудительно
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
Диагностика
Мониторинг через метрики
// Выводим в Prometheus/Grafana:
executor.getPoolSize(); // Текущий размер пула
executor.getActiveCount(); // Активных потоков
executor.getQueue().size(); // Размер очереди
executor.getCompletedTaskCount(); // Завершённых задач
executor.getTaskCount(); // Всего задач
jstack для анализа
jstack <pid> | grep "my-pool-worker"
Java Flight Recorder
java -XX:StartFlightRecording=filename=rec.jfr MyApp
События:
jdk.ThreadPoolSubmit— отправка задачjdk.ThreadPoolTerminate— завершение пула
Best Practices
- Всегда ограничивайте очередь —
ArrayBlockingQueue, неLinkedBlockingQueue - Используйте CallerRunsPolicy — для back-pressure при перегрузке
- Именуйте потоки — кастомный ThreadFactory для отладки
- Очищайте ThreadLocal — в finally блоке
- Не создавайте пул внутри метода — это утечка потоков
- Мониторьте queue.size() — растущая очередь = деградация
- Правильно shutdown — shutdown → awaitTermination → shutdownNow
- Размер пула — CPU-bound: N+1, I/O-bound: N * (1 + W/S)
🎯 Шпаргалка для интервью
Обязательно знать:
- Thread Pool — набор переиспользуемых потоков; создание нового потока = ~1MB стека + 1-10ms времени
- Алгоритм добавления задачи: сначала corePoolSize → потом очередь → потом до maximumPoolSize → потом rejection
- Поток создаётся сверх corePoolSize ТОЛЬКО если очередь ПОЛНА (это частый вопрос на собеседованиях)
- 4 RejectedExecutionHandler: AbortPolicy (default), CallerRunsPolicy (back-pressure), DiscardPolicy, DiscardOldestPolicy
- CPU-bound пул: N+1 потоков; I/O-bound: N * (1 + Wait/Service time) — формула Литтла
- ThreadPoolExecutor хранит состояние в одном AtomicInteger ctl (3 бита — статус, 29 — кол-во потоков)
- Жизненный цикл: RUNNING → SHUTDOWN → STOP → TIDYING → TERMINATED
- ThreadLocal обязательно очищать в finally — иначе утечка между задачами в пуле
Частые уточняющие вопросы:
- Почему очередь полна → создаётся поток, а не наоборот? — Чтобы сначала использовать дешёвую очередь, и только при перегрузке масштабировать потоки
- Чем LinkedBlockingQueue отличается от ArrayBlockingQueue? — Первая безразмерная (риск OOM), вторая с лимитом (production-safe)
- Что делает CallerRunsPolicy? — Выполняет задачу в потоке-отправителе, создавая back-pressure (отправитель тормозит)
- Почему ThreadLocal утекает в пуле? — Поток возвращается в пул с непроочищенным ThreadLocal, следующая задача увидит чужие данные
Красные флаги (НЕ говорить):
- “Создаю пул внутри метода для каждой операции” — утечка потоков, пул должен быть синглтоном
- “Использую LinkedBlockingQueue в production” — безлимитная очередь = риск OOM
- “shutdownNow() сразу останавливает все потоки” — он лишь посылает interrupt(), задачи должны сами обработать
Связанные темы:
- [[13. Какие типы Thread Pool существуют в Java]]
- [[15. Что делает ExecutorService]]
- [[16. В чём разница между Executors.newFixedThreadPool() и newCachedThreadPool()]]
- [[17. Что такое ForkJoinPool]]