在路上

 找回密码
 立即注册
在路上 站点首页 学习 查看内容

Java 线程池详解

2016-8-29 13:48| 发布者: zhangjf| 查看: 555| 评论: 0

摘要: 系统启动一个线程的成本是比较高的,因为它涉及到与操作系统的交互,使用线程池的好处是提高性能,当系统中包含大量并发的线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统 ...

系统启动一个线程的成本是比较高的,因为它涉及到与操作系统的交互,使用线程池的好处是提高性能,当系统中包含大量并发的线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过次数。

一、Executors 工厂类用来产生线程池,该工厂类包含以下几个静态工厂方法来创建对应的线程池。创建的线程池是一个ExecutorService对象,使用该对象的submit方法或者是execute方法执行相应的Runnable或者是Callable任务。线程池本身在不再需要的时候调用shutdown()方法停止线程池,调用该方法后,该线程池将不再允许任务添加进来,但是会直到已添加的所有任务执行完成后才死亡。

1、newCachedThreadPool(),创建一个具有缓存功能的线程池,提交到该线程池的任务(Runnable或Callable对象)创建的线程,如果执行完成,会被缓存到CachedThreadPool中,供后面需要执行的任务使用。

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class CacheThreadPool {
  4. static class Task implements Runnable {
  5. @Override
  6. public void run() {
  7. System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
  8. + Thread.currentThread().getAllStackTraces().size());
  9. }
  10. }
  11. public static void main(String[] args) {
  12. ExecutorService cacheThreadPool = Executors.newCachedThreadPool();
  13. //先添加三个任务到线程池
  14. for(int i = 0 ; i < 3; i++) {
  15. cacheThreadPool.execute(new Task());
  16. }
  17. //等三个线程执行完成后,再次添加三个任务到线程池
  18. try {
  19. Thread.sleep(3000);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. for(int i = 0 ; i < 3; i++) {
  24. cacheThreadPool.execute(new Task());
  25. }
  26. }
  27. }
复制代码

执行结果如下:

  1. CacheThreadPool$Task@2d312eb9 pool-1-thread-1 AllStackTraces map size: 7
  2. CacheThreadPool$Task@59522b86 pool-1-thread-3 AllStackTraces map size: 7
  3. CacheThreadPool$Task@73dbb89f pool-1-thread-2 AllStackTraces map size: 7
  4. CacheThreadPool$Task@5795cedc pool-1-thread-3 AllStackTraces map size: 7
  5. CacheThreadPool$Task@256d5600 pool-1-thread-1 AllStackTraces map size: 7
  6. CacheThreadPool$Task@7d1c5894 pool-1-thread-2 AllStackTraces map size: 7
复制代码

线程池中的线程对象进行了缓存,当有新任务执行时进行了复用。但是如果有特别多的并发时,缓存线程池还是会创建很多个线程对象。

2、newFixedThreadPool(int nThreads) 创建一个指定线程个数,线程可复用的线程池。

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class FixedThreadPool {
  4. static class Task implements Runnable {
  5. @Override
  6. public void run() {
  7. System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
  8. + Thread.currentThread().getAllStackTraces().size());
  9. }
  10. }
  11. public static void main(String[] args) {
  12. ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
  13. // 先添加三个任务到线程池
  14. for (int i = 0; i < 5; i++) {
  15. fixedThreadPool.execute(new Task());
  16. }
  17. // 等三个线程执行完成后,再次添加三个任务到线程池
  18. try {
  19. Thread.sleep(3);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. for (int i = 0; i < 3; i++) {
  24. fixedThreadPool.execute(new Task());
  25. }
  26. }
  27. }
复制代码

执行结果:

  1. FixedThreadPool$Task@7045c12d pool-1-thread-2 AllStackTraces map size: 7
  2. FixedThreadPool$Task@50fa0bef pool-1-thread-2 AllStackTraces map size: 7
  3. FixedThreadPool$Task@ccb1870 pool-1-thread-2 AllStackTraces map size: 7
  4. FixedThreadPool$Task@7392b4e3 pool-1-thread-1 AllStackTraces map size: 7
  5. FixedThreadPool$Task@5bdeff18 pool-1-thread-2 AllStackTraces map size: 7
  6. FixedThreadPool$Task@7d5554e1 pool-1-thread-1 AllStackTraces map size: 7
  7. FixedThreadPool$Task@24468092 pool-1-thread-3 AllStackTraces map size: 7
  8. FixedThreadPool$Task@fa7b978 pool-1-thread-2 AllStackTraces map size: 7
复制代码

3、newSingleThreadExecutor(),创建一个只有单线程的线程池,相当于调用newFixedThreadPool(1)

4、newSheduledThreadPool(int corePoolSize),创建指定线程数的线程池,它可以在指定延迟后执行线程。也可以以某一周期重复执行某一线程,知道调用shutdown()关闭线程池。

示例如下:

  1. import java.util.concurrent.Executors;
  2. import java.util.concurrent.ScheduledExecutorService;
  3. import java.util.concurrent.TimeUnit;
  4. public class ScheduledThreadPool {
  5. static class Task implements Runnable {
  6. @Override
  7. public void run() {
  8. System.out.println("time " + System.currentTimeMillis() + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
  9. + Thread.currentThread().getAllStackTraces().size());
  10. }
  11. }
  12. public static void main(String[] args) {
  13. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
  14. scheduledExecutorService.schedule(new Task(), 3, TimeUnit.SECONDS);
  15. scheduledExecutorService.scheduleAtFixedRate(new Task(), 3, 5, TimeUnit.SECONDS);
  16. try {
  17. Thread.sleep(30 * 1000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. scheduledExecutorService.shutdown();
  22. }
  23. }
复制代码

运行结果如下:

  1. time 1458921795240 pool-1-thread-1 AllStackTraces map size: 6
  2. time 1458921795241 pool-1-thread-2 AllStackTraces map size: 6
  3. time 1458921800240 pool-1-thread-1 AllStackTraces map size: 7
  4. time 1458921805240 pool-1-thread-1 AllStackTraces map size: 7
  5. time 1458921810240 pool-1-thread-1 AllStackTraces map size: 7
  6. time 1458921815240 pool-1-thread-1 AllStackTraces map size: 7
  7. time 1458921820240 pool-1-thread-1 AllStackTraces map size: 7
复制代码

由运行时间可看出,任务是按照5秒的周期执行的。

5、newSingleThreadScheduledExecutor() 创建一个只有一个线程的线程池,同调用newScheduledThreadPool(1)。

二、ForkJoinPool和ForkJoinTask

ForkJoinPool是ExecutorService的实现类,支持将一个任务划分为多个小任务并行计算,在把多个小任务的计算结果合并成总的计算结果。它有两个构造函数

ForkJoinPool(int parallelism)创建一个包含parallelism个并行线程的ForkJoinPool。

ForkJoinPool(),以Runtime.availableProcessors()方法返回值作为parallelism参数来创建ForkJoinPool。

ForkJoinTask 代表一个可以并行,合并的任务。它是实现了Future接口的抽象类,它有两个抽象子类,代表无返回值任务的RecuriveAction和有返回值的RecursiveTask。可根据具体需求继承这两个抽象类实现自己的对象,然后调用ForkJoinPool的submit 方法执行。

RecuriveAction 示例如下,实现并行输出0-300的数字。

  1. import java.util.concurrent.ForkJoinPool;
  2. import java.util.concurrent.RecursiveAction;
  3. import java.util.concurrent.TimeUnit;
  4. public class ActionForkJoinTask {
  5. static class PrintTask extends RecursiveAction {
  6. private static final int THRESHOLD = 50;
  7. private int start;
  8. private int end;
  9. public PrintTask(int start, int end) {
  10. this.start = start;
  11. this.end = end;
  12. }
  13. @Override
  14. protected void compute() {
  15. if (end - start < THRESHOLD) {
  16. for(int i = start; i < end; i++) {
  17. System.out.println(Thread.currentThread().getName() + " " + i);
  18. }
  19. } else {
  20. int middle = (start + end) / 2;
  21. PrintTask left = new PrintTask(start, middle);
  22. PrintTask right = new PrintTask(middle, end);
  23. left.fork();
  24. right.fork();
  25. }
  26. }
  27. }
  28. public static void main(String[] args) {
  29. ForkJoinPool pool = new ForkJoinPool();
  30. pool.submit(new PrintTask(0, 300));
  31. try {
  32. pool.awaitTermination(2, TimeUnit.SECONDS);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. pool.shutdown();
  37. }
  38. }
复制代码

在拆分小任务后,调用任务的fork()方法,加入到ForkJoinPool中并行执行。

RecursiveTask示例,实现并行计算100个整数求和。拆分为每20个数求和后获取结果,在最后合并为最后的结果。

  1. import java.util.Random;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.ForkJoinPool;
  4. import java.util.concurrent.Future;
  5. import java.util.concurrent.RecursiveTask;
  6. public class TaskForkJoinTask {
  7. static class CalTask extends RecursiveTask<Integer> {
  8. private static final int THRESHOLD = 20;
  9. private int arr[];
  10. private int start;
  11. private int end;
  12. public CalTask(int[] arr, int start, int end) {
  13. this.arr = arr;
  14. this.start = start;
  15. this.end = end;
  16. }
  17. @Override
  18. protected Integer compute() {
  19. int sum = 0;
  20. if (end - start < THRESHOLD) {
  21. for (int i = start; i < end; i++) {
  22. sum += arr[i];
  23. }
  24. System.out.println(Thread.currentThread().getName() + " sum:" + sum);
  25. return sum;
  26. } else {
  27. int middle = (start + end) / 2;
  28. CalTask left = new CalTask(arr, start, middle);
  29. CalTask right = new CalTask(arr, middle, end);
  30. left.fork();
  31. right.fork();
  32. return left.join() + right.join();
  33. }
  34. }
  35. }
  36. public static void main(String[] args) {
  37. int arr[] = new int[100];
  38. Random random = new Random();
  39. int total = 0;
  40. for (int i = 0; i < arr.length; i++) {
  41. int tmp = random.nextInt(20);
  42. total += (arr[i] = tmp);
  43. }
  44. System.out.println("total " + total);
  45. ForkJoinPool pool = new ForkJoinPool(4);
  46. Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
  47. try {
  48. System.out.println("cal result: " + future.get());
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. } catch (ExecutionException e) {
  52. e.printStackTrace();
  53. }
  54. pool.shutdown();
  55. }
  56. }
复制代码

执行结果如下:

  1. total 912
  2. ForkJoinPool-1-worker-2 sum:82
  3. ForkJoinPool-1-worker-2 sum:123
  4. ForkJoinPool-1-worker-2 sum:144
  5. ForkJoinPool-1-worker-3 sum:119
  6. ForkJoinPool-1-worker-2 sum:106
  7. ForkJoinPool-1-worker-2 sum:128
  8. ForkJoinPool-1-worker-2 sum:121
  9. ForkJoinPool-1-worker-3 sum:89
  10. cal result: 912
复制代码

子任务执行完后,调用任务的join()方法获取子任务执行结果,再相加获得最后的结果。

最新评论

小黑屋|在路上 ( 蜀ICP备15035742号-1 

;

GMT+8, 2025-7-8 00:40

Copyright 2015-2025 djqfx

返回顶部