Питання 16 · Розділ 9

Що таке ForkJoinPool?

ForkJoinPool — це спеціальний пул потоків, оптимізований для виконання задач за принципом "Розділяй і володарюй" (Divide and Conquer).

Мовні версії: English Russian Ukrainian

Junior рівень

Базове розуміння

ForkJoinPool — це спеціальний пул потоків, оптимізований для виконання задач за принципом “Розділяй і володарюй” (Divide and Conquer).

Навіщо: звичайний пул з однією чергою неефективний для рекурсивних задач — потоки простоюють, поки деякі задачі ще працюють. ForkJoinPool вирішує це алгоритмом work-stealing: вільний потік “краде” задачі з хвоста черги зайнятого потоку.

Аналогія

Уявіть, що вам потрібно перерахувати 1,000,000 монет:

  • Звичайний пул: Одна людина бере задачу, рахує, передає наступну
  • ForkJoinPool: Ділите монети на купки, роздаєте людям. Якщо хтось закінчив — допомагає іншим

Два ключові методи

Метод Опис
fork() Надіслати підзадачу на виконання (асинхронно)
join() Дочекатися результату підзадачі

Простий приклад: Сума масиву

// Задача з результатом
class SumTask extends RecursiveTask<Long> {
    private final long[] numbers;
    private final int start, end;

    public SumTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // Якщо задач мало — рахуємо напряму
        if (end - start <= 1000) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }

        // Інакше — ділимо навпіл
        int mid = (start + end) / 2;
        SumTask left = new SumTask(numbers, start, mid);
        SumTask right = new SumTask(numbers, mid, end);

        left.fork();                    // Надсилаємо ліву частину
        long rightResult = right.compute(); // Праву рахуємо самі
        long leftResult = left.join();  // Чекаємо результат лівої

        return leftResult + rightResult;
    }
}

// Використання
long[] numbers = ...;
ForkJoinPool pool = ForkJoinPool.commonPool();
long total = pool.invoke(new SumTask(numbers, 0, numbers.length));

Де використовується ForkJoinPool

Місце Опис
Parallel Streams list.parallelStream().map(...)
CompletableFuture За замовчуванням використовує commonPool
Рекурсивні задачі Обробка дерев, графів

Middle рівень

Алгоритм Work-Stealing (Крадіжка роботи)

На відміну від звичайних пулів (одна спільна черга), у ForkJoinPool:

Звичайний пул:
    [Спільна черга: T1, T2, T3, T4, T5]
    Потік 1 → T1
    Потік 2 → T2
    Потік 3 → (чекає)

ForkJoinPool:
    Потік 1: [T1a, T1b, T1c] ← свій Deque
    Потік 2: [T2a, T2b]      ← свій Deque
    Потік 3: []              ← порожньо! Краде з хвоста Потоку 1

Правила роботи

Правило Опис
Свій Deque У кожного воркера своя двостороння черга
LIFO (свій) Воркер бере задачі з ГОЛОВИ своєї черги (Last-In-First-Out)
FIFO (чужий) При крадіжці бере з ХВОСТА чужої черги
Локальність LIFO зберігає locality of reference (локальність даних) — найсвіжіша підзадача працює з даними, які ще в кеш-лінії CPU. Доступ з кешу = ~1ns, з RAM = ~100ns (у 100 разів повільніше).
Мінімум конфліктів Господар бере з голови, вор краде з хвоста — не заважають один одному

RecursiveTask vs RecursiveAction

Клас Повертає результат? Приклад
RecursiveTask<V> Так Сума, пошук максимуму
RecursiveAction Ні Сортування, обробка
// RecursiveTask — з результатом
class MaxTask extends RecursiveTask<Integer> {
    @Override
    protected Integer compute() {
        // ...
        return maxValue;
    }
}

// RecursiveAction — без результату
class SortTask extends RecursiveAction {
    @Override
    protected void compute() {
        // Сортуємо на місці, результат не повертаємо
    }
}

Common Pool

// Статичний інстанс — доступний завжди
ForkJoinPool commonPool = ForkJoinPool.commonPool();

// Паралелізм = число ядер CPU - 1
int parallelism = commonPool.getParallelism();

Чому -1? Це залишає один логічний потік для інших задач (фонові операції). На сервері без GUI можна змінити через -Djava.util.concurrent.ForkJoinPool.common.parallelism=N.

ForkJoinPool vs ThreadPoolExecutor

Характеристика ThreadPoolExecutor ForkJoinPool
Черга Одна спільна (FIFO) У кожного своя (LIFO/FIFO)
Тип задач Незалежні, блокуючі Рекурсивні, обчислювальні
Ефективність Середня при різному часі задач Висока (work-stealing)
Context Switching Часто Мінімально
Краще для I/O-bound задач CPU-bound задач

Senior рівень

Коли НЕ використовувати ForkJoinPool

  1. Прості незалежні задачі — звичайний ThreadPoolExecutor простіший та ефективніший
  2. I/O-bound задачі — блокування потоку блокує work-stealing, використовуйте віртуальні потоки (Java 21+)
  3. Малий об’єм даних — overhead fork/join перевищує вигоду від паралелізму
  4. НЕ використовуйте commonPool для I/O — заблокуєте всі задачі, що його використовують

Under the Hood: Work-Queue структура

// Спрощена структура
class ForkJoinPool {
    // Масив черг — по одній на кожен потік
    WorkQueue[] queues;

    static final class WorkQueue {
        ForkJoinTask<?>[] array; // Циклічний масив для задач
        ForkJoinThread thread;   // Воркер-потік
        int base;                // Хвіст (для крадіжки — FIFO)
        int top;                 // Голова (для власника — LIFO)
    }
}

Optimized Fork/Join патерн

@Override
protected Long compute() {
    if (size < THRESHOLD) {
        return computeSequentially();
    }

    MyTask left = new MyTask(subList1);
    MyTask right = new MyTask(subList2);

    // OPTIMIZATION: fork() для однієї, compute() для іншої
    left.fork();                     // Асинхронно — в чергу
    // Ключова оптимізація: right.compute() виконується в ПОТОЧНОМУ потоці
    // (не витрачаємо потік з пулу), а left.fork() надсилається в чергу.
    // Два fork() + два join() витратили б 2 потоки замість 1.
    long rightResult = right.compute(); // У поточному потоці — економимо потік!
    long leftResult = left.join();   // Чекаємо результат

    return leftResult + rightResult;
}

Завжди робіть fork() для однієї частини та compute() для іншої. Якщо зробити fork() для обох і потім join(), ви витратите зайвий потік марно.

ManagedBlocker — для блокуючих операцій

// ForkJoinPool може створити додат. потік, якщо задача блокується
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    private boolean done = false;

    @Override
    public boolean block() throws InterruptedException {
        // Блокуюча операція
        result = blockingCall();
        done = true;
        return true;
    }

    @Override
    public boolean isReleasable() {
        return done;
    }
});

Продуктивність: THRESHOLD

// Занадто маленьке — багато задач, навантаження на GC
// Занадто велике — мало паралелізму

// Евристика: THRESHOLD має бути таким, щоб
// послідовна обробка займала ~100-500 мікросекунд

private static final int THRESHOLD = 1000;

Проблема: Starvation Common Pool

// ПОГАНО: блокуюча операція в commonPool
CompletableFuture.supplyAsync(() -> {
    return httpClient.get(url); // Довгий blocking I/O!
}).thenApply(...);
// Заблокує один з commonPool потоків → всі parallel streams гальмують!

// ГАРНО: окремий пул для I/O
ExecutorService ioPool = Executors.newFixedThreadPool(20);
CompletableFuture.supplyAsync(() -> httpClient.get(url), ioPool);

Діагностика

Метрики ForkJoinPool

ForkJoinPool pool = ForkJoinPool.commonPool();

pool.getStealCount();       // Скільки задач "вкрадено" — висока = нерівномірні задачі
pool.getQueuedTaskCount();  // Задач у чергах
pool.getActiveThreadCount();// Активних потоків
pool.getParallelism();      // Цільовий паралелізм

jstack

"ForkJoinPool.commonPool-worker-1" #10 daemon prio=5
"ForkJoinPool.commonPool-worker-2" #11 daemon prio=5
// Потоки ForkJoinPool мають такі імена

Best Practices

  1. Використовуйте ForkJoinPool для CPU-bound задач (обчислення)
  2. Не використовуйте commonPool для I/O — блокування паралізує всі parallel streams
  3. Оптимальний THRESHOLD — щоб послідовна обробка займала ~100-500 мкс
  4. fork() + compute() — завжди оптимізуйте, не fork()+fork()+join()+join()
  5. Моніторте stealCount — висока = задачі розподіляються нерівномірно
  6. ManagedBlocker — для блокуючих операцій всередині ForkJoinPool
  7. Окремий пул для I/O — не використовуйте commonPool

🎯 Шпаргалка для інтерв’ю

Обов’язково знати:

  • ForkJoinPool — пул для рекурсивних задач за принципом «розділяй і володарюй», використовує алгоритм work-stealing
  • Звичайний пул = одна спільна черга (FIFO), ForkJoinPool = у кожного потоку своя deque (власник: LIFO з голови, вор: FIFO з хвоста)
  • LIFO для власника зберігає locality of reference — найсвіжіша підзадача працює з даними з кеш-лінії CPU
  • RecursiveTask повертає результат, RecursiveAction — ні
  • commonPool розмір = число ядер CPU - 1 (залишає 1 потік для фонових задач)
  • Оптимізація: fork() для однієї частини, compute() для іншої — економимо потік пулу
  • parallelStream() та CompletableFuture за замовчуванням використовують commonPool
  • NEVER використовуйте commonPool для I/O — заблокуєте всі parallel streams у додатку

Часті уточнюючі запитання:

  • Чому вор краде з хвоста (FIFO), а власник бере з голови (LIFO)? — Щоб мінімізувати конфлікти: господар і вор працюють з різних кінців deque
  • Чим fork()+compute() краще ніж fork()+fork()+join()+join()? — fork()+compute() виконує одну частину в поточному потоці, економлячи потік пулу
  • Чому commonPool поганий для I/O? — Блокування одного потоку work-stealing паралізує всі задачі, що використовують commonPool (включаючи parallel streams)
  • Що таке ManagedBlocker? — Механізм щоб ForkJoinPool міг створити додатковий потік якщо задача блокується

Червоні прапорці (НЕ говорити):

  • “ForkJoinPool гарний для I/O-bound задач” — ні, блокування паралізує work-stealing
  • “Використовую commonPool для HTTP-запитів” — заблокуєте всі parallel streams у JVM
  • “ForkJoinPool = звичайний пул з декількома чергами” — ні, ключова відмінність — work-stealing алгоритм
  • “Роблю fork() для обох підзадач” — антипатерн, fork()+compute() економить потік

Пов’язані теми:

  • [[12. Що таке пул потоків (Thread Pool)]]
  • [[13. Які типи Thread Pool існують в Java]]
  • [[15. Що робить ExecutorService]]
  • [[16. В чому різниця між Executors.newFixedThreadPool() та newCachedThreadPool()]]