Що таке пул потоків (Thread Pool)?
Уявіть таксі:
Junior рівень
Базове розуміння
Thread Pool (пул потоків) — це набір заздалегідь створених потоків, які повторно використовуються для виконання задач.
Навіщо: створення потоку коштує ~1MB пам’яті (стек) та 1-10ms часу (виклик ОС + реєстрація в JVM). Якщо у вас 10 000 коротких задач — створення 10 000 потоків вб’є сервер. Thread Pool вирішує це, повторно використовуючи обмежене число потоків. Замість створення нового потоку для кожної задачі, ми беремо готовий потік з пулу.
Аналогія
Уявіть таксі:
- Без пулу: Для кожного пасажира купуємо нову машину, після поїздки викидаємо
- З пулом: 10 таксі їздять та обслуговують всіх пасажирів по черзі
Навіщо потрібен пул потоків?
// ПОГАНО: створення потоку для кожної задачі
for (int i = 0; i < 1000; i++) {
new Thread(() -> process(i)).start();
// Створення потоку = ~1MB пам'яті + час на створення
// 1000 потоків = ~1GB пам'яті!
}
// ГАЗАРД: повторне використання потоків
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
pool.submit(() -> process(i));
// 10 потоків обробляють 1000 задач
}
Переваги
| Перевага | Опис |
|---|---|
| Продуктивність | Не витрачаємо час на створення потоку (~1-10ms = виклик ОС для створення thread + виділення стека + реєстрація в JVM) |
| Пам’ять | Кожен потік = ~1MB стека, пул економить пам’ять |
| Управління | Обмежуємо макс. число потоків |
| Моніторинг | Легше відстежувати активність |
Простий приклад
// Створення пулу з 5 потоків
ExecutorService executor = Executors.newFixedThreadPool(5);
// Відправка задач
for (int i = 0; i < 20; i++) {
executor.submit(() -> {
System.out.println("Задача виконана потоком: " +
Thread.currentThread().getName());
});
}
// Завершення пулу
executor.shutdown(); // Перестаємо приймати задачі
executor.awaitTermination(60, TimeUnit.SECONDS); // Чекаємо завершення
Створення пулу вручну
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize — завжди живі потоки
10, // maximumPoolSize — макс. потоків
60, // keepAliveTime — час життя зайвих
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100) // черга задач
);
Middle рівень
Анатомія ThreadPoolExecutor
public ThreadPoolExecutor(
int corePoolSize, // 1. Мінімальне число потоків
int maximumPoolSize, // 2. Максимальне число потоків
long keepAliveTime, // 3. Час життя "зайвих" потоків
TimeUnit unit, // 4. Одиниця виміру часу
BlockingQueue<Runnable> workQueue, // 5. Черга задач
ThreadFactory threadFactory, // 6. Фабрика для створення потоків
RejectedExecutionHandler handler // 7. Обробник при переповненні
)
Алгоритм додавання задачі (КРИТИЧНО!)
Нова задача
│
▼
Потоків < corePoolSize?
│ так │ ні
▼ ▼
Створити новий Черга повна?
потік │ так │ ні
│ ▼ ▼
▼ Потоків < max? Додати в чергу
Виконуємо │ так │ ні
▼ ▼
Новий потік RejectedExecutionHandler
Важливо: Потік створюється понад corePoolSize ТІЛЬКИ якщо черга ПОВНА.
Типи черг
| Черга | Особливість | Коли використовувати |
|---|---|---|
LinkedBlockingQueue |
Безрозмірна (за замовчуванням) | Коли знаєте що задач мало |
ArrayBlockingQueue |
Фіксований розмір | Production — захист від OOM |
SynchronousQueue |
Ємність = 0 | Кожна task → новий потік |
PriorityBlockingQueue |
Пріоритетна черга | Важливі задачі вперед |
RejectedExecutionHandler стратегії
// 1. AbortPolicy (за замовчуванням) — кидає виключення
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// → RejectedExecutionException
// 2. CallerRunsPolicy — виконує в потоці-відправнику
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Створює **back-pressure** (зворотний тиск) — оскільки задача виконується в потоці-відправнику, він не може відправляти нові, поки не звільниться пул. Це автоматично сповільнює джерело задач.
// 3. DiscardPolicy — мовчки ігнорує
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 4. DiscardOldestPolicy — видаляє найстарішу задачу з черги
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
ThreadFactory — іменування потоків
ThreadFactory factory = new ThreadFactory() {
private int count = 1;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("my-pool-worker-" + count++);
t.setDaemon(true); // Демони — не заважають завершенню JVM
return t;
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
factory // Кастомна фабрика
);
Senior рівень
Under the Hood: Внутрішній стан
ThreadPoolExecutor зберігає стан в одному AtomicInteger ctl:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Верхні 3 біти — стан пулу
// Нижні 29 біт — кількість потоків
Життєвий цикл пулу
| Стан | Приймає задачі? | Виконує задачі? | Опис |
|---|---|---|---|
| RUNNING | Так | Так | Нормальна робота |
| SHUTDOWN | Ні | Так | shutdown() — доробляє чергу |
| STOP | Ні | Ні | shutdownNow() — перериває активні |
| TIDYING | Ні | Ні | Всі задачі завершені, потоки зупинені |
| TERMINATED | Ні | Ні | terminated() виконаний |
Вибір розміру пулу
CPU-bound задачі (обчислення)
Threads = Number of Cores + 1 (емпіричне правило для CPU-bound задач; «+1» компенсує рідкісні page faults. На CPU з hyperthreading враховуйте логічні ядра. Завжди вимірюйте на своєму залізі).
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService cpuPool = new ThreadPoolExecutor(
cores + 1, cores + 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000)
);
I/O-bound задачі (БД, API, файли)
Threads = Number of Cores * (1 + Wait time / Service time)
int cores = Runtime.getRuntime().availableProcessors();
// Якщо 90% часу чекаємо (БД, API):
int ioThreads = cores * (1 + 9) = cores * 10;
// 90% часу чекаємо I/O, 10% рахуємо → W/S = 90/10 = 9
// W/S оцінюється профайлером: якщо запит до БД займає 100ms, з них 90ms — очікування → W/S = 9
// Для 8-ядерного CPU: ~80 потоків
Проблема: ThreadLocal Leaks
// ПОГАНО: ThreadLocal не очищується
ThreadLocal<UserContext> context = new ThreadLocal<>();
executor.submit(() -> {
context.set(new UserContext("user123"));
// Задача завершена, але потік повернувся в пул!
// Наступна задача побачить user123!
});
// ГАЗАРД: завжди очищуйте
executor.submit(() -> {
try {
context.set(new UserContext("user123"));
process();
} finally {
context.remove(); // ОБОВ'ЯЗКОВО!
}
});
Graceful Shutdown
public void shutdown(ExecutorService executor) {
executor.shutdown(); // 1. Перестаємо приймати
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // 2. Чекаємо
executor.shutdownNow(); // 3. Примусово
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
Діагностика
Моніторинг через метрики
// Виводимо в Prometheus/Grafana:
executor.getPoolSize(); // Поточний розмір пулу
executor.getActiveCount(); // Активних потоків
executor.getQueue().size(); // Розмір черги
executor.getCompletedTaskCount(); // Завершених задач
executor.getTaskCount(); // Всього задач
jstack для аналізу
jstack <pid> | grep "my-pool-worker"
Java Flight Recorder
java -XX:StartFlightRecording=filename=rec.jfr MyApp
Події:
jdk.ThreadPoolSubmit— відправка задачjdk.ThreadPoolTerminate— завершення пулу
Best Practices
- Завжди обмежуйте чергу —
ArrayBlockingQueue, неLinkedBlockingQueue - Використовуйте CallerRunsPolicy — для back-pressure при перевантаженні
- Іменуйте потоки — кастомний ThreadFactory для відладки
- Очищуйте ThreadLocal — в finally блоці
- Не створюйте пул всередині методу — це витік потоків
- Моніторьте queue.size() — зростаюча черга = деградація
- Правильно shutdown — shutdown → awaitTermination → shutdownNow
- Розмір пулу — CPU-bound: N+1, I/O-bound: N * (1 + W/S)
🎯 Шпаргалка для співбесіди
Обов’язково знати:
- Thread Pool — набір повторно використовуваних потоків; створення нового потоку = ~1MB стека + 1-10ms часу
- Алгоритм додавання задачі: спочатку corePoolSize → потім черга → потім до maximumPoolSize → потім rejection
- Потік створюється понад corePoolSize ТІЛЬКИ якщо черга ПОВНА (це часте запитання на співбесідах)
- 4 RejectedExecutionHandler: AbortPolicy (default), CallerRunsPolicy (back-pressure), DiscardPolicy, DiscardOldestPolicy
- CPU-bound пул: N+1 потоків; I/O-bound: N * (1 + Wait/Service time) — формула Літтла
- ThreadPoolExecutor зберігає стан в одному AtomicInteger ctl (3 біти — статус, 29 — к-сть потоків)
- Життєвий цикл: RUNNING → SHUTDOWN → STOP → TIDYING → TERMINATED
- ThreadLocal обов’язково очищати в finally — інакше витік між задачами в пулі
Часті уточнюючі запитання:
- Чому черга повна → створюється потік, а не навпаки? — Щоб спочатку використати дешеву чергу, і тільки при перевантаженні масштабувати потоки
- Чим LinkedBlockingQueue відрізняється від ArrayBlockingQueue? — Перша безрозмірна (ризик OOM), друга з лімітом (production-safe)
- Що робить CallerRunsPolicy? — Виконує задачу в потоці-відправнику, створюючи back-pressure (відправник гальмує)
- Чому ThreadLocal витікає в пулі? — Потік повертається в пул з неочищеним ThreadLocal, наступна задача побачить чужі дані
Червоні прапори (НЕ говорити):
- “Створюю пул всередині методу для кожної операції” — витік потоків, пул має бути синглтоном
- “Використовую LinkedBlockingQueue в production” — безлімітна черга = ризик OOM
- “shutdownNow() одразу зупиняє всі потоки” — він лише посилає interrupt(), задачі мають самі обробити
Пов’язані теми:
- [[13. Які типи Thread Pool існують в Java]]
- [[15. Що робить ExecutorService]]
- [[16. В чому різниця між Executors.newFixedThreadPool() та newCachedThreadPool()]]
- [[17. Що таке ForkJoinPool]]