blackcat

Stream并行流

并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。
Java 8 中将并行进行了优化,我们可以很容易的对数据进行并行操作。Stream API可以声明性地通过 parallel()与sequential()在并行流与顺序流之间进行切换。

Fork/Join 框架

Fork/Join 框架,采用 “工作窃取”模式(work-stealing): 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线 程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的 处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因 无法继续运行,那么该线程会处于等待状态.而在fork/join框架实现中,如果 某个子问题由于等待另外一个子问题的完成而无法继续运行.那么处理该子 问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能.

Fork/Join计算数值累加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* @author black猫
* @date 2019-11-26
* fork join 把大的任务拆分成小任务,再合并,利用线程,充分利用 多核cpu
*/
public class ForkJoinCalculate extends RecursiveTask<Long> {

/***
* 开始值
*/
private long start;

/***
* 结束值
*/
private long end;

/***
* 临界值 大于这个值 就要拆分成多个小任务
*/
private static final long THRESHOLD = 100000;

/***
* 构造函数
* @param start 起始值
* @param end 结束值
*/
public ForkJoinCalculate(long start, long end) {
this.start = start;
this.end = end;
}

/***
* main函数 测试
* @param args
*/
public static void main(String[] args) {
// 0-1亿的累加
ForkJoinCalculate forkJoinCalculate = new ForkJoinCalculate(0, 1000000000);
// fork join线程池
ForkJoinPool pool = new ForkJoinPool();
Long sum = pool.invoke(forkJoinCalculate);
// 输出结果
System.out.println(sum);
}

/**
* 计算数值的累加和
* 返回完成后的结果
*
* @return the result of the computation
*/
@Override
protected Long compute() {
long length = end - start;
if (length < THRESHOLD) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 中间数
long middle = (start + end) / 2;

// 左边
ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
// 子任务
left.fork();

// 右边
ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);
// 子任务
right.fork();

// 结果合起来
return left.join() + right.join();

}
}
}

对比for循环/ForkJoin框架/Stream顺序流和并行流,计算数值累加和时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* @author black猫
* @date 2019-11-26
*/
public class Stream04Test {

long maxNum = 100000000000L;

/***
* 传统的 for循环:计算 maxNum的累加和
*/
@Test
void test01() {
long startTime = System.currentTimeMillis();

long sum = 0L;

for (int i = 0; i < maxNum; i++) {
sum += i;
}

System.out.println(sum);
long endTime = System.currentTimeMillis();

System.out.println("time:" + (endTime - startTime) + "ms");

}


/***
* fork join 框架:计算 maxNum的累加和
*/
@Test
void test02() {
long startTime = System.currentTimeMillis();
//fork join 池
ForkJoinPool pool = new ForkJoinPool();
ForkJoinCalculate forkJoinCalculate = new ForkJoinCalculate(0, maxNum);
Long sum = pool.invoke(forkJoinCalculate);
System.out.println(sum);
long endTime = System.currentTimeMillis();
System.out.println("time:" + (endTime - startTime) + "ms");

}

/***
* stream顺序流:计算 maxNum的累加和
*/
@Test
void test03() {
long startTime = System.currentTimeMillis();
Long optional = LongStream.rangeClosed(0L, maxNum).sequential().reduce(0, Long::sum);
long endTime = System.currentTimeMillis();
System.out.println(optional);
System.out.println("time:" + (endTime - startTime) + "ms");
}

/***
* stream并行流:计算 maxNum的累加和
*/
@Test
public void test04() {
long startTime = System.currentTimeMillis();
Long optional = LongStream.rangeClosed(0L, maxNum).parallel().reduce(0, Long::sum);
long endTime = System.currentTimeMillis();
System.out.println(optional);
System.out.println("time:" + (endTime - startTime) + "ms");
}
}

对比计算累加和时间

类型/计算累加数值 一千万 一亿 十亿 一百亿 一千亿
Test01(for循环) 8ms 50ms 462ms
Test02(fork join框架) 20ms 58ms 249ms 2189ms 17887ms
Test03(stream顺序流) 85ms 119ms 693ms 5370ms 52361ms
Test03(stream并行流) 22ms 35ms 193ms 1508ms 14776ms

我的小结

Stream并行流在计算多任务中,当线程的上下文切换的时间小于线程运行时间时,会充分提高cpu多核的利用率,效果更好。


 评论