在路上

 找回密码
 立即注册
在路上 站点首页 学习 查看内容

Java 8并行流:必备技巧

2017-2-7 13:43| 发布者: zhangjf| 查看: 491| 评论: 0

摘要: Java 8 并行流(parallel stream)采用共享线程池,对性能造成了严重影响。可以包装流来调用自己的线程池解决性能问题。 问题 Java 8 的并行流可以让我们相对轻松地执行并行任务。 myList.parallelStream.map(obj ...

Java 8 并行流(parallel stream)采用共享线程池,对性能造成了严重影响。可以包装流来调用自己的线程池解决性能问题。

问题

Java 8 的并行流可以让我们相对轻松地执行并行任务。

  1. myList.parallelStream.map(obj -> longRunningOperation())
复制代码

但是这样存在一个严重的问题:在 JVM 的后台,使用通用的 fork/join 池来完成上述功能,该池是所有并行流共享的。默认情况,fork/join 池会为每个处理器分配一个线程。假设你有一台16核的机器,这样你就只能创建16个线程。对 CPU 密集型的任务来说,这样是有意义的,因为你的机器确实只能执行16个线程。但是真实情况下,不是所有的任务都是 CPU 密集型的。例如:

  1. myList.parallelStream
  2. .map(this::retrieveFromA)
  3. .map(this::processUsingB)
  4. .forEach(this::saveToC)
  5. myList.parallelStream
  6. .map(this::retrieveFromD)
  7. .map(this::processUsingE)
  8. .forEach(this::saveToD)
复制代码

这两个流很大程度上是受限于IO操作,所以会等待其他系统。但这两个流使用相同的(小)线程池,因此会相互等待而被阻塞。这个非常不好,可以改进。我们以一个流为例:

  1. final List<Integer> firstRange = buildIntRange();
  2. firstRange.parallelStream().forEach((number) -> {
  3. try {
  4. // do something slow
  5. Thread.sleep(5);
  6. } catch (InterruptedException e) { }
  7. });
复制代码

完整的代码可以在gist上查看。

在执行期间,我获取了一份线程dump的文件。这是相关的线程(在我的Macbook上):

  1. ForkJoinPool.commonPool-worker-1
  2. ForkJoinPool.commonPool-worker-2
  3. ForkJoinPool.commonPool-worker-3
  4. ForkJoinPool.commonPool-worker-4
复制代码

现在,我要并行的执行这两个并行流(对于那些不是以英语为母语的人士,我感到非常抱歉!)

  1. Runnable firstTask = () -> {
  2. firstRange.parallelStream().forEach((number) -> {
  3. try {
  4. // do something slow
  5. Thread.sleep(5);
  6. } catch (InterruptedException e) { }
  7. });
  8. };
  9. Runnable secondTask = () -> {
  10. secondRange.parallelStream().forEach((number) -> {
  11. try {
  12. // do something slow
  13. Thread.sleep(5);
  14. } catch (InterruptedException e) { }
  15. });
  16. };
  17. // run threads
复制代码

完整的代码可以在gist上查看。

这次我们再看一下线程dump文件:

  1. ForkJoinPool.commonPool-worker-1
  2. ForkJoinPool.commonPool-worker-2
  3. ForkJoinPool.commonPool-worker-3
  4. ForkJoinPool.commonPool-worker-4
复制代码

正如你所见,结果是一样的。我们只使用了4个线程。

一种变通方案

正如我所提到的,JVM 后台使用 fork/join 池,在 ForkJoinTask 的文档中,我们可以看到:

如果合适,安排一个异步执行的任务到当前正在运行的池中。如果任务不在inForkJoinPool()中,也可以调用ForkJoinPool.commonPool()获取新的池来执行。

让我试一试……

  1. ForkJoinPool forkJoinPool = new ForkJoinPool(3);
  2. forkJoinPool.submit(() -> {
  3. firstRange.parallelStream().forEach((number) -> {
  4. try {
  5. Thread.sleep(5);
  6. } catch (InterruptedException e) { }
  7. });
  8. });
  9. ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
  10. forkJoinPool2.submit(() -> {
  11. secondRange.parallelStream().forEach((number) -> {
  12. try {
  13. Thread.sleep(5);
  14. } catch (InterruptedException e) {
  15. }
  16. });
  17. });
复制代码

完整的代码可以在gist上查看。

现在,我们再次查看线程池:

  1. ForkJoinPool-1-worker-1
  2. ForkJoinPool-1-worker-2
  3. ForkJoinPool-1-worker-3
  4. ForkJoinPool-1-worker-4
  5. ForkJoinPool-2-worker-1
  6. ForkJoinPool-2-worker-2
  7. ForkJoinPool-2-worker-3
  8. ForkJoinPool-1-worker-4
复制代码

因为我们创建自己的线程池,所以可以避免共享线程池,如果有需要,甚至可以分配比处理机数量更多的线程。

  1. ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);
复制代码
原文链接: tobyhobson 翻译: ImportNew.com - paddx
译文链接: http://www.importnew.com/16801.html

最新评论

小黑屋|在路上 ( 蜀ICP备15035742号-1 

;

GMT+8, 2025-7-9 20:26

Copyright 2015-2025 djqfx

返回顶部