Питання 12 · Розділ 9

Що таке пул потоків (Thread Pool)?

Уявіть таксі:

Мовні версії: English Russian Ukrainian

Junior рівень

Базове розуміння

Thread Pool (пул потоків) — це набір заздалегідь створених потоків, які повторно використовуються для виконання задач.

Навіщо: створення потоку коштує ~1MB пам’яті (стек) та 1-10ms часу (виклик ОС + реєстрація в JVM). Якщо у вас 10 000 коротких задач — створення 10 000 потоків вб’є сервер. Thread Pool вирішує це, повторно використовуючи обмежене число потоків. Замість створення нового потоку для кожної задачі, ми беремо готовий потік з пулу.

Аналогія

Уявіть таксі:

  • Без пулу: Для кожного пасажира купуємо нову машину, після поїздки викидаємо
  • З пулом: 10 таксі їздять та обслуговують всіх пасажирів по черзі

Навіщо потрібен пул потоків?

// ПОГАНО: створення потоку для кожної задачі
for (int i = 0; i < 1000; i++) {
    new Thread(() -> process(i)).start();
    // Створення потоку = ~1MB пам'яті + час на створення
    // 1000 потоків = ~1GB пам'яті!
}

// ГАЗАРД: повторне використання потоків
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
    pool.submit(() -> process(i));
    // 10 потоків обробляють 1000 задач
}

Переваги

Перевага Опис
Продуктивність Не витрачаємо час на створення потоку (~1-10ms = виклик ОС для створення thread + виділення стека + реєстрація в JVM)
Пам’ять Кожен потік = ~1MB стека, пул економить пам’ять
Управління Обмежуємо макс. число потоків
Моніторинг Легше відстежувати активність

Простий приклад

// Створення пулу з 5 потоків
ExecutorService executor = Executors.newFixedThreadPool(5);

// Відправка задач
for (int i = 0; i < 20; i++) {
    executor.submit(() -> {
        System.out.println("Задача виконана потоком: " +
            Thread.currentThread().getName());
    });
}

// Завершення пулу
executor.shutdown(); // Перестаємо приймати задачі
executor.awaitTermination(60, TimeUnit.SECONDS); // Чекаємо завершення

Створення пулу вручну

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5,              // corePoolSize — завжди живі потоки
    10,             // maximumPoolSize — макс. потоків
    60,             // keepAliveTime — час життя зайвих
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100) // черга задач
);

Middle рівень

Анатомія ThreadPoolExecutor

public ThreadPoolExecutor(
    int corePoolSize,              // 1. Мінімальне число потоків
    int maximumPoolSize,           // 2. Максимальне число потоків
    long keepAliveTime,            // 3. Час життя "зайвих" потоків
    TimeUnit unit,                 // 4. Одиниця виміру часу
    BlockingQueue<Runnable> workQueue, // 5. Черга задач
    ThreadFactory threadFactory,   // 6. Фабрика для створення потоків
    RejectedExecutionHandler handler // 7. Обробник при переповненні
)

Алгоритм додавання задачі (КРИТИЧНО!)

Нова задача
    │
    ▼
Потоків < corePoolSize?
    │ так                    │ ні
    ▼                       ▼
Створити новий        Черга повна?
потік                    │ так            │ ні
    │                    ▼               ▼
    ▼            Потоків < max?    Додати в чергу
Виконуємо            │ так     │ ні
                     ▼        ▼
              Новий потік   RejectedExecutionHandler

Важливо: Потік створюється понад corePoolSize ТІЛЬКИ якщо черга ПОВНА.

Типи черг

Черга Особливість Коли використовувати
LinkedBlockingQueue Безрозмірна (за замовчуванням) Коли знаєте що задач мало
ArrayBlockingQueue Фіксований розмір Production — захист від OOM
SynchronousQueue Ємність = 0 Кожна task → новий потік
PriorityBlockingQueue Пріоритетна черга Важливі задачі вперед

RejectedExecutionHandler стратегії

// 1. AbortPolicy (за замовчуванням) — кидає виключення
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// → RejectedExecutionException

// 2. CallerRunsPolicy — виконує в потоці-відправнику
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Створює **back-pressure** (зворотний тиск) — оскільки задача виконується в потоці-відправнику, він не може відправляти нові, поки не звільниться пул. Це автоматично сповільнює джерело задач.

// 3. DiscardPolicy — мовчки ігнорує
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

// 4. DiscardOldestPolicy — видаляє найстарішу задачу з черги
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

ThreadFactory — іменування потоків

ThreadFactory factory = new ThreadFactory() {
    private int count = 1;
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("my-pool-worker-" + count++);
        t.setDaemon(true); // Демони — не заважають завершенню JVM
        return t;
    }
};

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    factory // Кастомна фабрика
);

Senior рівень

Under the Hood: Внутрішній стан

ThreadPoolExecutor зберігає стан в одному AtomicInteger ctl:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Верхні 3 біти — стан пулу
// Нижні 29 біт — кількість потоків

Життєвий цикл пулу

Стан Приймає задачі? Виконує задачі? Опис
RUNNING Так Так Нормальна робота
SHUTDOWN Ні Так shutdown() — доробляє чергу
STOP Ні Ні shutdownNow() — перериває активні
TIDYING Ні Ні Всі задачі завершені, потоки зупинені
TERMINATED Ні Ні terminated() виконаний

Вибір розміру пулу

CPU-bound задачі (обчислення)

Threads = Number of Cores + 1 (емпіричне правило для CPU-bound задач; «+1» компенсує рідкісні page faults. На CPU з hyperthreading враховуйте логічні ядра. Завжди вимірюйте на своєму залізі).
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService cpuPool = new ThreadPoolExecutor(
    cores + 1, cores + 1,
    0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(1000)
);

I/O-bound задачі (БД, API, файли)

Threads = Number of Cores * (1 + Wait time / Service time)
int cores = Runtime.getRuntime().availableProcessors();
// Якщо 90% часу чекаємо (БД, API):
int ioThreads = cores * (1 + 9) = cores * 10;
// 90% часу чекаємо I/O, 10% рахуємо → W/S = 90/10 = 9
// W/S оцінюється профайлером: якщо запит до БД займає 100ms, з них 90ms — очікування → W/S = 9
// Для 8-ядерного CPU: ~80 потоків

Проблема: ThreadLocal Leaks

// ПОГАНО: ThreadLocal не очищується
ThreadLocal<UserContext> context = new ThreadLocal<>();

executor.submit(() -> {
    context.set(new UserContext("user123"));
    // Задача завершена, але потік повернувся в пул!
    // Наступна задача побачить user123!
});

// ГАЗАРД: завжди очищуйте
executor.submit(() -> {
    try {
        context.set(new UserContext("user123"));
        process();
    } finally {
        context.remove(); // ОБОВ'ЯЗКОВО!
    }
});

Graceful Shutdown

public void shutdown(ExecutorService executor) {
    executor.shutdown(); // 1. Перестаємо приймати
    try {
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // 2. Чекаємо
            executor.shutdownNow(); // 3. Примусово
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                System.err.println("Pool did not terminate");
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

Діагностика

Моніторинг через метрики

// Виводимо в Prometheus/Grafana:
executor.getPoolSize();        // Поточний розмір пулу
executor.getActiveCount();     // Активних потоків
executor.getQueue().size();    // Розмір черги
executor.getCompletedTaskCount(); // Завершених задач
executor.getTaskCount();       // Всього задач

jstack для аналізу

jstack <pid> | grep "my-pool-worker"

Java Flight Recorder

java -XX:StartFlightRecording=filename=rec.jfr MyApp

Події:

  • jdk.ThreadPoolSubmit — відправка задач
  • jdk.ThreadPoolTerminate — завершення пулу

Best Practices

  1. Завжди обмежуйте чергуArrayBlockingQueue, не LinkedBlockingQueue
  2. Використовуйте CallerRunsPolicy — для back-pressure при перевантаженні
  3. Іменуйте потоки — кастомний ThreadFactory для відладки
  4. Очищуйте ThreadLocal — в finally блоці
  5. Не створюйте пул всередині методу — це витік потоків
  6. Моніторьте queue.size() — зростаюча черга = деградація
  7. Правильно shutdown — shutdown → awaitTermination → shutdownNow
  8. Розмір пулу — CPU-bound: N+1, I/O-bound: N * (1 + W/S)

🎯 Шпаргалка для співбесіди

Обов’язково знати:

  • Thread Pool — набір повторно використовуваних потоків; створення нового потоку = ~1MB стека + 1-10ms часу
  • Алгоритм додавання задачі: спочатку corePoolSize → потім черга → потім до maximumPoolSize → потім rejection
  • Потік створюється понад corePoolSize ТІЛЬКИ якщо черга ПОВНА (це часте запитання на співбесідах)
  • 4 RejectedExecutionHandler: AbortPolicy (default), CallerRunsPolicy (back-pressure), DiscardPolicy, DiscardOldestPolicy
  • CPU-bound пул: N+1 потоків; I/O-bound: N * (1 + Wait/Service time) — формула Літтла
  • ThreadPoolExecutor зберігає стан в одному AtomicInteger ctl (3 біти — статус, 29 — к-сть потоків)
  • Життєвий цикл: RUNNING → SHUTDOWN → STOP → TIDYING → TERMINATED
  • ThreadLocal обов’язково очищати в finally — інакше витік між задачами в пулі

Часті уточнюючі запитання:

  • Чому черга повна → створюється потік, а не навпаки? — Щоб спочатку використати дешеву чергу, і тільки при перевантаженні масштабувати потоки
  • Чим LinkedBlockingQueue відрізняється від ArrayBlockingQueue? — Перша безрозмірна (ризик OOM), друга з лімітом (production-safe)
  • Що робить CallerRunsPolicy? — Виконує задачу в потоці-відправнику, створюючи back-pressure (відправник гальмує)
  • Чому ThreadLocal витікає в пулі? — Потік повертається в пул з неочищеним ThreadLocal, наступна задача побачить чужі дані

Червоні прапори (НЕ говорити):

  • “Створюю пул всередині методу для кожної операції” — витік потоків, пул має бути синглтоном
  • “Використовую LinkedBlockingQueue в production” — безлімітна черга = ризик OOM
  • “shutdownNow() одразу зупиняє всі потоки” — він лише посилає interrupt(), задачі мають самі обробити

Пов’язані теми:

  • [[13. Які типи Thread Pool існують в Java]]
  • [[15. Що робить ExecutorService]]
  • [[16. В чому різниця між Executors.newFixedThreadPool() та newCachedThreadPool()]]
  • [[17. Що таке ForkJoinPool]]