Вопрос 11 · Раздел 8

Как создать parallel stream?

Есть два основных способа создать параллельный стрим:

Версии по языкам: English Russian Ukrainian

🟢 Junior Level

Есть два основных способа создать параллельный стрим:

// Способ 1: Из коллекции
List<String> list = List.of("a", "b", "c");
list.parallelStream().forEach(System.out::println);

// Способ 2: Через метод parallel()
list.stream().parallel().forEach(System.out::println);

Оба способа делают одно и то же. parallelStream() — просто сокращенная запись.

Проверка:

stream.isParallel(); // вернёт true

🟡 Middle Level

Способы создания

1. Collection.parallelStream():

Stream<String> pStream = list.parallelStream();

Вызывает StreamSupport.stream(collection.spliterator(), true). Флаг true означает параллельность.

2. Stream.parallel():

Stream<Integer> stream = Stream.of(1, 2, 3).parallel();

Позволяет сделать любой стрим параллельным “на лету”.

3. StreamSupport:

Stream<T> stream = StreamSupport.stream(mySpliterator, true);

Для низкоуровневой оптимизации или своих структур данных.

Важный нюанс: последний вызов побеждает

stream.parallel().sequential().parallel(); // будет параллельным
stream.parallel().sequential();            // будет последовательным

Частично параллельных стримов не бывает — весь пайплайн либо полностью параллельный, либо полностью последовательный.

🔴 Senior Level

Управление потоками через кастомный ForkJoinPool

По умолчанию используется ForkJoinPool.commonPool(). В production часто нужно ограничить параллелизм для конкретной задачи:

ForkJoinPool myPool = new ForkJoinPool(4);
long result = myPool.submit(() ->
    list.parallelStream().mapToInt(this::doWork).sum()
).get();

Как работает: Параллельный стрим проверяет, запущен ли он внутри ForkJoinWorkerThread. Если да — использует текущий пул вместо commonPool. Это изолирует нагрузку.

Оптимизации

IntStream.range().parallel(): Один из самых эффективных способов — Spliterator для числовых диапазонов работает идеально.

Массив vs List: Arrays.stream(arr).parallel() быстрее, чем list.parallelStream(), потому что массив имеет точный размер (SIZED) и O(1) доступ по индексу. Это позволяет Spliterator’у делить его ровно пополам без overhead на Iterator.next().

Unordered оптимизация

Если источник упорядочен (LinkedHashSet), параллельный стрим тратит ресурсы на сохранение порядка. Если порядок не важен:

linkedHashSet.stream().unordered().parallel()...

// unordered() снимает с ForkJoinPool обязанность сохранять порядок при merge. // Потоки не должны синхронизироваться при объединении результатов → меньше contention.

Edge Cases

FlatMap Constraints: В параллельном стриме через flatMap внутренние стримы обрабатываются последовательно внутри каждой задачи ForkJoin.

Когда НЕ создавать кастомный ForkJoinPool

  1. Для простых задач — commonPool покрывает 95% случаев
  2. Для I/O-bound задач — используйте виртуальные потоки (Java 21+) или CompletableFuture
  3. Если не контролируете shutdown — утечка потоков при остановке приложения

Диагностика

  • Thread Names: В лямбде выведите Thread.currentThread().getName() — увидите ForkJoinPool.commonPool-worker-N
  • VisualVM: Вкладка “Threads” покажет загрузку всех воркеров commonPool

🎯 Шпаргалка для интервью

Обязательно знать:

  • Два способа: collection.parallelStream() и stream.parallel() — делают одно и то же
  • По умолчанию используется ForkJoinPool.commonPool() с размером = число ядер - 1
  • parallel() и sequential() можно chaining — последний вызов побеждает
  • Частично параллельных стримов не бывает — весь пайплайн либо параллельный, либо последовательный
  • Для изоляции нагрузки в production используют кастомный ForkJoinPool
  • Arrays.stream(arr).parallel() быстрее, чем list.parallelStream() (O(1) доступ, SIZED)
  • .unordered() снимает обязанность сохранять порядок при merge, снижая contention

Частые уточняющие вопросы:

  • Как проверить, параллельный ли стрим? — Вызвать stream.isParallel(), вернёт true/false
  • Можно ли сделать часть стрима параллельной? — Нет, флаг параллелизма применяется ко всему пайплайну
  • Почему Arrays.stream().parallel() быстрее? — Массив имеет SIZED характеристику и O(1) доступ, Spliterator делит его ровно пополам
  • Когда использовать кастомный ForkJoinPool? — В Spring Boot-приложениях для изоляции нагрузки между компонентами

Красные флаги (НЕ говорить):

  • «Параллельный стрим всегда быстрее обычного» — на малых данных overhead ForkJoin сделает его медленнее
  • «Можно контролировать число потоков через parallelStream» — без кастомного ForkJoinPool это невозможно
  • «parallelStream и CompletableFuture — одно и то же» — это разные абстракции с разными use-cases
  • «Можно сделать один стрим наполовину параллельным» — флаг бинарный, chaining переключает весь пайплайн

Связанные темы:

  • [[Какие потенциальные проблемы могут быть с параллельными стримами]]
  • [[Что такое ForkJoinPool и как он связан с parallel streams]]
  • [[Когда использовать parallel streams]]
  • [[Что такое параллельные стримы]]