Stream并行流和Fork/Join框架

11/25/2019 LambdaStream

# Stream并行流

并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。 Java 8 中将并行进行了优化,我们可以很容易的对数据进行并行操作。Stream API可以声明性地通过 parallel()与sequential()在并行流与顺序流之间进行切换。

# Fork/Join 框架

Fork/Join 框架,采用 “工作窃取”模式(work-stealing): 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线 程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。 相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的 处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因 无法继续运行,那么该线程会处于等待状态.而在fork/join框架实现中,如果 某个子问题由于等待另外一个子问题的完成而无法继续运行.那么处理该子 问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能.

Fork/Join计算数值累加

/**
 * @author black猫
 * @date 2019-11-26
 * fork join 把大的任务拆分成小任务,再合并,利用线程,充分利用 多核cpu
 */
public class ForkJoinCalculate extends RecursiveTask<Long> {

    /***
     * 开始值
     */
    private long start;

    /***
     * 结束值
     */
    private long end;

    /***
     * 临界值 大于这个值 就要拆分成多个小任务
     */
    private static final long THRESHOLD = 100000;

    /***
     * 构造函数
     * @param start 起始值
     * @param end 结束值
     */
    public ForkJoinCalculate(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /***
     * main函数 测试
     * @param args
     */
    public static void main(String[] args) {
        //  0-1亿的累加
        ForkJoinCalculate forkJoinCalculate = new ForkJoinCalculate(0, 1000000000);
        //  fork join线程池
        ForkJoinPool pool = new ForkJoinPool();
        Long sum = pool.invoke(forkJoinCalculate);
        //  输出结果
        System.out.println(sum);
    }

    /**
     * 计算数值的累加和
     * 返回完成后的结果
     *
     * @return the result of the computation
     */
    @Override
    protected Long compute() {
        long length = end - start;
        if (length < THRESHOLD) {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            //   中间数
            long middle = (start + end) / 2;

            //   左边
            ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
            //   子任务
            left.fork();

            //   右边
            ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);
            //  子任务
            right.fork();

            //  结果合起来
            return left.join() + right.join();

        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

对比for循环/ForkJoin框架/Stream顺序流和并行流,计算数值累加和时间

/**
 * @author black猫
 * @date 2019-11-26
 */
public class Stream04Test {

    long maxNum = 100000000000L;

    /***
     * 传统的 for循环:计算 maxNum的累加和
     */
    @Test
    void test01() {
        long startTime = System.currentTimeMillis();

        long sum = 0L;

        for (int i = 0; i < maxNum; i++) {
            sum += i;
        }

        System.out.println(sum);
        long endTime = System.currentTimeMillis();

        System.out.println("time:" + (endTime - startTime) + "ms");

    }


    /***
     * fork join 框架:计算 maxNum的累加和
     */
    @Test
    void test02() {
        long startTime = System.currentTimeMillis();
        //fork join 池
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinCalculate forkJoinCalculate = new ForkJoinCalculate(0, maxNum);
        Long sum = pool.invoke(forkJoinCalculate);
        System.out.println(sum);
        long endTime = System.currentTimeMillis();
        System.out.println("time:" + (endTime - startTime) + "ms");

    }

    /***
     * stream顺序流:计算 maxNum的累加和
     */
    @Test
    void test03() {
        long startTime = System.currentTimeMillis();
        Long optional = LongStream.rangeClosed(0L, maxNum).sequential().reduce(0, Long::sum);
        long endTime = System.currentTimeMillis();
        System.out.println(optional);
        System.out.println("time:" + (endTime - startTime) + "ms");
    }

    /***
     * stream并行流:计算 maxNum的累加和
     */
    @Test
    public void test04() {
        long startTime = System.currentTimeMillis();
        Long optional = LongStream.rangeClosed(0L, maxNum).parallel().reduce(0, Long::sum);
        long endTime = System.currentTimeMillis();
        System.out.println(optional);
        System.out.println("time:" + (endTime - startTime) + "ms");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

# 对比计算累加和时间

类型/计算累加数值 一千万 一亿 十亿 一百亿 一千亿
Test01(for循环) 8ms 50ms 462ms
Test02(fork join框架) 20ms 58ms 249ms 2189ms 17887ms
Test03(stream顺序流) 85ms 119ms 693ms 5370ms 52361ms
Test03(stream并行流) 22ms 35ms 193ms 1508ms 14776ms

# 我的小结

Stream并行流在计算多任务中,当线程的上下文切换的时间小于线程运行时间时,会充分提高cpu多核的利用率,效果更好。

文章首发于黑猫のBlog (opens new window)欢迎来留言啊!!!

Last Updated: 8/20/2021, 5:31:07 PM