现代的计算机已经向多CPU方向发展,即使是普通的PC,甚至现在的智能手机、多核处理器已被广泛应用。在未来,处理器的核心数将会发展的越来越多。 虽然硬件上的多核CPU已经十分成熟,但是很多应用程序并未这种多核CPU做好准备,因此并不能很好地利用多核CPU的性能优势。 为了充分利用多CPU、多核CPU的性能优势,级软基软件系统应该可以充分“挖掘”每个CPU的计算能力,决不能让某个CPU处于“空闲”状态。为此,可以考虑把一个任务拆分成多个“小任务”,把多个"小任务"放到多个处理器核心上并行执行。当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。 如下面的示意图所示: 第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。 第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。 Java提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合成总的计算结果。 ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器。 public ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool public ForkJoinPool() :以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool 创建ForkJoinPool实例后,可以钓鱼ForkJoinPool的submit(ForkJoinTask task)或者invoke(ForkJoinTask task)来执行指定任务。其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它有两个抽象子类:RecursiveAction和RecursiveTask。 RecursiveTask代表有返回值的任务 RecursiveAction代表没有返回值的任务。 一、RecursiveAction 下面以一个没有返回值的大任务为例,介绍一下RecursiveAction的用法。 大任务是:打印0-200的数值。 小任务是:每次只能打印50个数值。 - import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.RecursiveAction;
- import java.util.concurrent.TimeUnit;
-
- //RecursiveAction为ForkJoinTask的抽象子类,没有返回值的任务
- class PrintTask extends RecursiveAction {
- // 每个"小任务"最多只打印50个数
- private static final int MAX = 50;
-
- private int start;
- private int end;
-
- PrintTask(int start, int end) {
- this.start = start;
- this.end = end;
- }
-
- @Override
- protected void compute() {
- // 当end-start的值小于MAX时候,开始打印
- if ((end - start) < MAX) {
- for (int i = start; i < end; i++) {
- System.out.println(Thread.currentThread().getName() + "的i值:"
- + i);
- }
- } else {
- // 将大任务分解成两个小任务
- int middle = (start + end) / 2;
- PrintTask left = new PrintTask(start, middle);
- PrintTask right = new PrintTask(middle, end);
- // 并行执行两个小任务
- left.fork();
- right.fork();
- }
- }
- }
-
- public class ForkJoinPoolTest {
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- // 提交可分解的PrintTask任务
- forkJoinPool.submit(new PrintTask(0, 200));
- forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
- // 关闭线程池
- forkJoinPool.shutdown();
- }
-
- }
复制代码 运行结果如下: 从上面结果来看,ForkJoinPool启动了两个线程来执行这个打印任务,这是因为笔者的计算机的CPU是双核的。不仅如此,读者可以看到程序虽然打印了0-199这两百个数字,但是并不是连续打印的,这是因为程序将这个打印任务进行了分解,分解后的任务会并行执行,所以不会按顺序从0打印 到199。 二、RecursiveTask 下面以一个有返回值的大任务为例,介绍一下RecursiveTask的用法。 大任务是:计算随机的100个数字的和。 小任务是:每次只能20个数值的和。 - import java.util.Random;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.Future;
- import java.util.concurrent.RecursiveTask;
-
- //RecursiveTask为ForkJoinTask的抽象子类,有返回值的任务
- class SumTask extends RecursiveTask<Integer> {
- // 每个"小任务"最多只打印50个数
- private static final int MAX = 20;
- private int arr[];
- private int start;
- private int end;
-
- SumTask(int arr[], int start, int end) {
- this.arr = arr;
- this.start = start;
- this.end = end;
- }
-
- @Override
- protected Integer compute() {
- int sum = 0;
- // 当end-start的值小于MAX时候,开始打印
- if ((end - start) < MAX) {
- for (int i = start; i < end; i++) {
- sum += arr[i];
- }
- return sum;
- } else {
- System.err.println("=====任务分解======");
- // 将大任务分解成两个小任务
- int middle = (start + end) / 2;
- SumTask left = new SumTask(arr, start, middle);
- SumTask right = new SumTask(arr, middle, end);
- // 并行执行两个小任务
- left.fork();
- right.fork();
- // 把两个小任务累加的结果合并起来
- return left.join() + right.join();
- }
- }
-
- }
-
- public class ForkJoinPoolTest2 {
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- int arr[] = new int[100];
- Random random = new Random();
- int total = 0;
- // 初始化100个数字元素
- for (int i = 0; i < arr.length; i++) {
- int temp = random.nextInt(100);
- // 对数组元素赋值,并将数组元素的值添加到total总和中
- total += (arr[i] = temp);
- }
- System.out.println("初始化时的总和=" + total);
- // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- // 提交可分解的PrintTask任务
- Future<Integer> future = forkJoinPool.submit(new SumTask(arr, 0,
- arr.length));
- System.out.println("计算出来的总和=" + future.get());
- // 关闭线程池
- forkJoinPool.shutdown();
- }
-
- }
复制代码 计算结果如下:- 初始化时的总和=4283
- =====任务分解======
- =====任务分解======
- =====任务分解======
- =====任务分解======
- =====任务分解======
- =====任务分解======
- =====任务分解======
- 计算出来的总和=4283
复制代码 从上面结果来看,ForkJoinPool将任务分解了7次,程序通过SumTask计算出来的结果,和初始化数组时统计出来的总和是相等的,这表明计算结果一切正常。 读者还参考以下文章加深对ForkJoinPool的理解: http://www.infoq.com/cn/articles/fork-join-introduction/ http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/ ================================================================================================== 作者:欧阳鹏 欢迎转载,与人分享是进步的源泉! 转载请保留原文地址:http://blog.csdn.net/ouyang_peng |