来自: http://blog.kifile.com/reactivex/2015/12/07/rx_introduce.html 前言 先简单介绍一下 ReactiveX. ReactiveX 并不特指某种编程语言,他应该算是一种编程思维,反应式编程. 反应式编程的核心在于,当触发特定行为逻辑后(对于ReactiveX而言,就是调用了 subscribe 指令),根据进行指定操作,并根据操作结果执行特定操作. 这种编程思维特别适合用于交互式软件上,例如Android,iOS,通常用户触发某个条件(比如说点击操作)后,我们需要根据用户的操作行为, 可能接下来要执行一系列操作,最后再根据操作结果在ui界面上呈现给用户.而ReactiveX 为我们提供了这种交互流程的封装. 下面以RxJava为例,分析一下ReactiveX的框架实现原理. RxJava执行流程 首先奉上一个最简单的 ReactiveX 的代码实现. - 1 Observable.create(new Observable.OnSubscribe<String>() {
- 2 @Override
- 3 public void call(Subscriber<? super String> subscriber) {
- 4 subscriber.onNext("Sample");
- 5 subscriber.onCompleted();
- 6 }
- 7 }).subscribe(new Observer<String>() {
- 8 @Override
- 9 public void onCompleted() {
- 10 System.out.println("Complete");
- 11 }
- 12
- 13 @Override
- 14 public void onError(Throwable e) {
- 15 e.printStackTrace();
- 16 }
- 17
- 18 @Override
- 19 public void onNext(String s) {
- 20 System.out.println("Get:" + s);
- 21 }
- 22 });
复制代码 运行上面的代码,我们可以看到以下输出. 代码执行完毕,让我们看看整个流程的实现逻辑. - 1 private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
- 2 ...
- 3 subscriber.onStart();
- 4 ...
- 5 try {
- 6 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
- 7 return hook.onSubscribeReturn(subscriber);
- 8 } catch (Throwable e) {
- 9 Exceptions.throwIfFatal(e);
- 10 try {
- 11 subscriber.onError(hook.onSubscribeError(e));
- 12 } catch (Throwable e2) {
- 13 Exceptions.throwIfFatal(e2);
- 14 // if this happens it means the onError itself failed (perhaps an invalid function implementation)
- 15 // so we are unable to propagate the error correctly and will just throw
- 16 RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
- 17 // TODO could the hook be the cause of the error in the on error handling.
- 18 hook.onSubscribeError(r);
- 19 // TODO why aren't we throwing the hook's return value.
- 20 throw r;
- 21 }
- 22 return Subscriptions.unsubscribed();
- 23 }
- 24 }
复制代码 在上面的代码里,会发现一个 hook 对象,这是个什么鬼? 追踪一下,发现原来他是一个RxJava每个方法都会返回一个Observable对象ExecutionHook对象,类图如下: 可以看出,RxJavaObservableExecutionHook中针对RxJava的subscribe流程进行注入,方便自己更改相关逻辑, 当然对于默认的RxJavaObservableExecutionHook,你会发现他并没有做任何处理,如果你想自己实现可以调用 RxJavaPlugin.getInstance() 设置自定义Hook. 看完上面的例子,给人感觉挺简单地啊,而且比较类似Android中的AsyncTask,都是属于执行任务后进行回调,那他相比AsyncTask有什么优势吗? Operator 虽然从上面的例子中,看起来RxJava的确其貌不扬,但是ReactiveX也不止这点技法. 为了扩展ReactiveX的相关属性,在 RxJava 中使用代理模式实现了很多有用的逻辑,例如类型转换,遍历数据个数限制,定时响应等额外特性. 这些逻辑被称为操作(Operator),每一个Operator都继承了Func1(也就是内部会有一个call()方法),RxJava框架会调用lift方法将Operator包装成为Observable: - 1 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
- 2 return new Observable<R>(new OnSubscribe<R>() {
- 3 @Override
- 4 public void call(Subscriber<? super R> o) {
- 5 try {
- 6 Subscriber<? super T> st = hook.onLift(operator).call(o);
- 7 try {
- 8 // new Subscriber created and being subscribed with so 'onStart' it
- 9 st.onStart();
- 10 onSubscribe.call(st);
- 11 } catch (Throwable e) {
- 12 // localized capture of errors rather than it skipping all operators
- 13 // and ending up in the try/catch of the subscribe method which then
- 14 // prevents onErrorResumeNext and other similar approaches to error handling
- 15 Exceptions.throwIfFatal(e);
- 16 st.onError(e);
- 17 }
- 18 } catch (Throwable e) {
- 19 Exceptions.throwIfFatal(e);
- 20 // if the lift function failed all we can do is pass the error to the final Subscriber
- 21 // as we don't have the operator available to us
- 22 o.onError(e);
- 23 }
- 24 }
- 25 });
- 26 }
复制代码 如果想了解更多详细的操作信息,可以点击这里: Operators 其实这里有个问题,既然大部分的Operator都需要这样封装,为什么不直接让Operator对象继承OnSubscribe对象,进而减少方法调用层级? 如果是为了防止OnSubscribe和Func1的方法重名,那么更改函数名就好了啊? 如果为了进行onLift回调,也可以在新类中增加回调调用位置啊? 同时为了避免代码冗余,对于这些方法,RxJava都使用了构造者模式的一种变体,每个方法都会返回一个Observable对象,保证其能够形成类似下面这样的操作链. - 1 Observable.just("Hello", "Operator", "Chain").map(s -> s + " map" )
- 2 .buffer(2).take(1).subscribe(new Subscriber<List<String>>() {
- 3 @Override
- 4 public void onCompleted() {
- 5 System.out.println("Complete");
- 6 }
- 7
- 8 @Override
- 9 public void onError(Throwable e) {
- 10
- 11 }
- 12
- 13 @Override
- 14 public void onNext(List<String> strings) {
- 15 System.out.println("Get:" + strings);
- 16 }
- 17 });
复制代码 输出结果如下: - Get:[Hello map, Operator map]
- Complete
复制代码 可以看到,我们输入了三个String对象,但是只一次输入了两个String,并且两个String 都额外多了一个 map 后缀,当然这也是我想要的结果 使用Single替代Observable 其实在绝大部分的使用场景中,用户触发操作后,对于我们而言返回结果其实只有成功失败两种,而Observable中是有 onNext , onComplete , onError 三种状态,这样看来似乎不太满足需求. 当然ReactiveX中也考虑到了这种情况,它在Observable的基础上衍生出了Single类,这个类的实现机制同Observable近乎相同,只是不过订阅他的不再是 Subscriber 对象,而是 SingleSubscriber . 在 SingleSubscriber 中有 onSuccess 和 onError 两种结果状态,并且只会调用其中一个,恰好满足我们的需求. 使用Scheduler进行多线程调用 按照上面所说,不论是Observable还是Single,其实都是在同一个线程中,不断按照执行逻辑执行代码指令,也就是说他始终是在同一线程中进行执行的. 但是有时候,我们希望能够在异步线程中执行耗时操作,避免ui堵塞,这时候ReactiveX就为我们提供了 Scheduler 类. Scheduler类其实并不负责异步线程处理,他只负责通过 createWorker() 类创建出一个 Worker 对象,真正负责任务的延时处理. 每一个Scheduler类,都会实现自己的Worker类,用于执行Scheduler任务. 我们可以使用ReactiveX中的 subscribeOn 和 observeOn 两个方法,分别设置获取数据的操作和分发消息的操作的执行Scheduler,从而实现数据的异步处理. subscribeOn 和 observeOn 其实都是构建一个Operator对象,在call方法里,使用线程执行数据获取和分发操作. 总结 其实ReactiveX就是一个针对观察者模式的扩展,如果忽略掉ReactiveX框架为我们实现各种的Operator,那么它就是一个简单的设计模式而已. 单就这一点而言,Android的AsyncTask和LoaderManager框架是胜过ReactiveX的,因为他针对Android的生命周期做了处理. 但由于ReactiveX中增加了很多Operator,能够很方便的帮助我们对响应式任务进行操作,不论是类型转换还是异步执行. 反而导致看起来ReactiveX中的比AsyncTask要好用,但我觉得如果吧ReactiveX结合到AsyncTask和LoaderManager中应该会更加的完美. |