Как создать parallel stream?
Есть два основных способа создать параллельный стрим:
🟢 Junior Level
Есть два основных способа создать параллельный стрим:
// Способ 1: Из коллекции
List<String> list = List.of("a", "b", "c");
list.parallelStream().forEach(System.out::println);
// Способ 2: Через метод parallel()
list.stream().parallel().forEach(System.out::println);
Оба способа делают одно и то же. parallelStream() — просто сокращенная запись.
Проверка:
stream.isParallel(); // вернёт true
🟡 Middle Level
Способы создания
1. Collection.parallelStream():
Stream<String> pStream = list.parallelStream();
Вызывает StreamSupport.stream(collection.spliterator(), true). Флаг true означает параллельность.
2. Stream.parallel():
Stream<Integer> stream = Stream.of(1, 2, 3).parallel();
Позволяет сделать любой стрим параллельным “на лету”.
3. StreamSupport:
Stream<T> stream = StreamSupport.stream(mySpliterator, true);
Для низкоуровневой оптимизации или своих структур данных.
Важный нюанс: последний вызов побеждает
stream.parallel().sequential().parallel(); // будет параллельным
stream.parallel().sequential(); // будет последовательным
Частично параллельных стримов не бывает — весь пайплайн либо полностью параллельный, либо полностью последовательный.
🔴 Senior Level
Управление потоками через кастомный ForkJoinPool
По умолчанию используется ForkJoinPool.commonPool(). В production часто нужно ограничить параллелизм для конкретной задачи:
ForkJoinPool myPool = new ForkJoinPool(4);
long result = myPool.submit(() ->
list.parallelStream().mapToInt(this::doWork).sum()
).get();
Как работает: Параллельный стрим проверяет, запущен ли он внутри ForkJoinWorkerThread. Если да — использует текущий пул вместо commonPool. Это изолирует нагрузку.
Оптимизации
IntStream.range().parallel(): Один из самых эффективных способов — Spliterator для числовых диапазонов работает идеально.
Массив vs List: Arrays.stream(arr).parallel() быстрее, чем list.parallelStream(), потому что массив имеет точный размер (SIZED) и O(1) доступ по индексу. Это позволяет Spliterator’у делить его ровно пополам без overhead на Iterator.next().
Unordered оптимизация
Если источник упорядочен (LinkedHashSet), параллельный стрим тратит ресурсы на сохранение порядка. Если порядок не важен:
linkedHashSet.stream().unordered().parallel()...
// unordered() снимает с ForkJoinPool обязанность сохранять порядок при merge. // Потоки не должны синхронизироваться при объединении результатов → меньше contention.
Edge Cases
FlatMap Constraints: В параллельном стриме через flatMap внутренние стримы обрабатываются последовательно внутри каждой задачи ForkJoin.
Когда НЕ создавать кастомный ForkJoinPool
- Для простых задач — commonPool покрывает 95% случаев
- Для I/O-bound задач — используйте виртуальные потоки (Java 21+) или CompletableFuture
- Если не контролируете shutdown — утечка потоков при остановке приложения
Диагностика
- Thread Names: В лямбде выведите
Thread.currentThread().getName()— увидитеForkJoinPool.commonPool-worker-N - VisualVM: Вкладка “Threads” покажет загрузку всех воркеров
commonPool
🎯 Шпаргалка для интервью
Обязательно знать:
- Два способа:
collection.parallelStream()иstream.parallel()— делают одно и то же - По умолчанию используется
ForkJoinPool.commonPool()с размером = число ядер - 1 parallel()иsequential()можно chaining — последний вызов побеждает- Частично параллельных стримов не бывает — весь пайплайн либо параллельный, либо последовательный
- Для изоляции нагрузки в production используют кастомный
ForkJoinPool Arrays.stream(arr).parallel()быстрее, чемlist.parallelStream()(O(1) доступ, SIZED).unordered()снимает обязанность сохранять порядок при merge, снижая contention
Частые уточняющие вопросы:
- Как проверить, параллельный ли стрим? — Вызвать
stream.isParallel(), вернёт true/false - Можно ли сделать часть стрима параллельной? — Нет, флаг параллелизма применяется ко всему пайплайну
- Почему
Arrays.stream().parallel()быстрее? — Массив имеет SIZED характеристику и O(1) доступ, Spliterator делит его ровно пополам - Когда использовать кастомный ForkJoinPool? — В Spring Boot-приложениях для изоляции нагрузки между компонентами
Красные флаги (НЕ говорить):
- «Параллельный стрим всегда быстрее обычного» — на малых данных overhead ForkJoin сделает его медленнее
- «Можно контролировать число потоков через parallelStream» — без кастомного ForkJoinPool это невозможно
- «parallelStream и CompletableFuture — одно и то же» — это разные абстракции с разными use-cases
- «Можно сделать один стрим наполовину параллельным» — флаг бинарный, chaining переключает весь пайплайн
Связанные темы:
- [[Какие потенциальные проблемы могут быть с параллельными стримами]]
- [[Что такое ForkJoinPool и как он связан с parallel streams]]
- [[Когда использовать parallel streams]]
- [[Что такое параллельные стримы]]