Java 8 streams serial vs parallel performance

Refresh

November 2018

Views

3.1k time

10

On my machine, the program below prints:

OptionalLong[134043]
 PARALLEL took 127869 ms
OptionalLong[134043]
 SERIAL took 60594 ms

It's not clear to my why executing the program in serial is faster than executing it in parallel. I've given both programs -Xms2g -Xmx2g on an 8gb box thats relatively quiet. Can someone clarify whats going on?

import java.util.stream.LongStream;
import java.util.stream.LongStream.Builder;

public class Problem47 {

    public static void main(String[] args) {

        final long startTime = System.currentTimeMillis();
        System.out.println(LongStream.iterate(1, n -> n + 1).parallel().limit(1000000).filter(n -> fourConsecutives(n)).findFirst());
        final long endTime = System.currentTimeMillis();
        System.out.println(" PARALLEL took " +(endTime - startTime) + " ms");

        final long startTime2 = System.currentTimeMillis();
        System.out.println(LongStream.iterate(1, n -> n + 1).limit(1000000).filter(n -> fourConsecutives(n)).findFirst());
        final long endTime2 = System.currentTimeMillis();
        System.out.println(" SERIAL took " +(endTime2 - startTime2) + " ms");
    }

    static boolean fourConsecutives(final long n) {
        return distinctPrimeFactors(n).count() == 4 &&
                distinctPrimeFactors(n + 1).count() == 4 &&
                distinctPrimeFactors(n + 2).count() == 4 &&
                distinctPrimeFactors(n + 3).count() == 4;
    }

    static LongStream distinctPrimeFactors(long number) {
        final Builder builder = LongStream.builder();
        final long limit = number / 2;
        long n = number;
        for (long i = 2; i <= limit; i++) {
            while (n % i == 0) {
                builder.accept(i);
                n /= i;
            }
        }
        return builder.build().distinct();
    }

}

2 answers

13

В то время как Брайан Гетц прав насчет вашей установки, например , что вы должны использовать , .range(1, 1000000)а не .iterate(1, n -> n + 1).limit(1000000)и , что ваш тест метод является очень упрощенным, я хочу подчеркнуть важный момент:

даже после устранения этих проблем, даже используя настенные часы и TaskManager вы можете увидеть, что есть что-то не так. На моей машине эта операция занимает около половины минуты, и вы можете видеть, что параллелизм падает до одного ядра примерно через две секунды. Даже если специализированный тест инструмент может привести к различным результатам это не имеет значения, если вы не хотите, чтобы запустить конечное приложение в тесте инструмент все время ...

Теперь мы могли бы попытаться дразнить больше о вашей установке или сказать вам , что вы должны изучить специальные вещи о Fork / Join рамки , как в реализаторах сделали в списке обсуждения .

Или попробуйте альтернативную реализацию:

ExecutorService es=Executors.newFixedThreadPool(
                       Runtime.getRuntime().availableProcessors());
AtomicLong found=new AtomicLong(Long.MAX_VALUE);
LongStream.range(1, 1000000).filter(n -> found.get()==Long.MAX_VALUE)
    .forEach(n -> es.submit(()->{
        if(found.get()>n && fourConsecutives(n)) for(;;) {
            long x=found.get();
            if(x<n || found.compareAndSet(x, n)) break;
        }
    }));
es.shutdown();
try { es.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); }
catch (InterruptedException ex) {throw new AssertionError(ex); }
long result=found.get();
System.out.println(result==Long.MAX_VALUE? "not found": result);

На моей машине это то , что я ожидал бы от параллельного выполнения , принимая лишь немного больше , чем ⟨sequential time⟩/⟨number of cpu cores⟩. Не меняя ничего в вашей fourConsecutivesреализации.

Суть в том , что, по крайней мере , при обработке одного элемента занимает значительное время, текущая Streamреализация (или лежащая в основе вилки / рамки Join) имеют проблемы , как уже обсуждалась в этом связанном с этим вопросом . Если вы хотите надежный параллелизм я бы рекомендовал использовать доказанные и проверенные ExecutorServiceс. Как вы можете увидеть в моем примере, это не значит отказаться от возможности Java 8, они хорошо согласуются между собой. Только автоматизированная параллелизм введена Stream.parallelследует использовать с осторожностью (учитывая текущую реализацию).

18

Мы можем сделать его легче выполнять параллельно, но мы не можем обязательно сделать параллелизм легко.

Виновник в вашем коде является сочетанием предела + параллельно. Реализация предела () тривиально для последовательных потоков, но достаточно дорогой для параллельных потоков. Это происходит потому, что определение операции предела связанно с порядком столкновения потока. Потоки с ограничением () расположены параллельно часто медленнее, чем в последовательном, если расчет делается на элемент не очень высок.

Ваш выбор источника потока также ограничивает параллелизм. Использование iterate(0, n->n+1)дает положительные целые числа, но iterateпринципиально последовательным; Вы не можете вычислить п - й элемент до тех пор , пока вы вычислили (п-1) -й элемент. Поэтому , когда мы пытаемся разделить этот поток, мы в конечном итоге расщепление (первый, отдых). Попробуйте использовать range(0,k)вместо; это раскалывает гораздо более красиво, разделив аккуратно половинами с произвольным доступом.