在路上

 找回密码
 立即注册
在路上 站点首页 学习 查看内容

Java8 中的纯异步编程

2016-12-20 13:12| 发布者: zhangjf| 查看: 411| 评论: 0

摘要: 当系统越来越复杂之后,服务化的模块的接口调用会越来越多,最终模块之间的IO 成为影响整体系统性能的关键因素。传统的阻塞IO + 线程池模型应对这种场景比较无力,只能依靠增加线程数量,但是服务器本身的线程数是有 ...

当系统越来越复杂之后,服务化的模块的接口调用会越来越多,最终模块之间的IO 成为影响整体系统性能的关键因素。传统的阻塞IO + 线程池模型应对这种场景比较无力,只能依靠增加线程数量,但是服务器本身的线程数是有上线的。一个模块接口性能的波动,啥有不慎就会造成调用者线程池被 IO打满,压垮整个服务。这时候纯异步编程就有了用武之地,因为IO 不再占用线程来执行,仅需要开少量的线程用于CPU 密集的操作,模块本身对服务接口超时的容忍程度也大大增加。

所谓纯异步编程,就是中间完全没有阻塞的操作,所有的IO 调用均是异步的。一般讲到纯异步编程,都会让人望而生畏,其实纯异步程序好不好写,和语言的特性是很相关的。像Golang 这样原生支持协程的语言基本上写出的程序就是全异步的,使用者就像在写同步程序一样;而C#, python, 以及最新的nodejs 这样的语言,虽然没有原生协程支持,但是支持Generator,写出的纯异步程序也和同步程序看起来差不多。如果一个语言连Generator 也没有,就只能使用回调的方法来写异步程序,而回调本身是编写难度很高、很容易出错的方式,尤其是还要考虑异常处理、传递这些问题,所谓「回调地狱」即是如此。

为了降低写回调式的纯异步程序的难度,就有了一种可以称之为「Managed callback」的编程模式,其代表有Google guava 的concurrent lib,twitter 使用scala 完成的Finagle 等。这种模式的特点是程序书写的顺序看起来和执行的顺序一致,自动管理异常的捕获传递,避免深层次的嵌套,函数式风格,偏向于使用粒度较小的函数组合来完成程序,使用Immutable的对象等。

Java 中原本的Future 类也是设计用来完成异步编程的,但是Future 本身的接口和功能比较有限,这才有了Guava 中的ListenableFuture 等各种增强的实现。Java8 提供了新的CompletableFuture,并且有了基本的对于函数编程的支持,已经很合适来进行Managed callback 模式的异步编程了。

首先我们基于Netty实现了一个异步的Http Server 和 Http Client,假设有一个Http client 基础接口如下:

  1. CompletableFuture<ImmutableResponse> request(ImmutableRequest request);
复制代码

另外有一个Async 的Http Server,可以注册如下接口的Hanlder:

  1. /**
  2. * Async http process interface
  3. *
  4. * @author Dong Liu dongliu@wandoujia.com
  5. */
  6. public interface AsyncHandler {
  7. /**
  8. * handle request.
  9. */
  10. CompletableFuture<ImmutableResponse> handle(ImmutableRequest request);
  11. }
复制代码

假设我们要写一个纯异步的程序,从 A 接口获取数据,这个数据是一个整数,在这个接口上加 10 再通过Http 接口返回。

首先我们觉得request 方法太原始,封装一个更简单的方法:

  1. public CompletableFuture<String> get(String url) {
  2. ImmutableRequest request = ImmutableRequest.newBuilder()
  3. .withMethod(HttpMethod.GET)
  4. .withUri(url)
  5. .build();
  6. return request(request).thenApply(response -> new String(response.getBody()));
  7. }
复制代码

这里演示了CompletableFuture 中thenApply 方法的用法,这个方法在CompletableFuture 中管理的回调完成之后进行调用,对结果进行处理。

现在来完成我们的Handler:

  1. public class AddNumberHandler implements AsyncHandler {
  2. private HttpClient httpClient = ...;
  3. @Override
  4. public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) {
  5. CompletableFuture<String> resultFuture = httpClient.get("http://127.0.0.1/a");
  6. return resultFuture.thenApply(result -> {
  7. int total = Integer.parseInt(result) + 10;
  8. return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build();
  9. });
  10. }
  11. }
复制代码

让情况变得更复杂一些。假设我们需要调用A, B, C 三个接口,并把三个接口所返回的数字相加。因为A, B, C三个接口调用是独立的,所以决定并行的来请求三个接口,以提升效率:

  1. public class AddNumberHandler implements AsyncHandler {
  2. private HttpClient httpClient = ...;
  3. @Override
  4. public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) {
  5. CompletableFuture<String> aFuture = httpClient.get("http://127.0.0.1/a");
  6. CompletableFuture<String> bFuture = httpClient.get("http://127.0.0.1/b");
  7. CompletableFuture<String> cFuture = httpClient.get("http://127.0.0.1/b");
  8. return FutureUtils.combine(aFuture, bFuture, cFuture).thenApply(abc -> {
  9. int total = parseInt(abc._1()) + parseInt(abc._2()) + parseInt(abc._3());
  10. return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build();
  11. });
  12. }
  13. }
复制代码

FutureUtils.combine 方法是一个简单的封装,封装了CompletableFuture 的 Combine 方法以方便使用。可以看到,使用CompletableFuture 来写这种并行多个请求的异步程序是很容易的事情。

这中间我们都没有特别的对异常进行处理,如果直接使用callback 的话,这是肯定不可行的,必须手动捕获所有异常,否则这个callback 就永远不会返回了。但是现在,CompletableFuture管理了我们提供的回调函数, 会帮我们捕获异常并进行管理。这个异常在调用thenApply 或者combine 方法的时候是自动传递的,如果第一步就失败了,后面注册的回调就不会被执行。要对异常进行处理的话,可以使用exceptionally 方法:

  1. public class AddNumberHandler implements AsyncHandler {
  2. private HttpClient httpClient = ...;
  3. private Logger logger = ...;
  4. @Override
  5. public CompletableFuture<ImmutableResponse> handle(ImmutableRequest request) {
  6. CompletableFuture<String> aFuture = httpClient.get("http://127.0.0.1/a");
  7. CompletableFuture<String> bFuture = httpClient.get("http://127.0.0.1/b");
  8. CompletableFuture<String> cFuture = httpClient.get("http://127.0.0.1/b");
  9. aFuture.isCompletedExceptionally();
  10. CompletableFuture<Integer> totalFuture = FutureUtils.combine(aFuture, bFuture, cFuture)
  11. .thenApply(abc -> parseInt(abc._1()) + parseInt(abc._2()) + parseInt(abc._3()))
  12. .exceptionally(t -> {
  13. logger.error("", t);
  14. return -1;
  15. });
  16. return totalFuture.thenApply(total -> {
  17. return ImmutableResponse.newBuilder().withBody(String.valueOf(total)).build();
  18. });
  19. }
  20. }
复制代码

这里举得例子都比较简单,想更多的了解可以参见CompletableFuture 的文档。

原文链接: http://www.dongliu.net/post/622452

最新评论

小黑屋|在路上 ( 蜀ICP备15035742号-1 

;

GMT+8, 2025-7-8 02:18

Copyright 2015-2025 djqfx

返回顶部