Вопрос 16 · Раздел 9

Что такое ForkJoinPool?

ForkJoinPool — это специальный пул потоков, оптимизированный для выполнения задач по принципу "Разделяй и властвуй" (Divide and Conquer).

Версии по языкам: English Russian Ukrainian

Junior уровень

Базовое понимание

ForkJoinPool — это специальный пул потоков, оптимизированный для выполнения задач по принципу “Разделяй и властвуй” (Divide and Conquer).

Зачем: обычный пул с одной очередью неэффективен для рекурсивных задач — потоки простаивают, пока некоторые задачи ещё работают. ForkJoinPool решает это алгоритмом work-stealing: свободный поток «крадёт» задачи из хвоста очереди занятого потока.

Аналогия

Представьте, что вам нужно пересчитать 1,000,000 монет:

  • Обычный пул: Один человек берёт задачу, считает, передаёт следующую
  • ForkJoinPool: Делите монеты на кучки, раздаёте людям. Если кто-то закончил — помогает другим

Два ключевых метода

Метод Описание
fork() Отправить подзадачу на выполнение (асинхронно)
join() Дождаться результата подзадачи

Простой пример: Сумма массива

// Задача с результатом
class SumTask extends RecursiveTask<Long> {
    private final long[] numbers;
    private final int start, end;

    public SumTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // Если задач мало — считаем напрямую
        if (end - start <= 1000) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }

        // Иначе — делим пополам
        int mid = (start + end) / 2;
        SumTask left = new SumTask(numbers, start, mid);
        SumTask right = new SumTask(numbers, mid, end);

        left.fork();                    // Отправляем левую часть
        long rightResult = right.compute(); // Правую считаем сами
        long leftResult = left.join();  // Ждём результат левой

        return leftResult + rightResult;
    }
}

// Использование
long[] numbers = ...;
ForkJoinPool pool = ForkJoinPool.commonPool();
long total = pool.invoke(new SumTask(numbers, 0, numbers.length));

Где используется ForkJoinPool

Место Описание
Parallel Streams list.parallelStream().map(...)
CompletableFuture По умолчанию использует commonPool
Рекурсивные задачи Обработка деревьев, графов

Middle уровень

Алгоритм Work-Stealing (Кража работы)

В отличие от обычных пулов (одна общая очередь), в ForkJoinPool:

Обычный пул:
    [Общая очередь: T1, T2, T3, T4, T5]
    Поток 1 → T1
    Поток 2 → T2
    Поток 3 → (ждёт)

ForkJoinPool:
    Поток 1: [T1a, T1b, T1c] ← свой Deque
    Поток 2: [T2a, T2b]      ← свой Deque
    Поток 3: []              ← пусто! Крадёт из хвоста Потока 1

Правила работы

Правило Описание
Свой Deque У каждого воркера своя двусторонняя очередь
LIFO (свой) Воркер берёт задачи из ГОЛОВЫ своей очереди (Last-In-First-Out)
FIFO (чужой) При краже берёт из ХВОСТА чужой очереди
Локальность LIFO сохраняет locality of reference (локальность данных) — самая свежая подзадача работает с данными, которые ещё в кэш-линии CPU. Доступ из кэша = ~1ns, из RAM = ~100ns (в 100 раз медленнее).
Минимум конфликтов Хозяин берёт с головы, вор крадёт с хвоста — не мешают друг другу

RecursiveTask vs RecursiveAction

Класс Возвращает результат? Пример
RecursiveTask<V> Да Сумма, поиск максимума
RecursiveAction Нет Сортировка, обработка
// RecursiveTask — с результатом
class MaxTask extends RecursiveTask<Integer> {
    @Override
    protected Integer compute() {
        // ...
        return maxValue;
    }
}

// RecursiveAction — без результата
class SortTask extends RecursiveAction {
    @Override
    protected void compute() {
        // Сортируем на месте, результат не возвращаем
    }
}

Common Pool

// Статический инстанс — доступен всегда
ForkJoinPool commonPool = ForkJoinPool.commonPool();

// Параллелизм = число ядер CPU - 1
int parallelism = commonPool.getParallelism();

Почему -1? Это оставляет один логический поток для других задач (фоновые операции). На сервере без GUI можно изменить через -Djava.util.concurrent.ForkJoinPool.common.parallelism=N.

ForkJoinPool vs ThreadPoolExecutor

Характеристика ThreadPoolExecutor ForkJoinPool
Очередь Одна общая (FIFO) У каждого своя (LIFO/FIFO)
Тип задач Независимые, блокирующие Рекурсивные, вычислительные
Эффективность Средняя при разном времени задач Высокая (work-stealing)
Context Switching Часто Минимально
Лучше для I/O-bound задач CPU-bound задач

Senior уровень

Когда НЕ использовать ForkJoinPool

  1. Простые независимые задачи — обычный ThreadPoolExecutor проще и эффективнее
  2. I/O-bound задачи — блокировка потока блокирует work-stealing, используйте виртуальные потоки (Java 21+)
  3. Малый объём данных — overhead fork/join превышает выгоду от параллелизма
  4. НЕ используйте commonPool для I/O — заблокируете все задачи, которые его используют

Under the Hood: Work-Queue структура

// Упрощённая структура
class ForkJoinPool {
    // Массив очередей — по одной на каждый поток
    WorkQueue[] queues;

    static final class WorkQueue {
        ForkJoinTask<?>[] array; // Циклический массив для задач
        ForkJoinThread thread;   // Воркер-поток
        int base;                // Хвост (для кражи — FIFO)
        int top;                 // Голова (для владельца — LIFO)
    }
}

Optimized Fork/Join паттерн

@Override
protected Long compute() {
    if (size < THRESHOLD) {
        return computeSequentially();
    }

    MyTask left = new MyTask(subList1);
    MyTask right = new MyTask(subList2);

    // OPTIMIZATION: fork() для одной, compute() для другой
    left.fork();                     // Асинхронно — в очередь
    // Ключевая оптимизация: right.compute() выполняется в ТЕКУЩЕМ потоке
    // (не тратим поток из пула), а left.fork() отправляется в очередь.
    // Два fork() + два join() потратили бы 2 потока вместо 1.
    long rightResult = right.compute(); // В текущем потоке — экономим поток!
    long leftResult = left.join();   // Ждём результат

    return leftResult + rightResult;
}

Всегда делайте fork() для одной части и compute() для другой. Если сделать fork() для обеих и потом join(), вы потратите лишний поток впустую.

ManagedBlocker — для блокирующих операций

// ForkJoinPool может создать доп. поток, если задача блокируется
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    private boolean done = false;

    @Override
    public boolean block() throws InterruptedException {
        // Блокирующая операция
        result = blockingCall();
        done = true;
        return true;
    }

    @Override
    public boolean isReleasable() {
        return done;
    }
});

Производительность: THRESHOLD

// Слишком маленький — много задач, нагрузка на GC
// Слишком большое — мало параллелизма

// Эвристика: THRESHOLD должен быть таким, чтобы
// последовательная обработка занимала ~100-500 микросекунд

private static final int THRESHOLD = 1000;

Проблема: Starvation Common Pool

// ПЛОХО: блокирующая операция в commonPool
CompletableFuture.supplyAsync(() -> {
    return httpClient.get(url); // Долгий blocking I/O!
}).thenApply(...);
// Заблокирует один из commonPool потоков → все parallel streams тормозят!

// ХОРОШО: отдельный пул для I/O
ExecutorService ioPool = Executors.newFixedThreadPool(20);
CompletableFuture.supplyAsync(() -> httpClient.get(url), ioPool);

Диагностика

Метрики ForkJoinPool

ForkJoinPool pool = ForkJoinPool.commonPool();

pool.getStealCount();       // Сколько задач "украдено" — высокая = неравномерные задачи
pool.getQueuedTaskCount();  // Задач в очередях
pool.getActiveThreadCount();// Активных потоков
pool.getParallelism();      // Целевой параллелизм

jstack

"ForkJoinPool.commonPool-worker-1" #10 daemon prio=5
"ForkJoinPool.commonPool-worker-2" #11 daemon prio=5
// Потоки ForkJoinPool имеют такие имена

Best Practices

  1. Используйте ForkJoinPool для CPU-bound задач (вычисления)
  2. Не используйте commonPool для I/O — блокировка парализует все parallel streams
  3. Оптимальный THRESHOLD — чтобы последовательная обработка занимала ~100-500 мкс
  4. fork() + compute() — всегда оптимизируйте, не fork()+fork()+join()+join()
  5. Мониторьте stealCount — высокая = задачи распределяются неравномерно
  6. ManagedBlocker — для блокирующих операций внутри ForkJoinPool
  7. Отдельный пул для I/O — не используйте commonPool

🎯 Шпаргалка для интервью

Обязательно знать:

  • ForkJoinPool — пул для рекурсивных задач по принципу «разделяй и властвуй», использует алгоритм work-stealing
  • Обычный пул = одна общая очередь (FIFO), ForkJoinPool = у каждого потока своя deque (владелец: LIFO с головы, вор: FIFO с хвоста)
  • LIFO для владельца сохраняет locality of reference — самая свежая подзадача работает с данными из кэш-линии CPU
  • RecursiveTask возвращает результат, RecursiveAction — нет
  • commonPool размер = число ядер CPU - 1 (оставляет 1 поток для фоновых задач)
  • Оптимизация: fork() для одной части, compute() для другой — экономим поток пула
  • parallelStream() и CompletableFuture по умолчанию используют commonPool
  • NEVER используйте commonPool для I/O — заблокируете все parallel streams в приложении

Частые уточняющие вопросы:

  • Почему вор крадёт с хвоста (FIFO), а владелец берёт с головы (LIFO)? — Чтобы минимизировать конфликты: хозяин и вор работают с разных концов deque
  • Чем fork()+compute() лучше чем fork()+fork()+join()+join()? — fork()+compute() выполняет одну часть в текущем потоке, экономя поток пула
  • Почему commonPool плох для I/O? — Блокировка одного потока work-stealing парализует все задачи, использующие commonPool (включая parallel streams)
  • Что такое ManagedBlocker? — Механизм чтобы ForkJoinPool мог создать дополнительный поток если задача блокируется

Красные флаги (НЕ говорить):

  • “ForkJoinPool хорош для I/O-bound задач” — нет, блокировка парализует work-stealing
  • “Использую commonPool для HTTP-запросов” — заблокируете все parallel streams в JVM
  • “ForkJoinPool = обычный пул с несколькими очередями” — нет, ключевое отличие — work-stealing алгоритм
  • “Делаю fork() для обеих подзадач” — антипаттерн, fork()+compute() экономит поток

Связанные темы:

  • [[12. Что такое пул потоков (Thread Pool)]]
  • [[13. Какие типы Thread Pool существуют в Java]]
  • [[15. Что делает ExecutorService]]
  • [[16. В чём разница между Executors.newFixedThreadPool() и newCachedThreadPool()]]