Что такое ForkJoinPool?
ForkJoinPool — это специальный пул потоков, оптимизированный для выполнения задач по принципу "Разделяй и властвуй" (Divide and Conquer).
Junior уровень
Базовое понимание
ForkJoinPool — это специальный пул потоков, оптимизированный для выполнения задач по принципу “Разделяй и властвуй” (Divide and Conquer).
Зачем: обычный пул с одной очередью неэффективен для рекурсивных задач — потоки простаивают, пока некоторые задачи ещё работают. ForkJoinPool решает это алгоритмом work-stealing: свободный поток «крадёт» задачи из хвоста очереди занятого потока.
Аналогия
Представьте, что вам нужно пересчитать 1,000,000 монет:
- Обычный пул: Один человек берёт задачу, считает, передаёт следующую
- ForkJoinPool: Делите монеты на кучки, раздаёте людям. Если кто-то закончил — помогает другим
Два ключевых метода
| Метод | Описание |
|---|---|
fork() |
Отправить подзадачу на выполнение (асинхронно) |
join() |
Дождаться результата подзадачи |
Простой пример: Сумма массива
// Задача с результатом
class SumTask extends RecursiveTask<Long> {
private final long[] numbers;
private final int start, end;
public SumTask(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// Если задач мало — считаем напрямую
if (end - start <= 1000) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
// Иначе — делим пополам
int mid = (start + end) / 2;
SumTask left = new SumTask(numbers, start, mid);
SumTask right = new SumTask(numbers, mid, end);
left.fork(); // Отправляем левую часть
long rightResult = right.compute(); // Правую считаем сами
long leftResult = left.join(); // Ждём результат левой
return leftResult + rightResult;
}
}
// Использование
long[] numbers = ...;
ForkJoinPool pool = ForkJoinPool.commonPool();
long total = pool.invoke(new SumTask(numbers, 0, numbers.length));
Где используется ForkJoinPool
| Место | Описание |
|---|---|
| Parallel Streams | list.parallelStream().map(...) |
| CompletableFuture | По умолчанию использует commonPool |
| Рекурсивные задачи | Обработка деревьев, графов |
Middle уровень
Алгоритм Work-Stealing (Кража работы)
В отличие от обычных пулов (одна общая очередь), в ForkJoinPool:
Обычный пул:
[Общая очередь: T1, T2, T3, T4, T5]
Поток 1 → T1
Поток 2 → T2
Поток 3 → (ждёт)
ForkJoinPool:
Поток 1: [T1a, T1b, T1c] ← свой Deque
Поток 2: [T2a, T2b] ← свой Deque
Поток 3: [] ← пусто! Крадёт из хвоста Потока 1
Правила работы
| Правило | Описание |
|---|---|
| Свой Deque | У каждого воркера своя двусторонняя очередь |
| LIFO (свой) | Воркер берёт задачи из ГОЛОВЫ своей очереди (Last-In-First-Out) |
| FIFO (чужой) | При краже берёт из ХВОСТА чужой очереди |
| Локальность | LIFO сохраняет locality of reference (локальность данных) — самая свежая подзадача работает с данными, которые ещё в кэш-линии CPU. Доступ из кэша = ~1ns, из RAM = ~100ns (в 100 раз медленнее). |
| Минимум конфликтов | Хозяин берёт с головы, вор крадёт с хвоста — не мешают друг другу |
RecursiveTask vs RecursiveAction
| Класс | Возвращает результат? | Пример |
|---|---|---|
RecursiveTask<V> |
Да | Сумма, поиск максимума |
RecursiveAction |
Нет | Сортировка, обработка |
// RecursiveTask — с результатом
class MaxTask extends RecursiveTask<Integer> {
@Override
protected Integer compute() {
// ...
return maxValue;
}
}
// RecursiveAction — без результата
class SortTask extends RecursiveAction {
@Override
protected void compute() {
// Сортируем на месте, результат не возвращаем
}
}
Common Pool
// Статический инстанс — доступен всегда
ForkJoinPool commonPool = ForkJoinPool.commonPool();
// Параллелизм = число ядер CPU - 1
int parallelism = commonPool.getParallelism();
Почему -1? Это оставляет один логический поток для других задач (фоновые операции). На сервере без GUI можно изменить через -Djava.util.concurrent.ForkJoinPool.common.parallelism=N.
ForkJoinPool vs ThreadPoolExecutor
| Характеристика | ThreadPoolExecutor | ForkJoinPool |
|---|---|---|
| Очередь | Одна общая (FIFO) | У каждого своя (LIFO/FIFO) |
| Тип задач | Независимые, блокирующие | Рекурсивные, вычислительные |
| Эффективность | Средняя при разном времени задач | Высокая (work-stealing) |
| Context Switching | Часто | Минимально |
| Лучше для | I/O-bound задач | CPU-bound задач |
Senior уровень
Когда НЕ использовать ForkJoinPool
- Простые независимые задачи — обычный ThreadPoolExecutor проще и эффективнее
- I/O-bound задачи — блокировка потока блокирует work-stealing, используйте виртуальные потоки (Java 21+)
- Малый объём данных — overhead fork/join превышает выгоду от параллелизма
- НЕ используйте commonPool для I/O — заблокируете все задачи, которые его используют
Under the Hood: Work-Queue структура
// Упрощённая структура
class ForkJoinPool {
// Массив очередей — по одной на каждый поток
WorkQueue[] queues;
static final class WorkQueue {
ForkJoinTask<?>[] array; // Циклический массив для задач
ForkJoinThread thread; // Воркер-поток
int base; // Хвост (для кражи — FIFO)
int top; // Голова (для владельца — LIFO)
}
}
Optimized Fork/Join паттерн
@Override
protected Long compute() {
if (size < THRESHOLD) {
return computeSequentially();
}
MyTask left = new MyTask(subList1);
MyTask right = new MyTask(subList2);
// OPTIMIZATION: fork() для одной, compute() для другой
left.fork(); // Асинхронно — в очередь
// Ключевая оптимизация: right.compute() выполняется в ТЕКУЩЕМ потоке
// (не тратим поток из пула), а left.fork() отправляется в очередь.
// Два fork() + два join() потратили бы 2 потока вместо 1.
long rightResult = right.compute(); // В текущем потоке — экономим поток!
long leftResult = left.join(); // Ждём результат
return leftResult + rightResult;
}
Всегда делайте fork() для одной части и compute() для другой. Если сделать fork() для обеих и потом join(), вы потратите лишний поток впустую.
ManagedBlocker — для блокирующих операций
// ForkJoinPool может создать доп. поток, если задача блокируется
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
private boolean done = false;
@Override
public boolean block() throws InterruptedException {
// Блокирующая операция
result = blockingCall();
done = true;
return true;
}
@Override
public boolean isReleasable() {
return done;
}
});
Производительность: THRESHOLD
// Слишком маленький — много задач, нагрузка на GC
// Слишком большое — мало параллелизма
// Эвристика: THRESHOLD должен быть таким, чтобы
// последовательная обработка занимала ~100-500 микросекунд
private static final int THRESHOLD = 1000;
Проблема: Starvation Common Pool
// ПЛОХО: блокирующая операция в commonPool
CompletableFuture.supplyAsync(() -> {
return httpClient.get(url); // Долгий blocking I/O!
}).thenApply(...);
// Заблокирует один из commonPool потоков → все parallel streams тормозят!
// ХОРОШО: отдельный пул для I/O
ExecutorService ioPool = Executors.newFixedThreadPool(20);
CompletableFuture.supplyAsync(() -> httpClient.get(url), ioPool);
Диагностика
Метрики ForkJoinPool
ForkJoinPool pool = ForkJoinPool.commonPool();
pool.getStealCount(); // Сколько задач "украдено" — высокая = неравномерные задачи
pool.getQueuedTaskCount(); // Задач в очередях
pool.getActiveThreadCount();// Активных потоков
pool.getParallelism(); // Целевой параллелизм
jstack
"ForkJoinPool.commonPool-worker-1" #10 daemon prio=5
"ForkJoinPool.commonPool-worker-2" #11 daemon prio=5
// Потоки ForkJoinPool имеют такие имена
Best Practices
- Используйте ForkJoinPool для CPU-bound задач (вычисления)
- Не используйте commonPool для I/O — блокировка парализует все parallel streams
- Оптимальный THRESHOLD — чтобы последовательная обработка занимала ~100-500 мкс
- fork() + compute() — всегда оптимизируйте, не fork()+fork()+join()+join()
- Мониторьте stealCount — высокая = задачи распределяются неравномерно
- ManagedBlocker — для блокирующих операций внутри ForkJoinPool
- Отдельный пул для I/O — не используйте commonPool
🎯 Шпаргалка для интервью
Обязательно знать:
- ForkJoinPool — пул для рекурсивных задач по принципу «разделяй и властвуй», использует алгоритм work-stealing
- Обычный пул = одна общая очередь (FIFO), ForkJoinPool = у каждого потока своя deque (владелец: LIFO с головы, вор: FIFO с хвоста)
- LIFO для владельца сохраняет locality of reference — самая свежая подзадача работает с данными из кэш-линии CPU
- RecursiveTask
возвращает результат, RecursiveAction — нет - commonPool размер = число ядер CPU - 1 (оставляет 1 поток для фоновых задач)
- Оптимизация: fork() для одной части, compute() для другой — экономим поток пула
- parallelStream() и CompletableFuture по умолчанию используют commonPool
- NEVER используйте commonPool для I/O — заблокируете все parallel streams в приложении
Частые уточняющие вопросы:
- Почему вор крадёт с хвоста (FIFO), а владелец берёт с головы (LIFO)? — Чтобы минимизировать конфликты: хозяин и вор работают с разных концов deque
- Чем fork()+compute() лучше чем fork()+fork()+join()+join()? — fork()+compute() выполняет одну часть в текущем потоке, экономя поток пула
- Почему commonPool плох для I/O? — Блокировка одного потока work-stealing парализует все задачи, использующие commonPool (включая parallel streams)
- Что такое ManagedBlocker? — Механизм чтобы ForkJoinPool мог создать дополнительный поток если задача блокируется
Красные флаги (НЕ говорить):
- “ForkJoinPool хорош для I/O-bound задач” — нет, блокировка парализует work-stealing
- “Использую commonPool для HTTP-запросов” — заблокируете все parallel streams в JVM
- “ForkJoinPool = обычный пул с несколькими очередями” — нет, ключевое отличие — work-stealing алгоритм
- “Делаю fork() для обеих подзадач” — антипаттерн, fork()+compute() экономит поток
Связанные темы:
- [[12. Что такое пул потоков (Thread Pool)]]
- [[13. Какие типы Thread Pool существуют в Java]]
- [[15. Что делает ExecutorService]]
- [[16. В чём разница между Executors.newFixedThreadPool() и newCachedThreadPool()]]