在路上

 找回密码
 立即注册
在路上 站点首页 学习 查看内容

RxWeekend——RxJava周末狂欢

2017-2-7 13:40| 发布者: zhangjf| 查看: 482| 评论: 0

摘要: Part 1: RxJava Essentials -- Operators Basic just() 方法可以传入1到9个参数,它们会按照传入的参数的顺序来发射它们。 Observable.empty() 需要一个 Oservable 但是什么都不发射 Observable.never() 传一 ...
Part 1: RxJava Essentials -- Operators Basic

just() 方法可以传入1到9个参数,它们会按照传入的参数的顺序来发射它们。

Observable.empty() 需要一个 Oservable 但是什么都不发射

Observable.never() 传一个不发射数据并永远不会结束的 Observable

Observable.throw() 创建一个不发射数据并且以错误结束的 Observable

repeat()

defer() 在观察者订阅时创建 Observable,而不是创建后立即执行,这篇文章有着更棒的解释:小鄧子:使用RxJava实现延迟订阅

range() 从一个指定的数字开始发射 N 个数字

interval(3, TimeUnit.SECONDES) 轮询时用:参数:指定两次发射时间间隔,时间单位。

timer() 一段时间后才发射 Observable

Filtering

filter(), take(), takeLast()

distinct() 去掉序列中重复项,是作用于一个完整的序列的

distinctUntilChanged() 在一个存在的序列上来创建一个新的不重复发射元素的序列


distinctuntilchanged

first(), last(), firstOrDefault(), lastOrDefault()

skip(), skipLast() 跳过前几个或者最后几个元素

elementAt() 发射指定元素。但如果元素不足可以使用:elementAtOrDefault()

sample(30,TimeUnit.SECONDS) 指定的时间间隔里发射最近一次的数值


sample

throttleFirst() 定时发射第一个元素

timeout() 限时,在指定时间间隔 Observable 不发射值的话, 就会触发 onError() 函数

debounce() 过滤发射速率过快的数据,即:在一个时间间隔过去之后,仍然没有发射的话,则发射最后的那个

Transforming

map() 接收到的对象应用到每个发射的值上

flatMap() 将发射的序列转换成另外一种对象的 Observable 序列,注意:它允许交叉,即 flatMap() 不保证最终生成的 Observable 和源 Observable 发射序列相同。 FlatMap

concatMap() 解决了 flatMap() 交叉的问题,提供了 能把发射值连续在一起的铺平函数,而非合并它们。

flatMapInterable() 类似于 flatMap() 只是它将源数据两两结成对并生成 Iterable,而不是原始数据项和生成的 Observables

switchMap() 和 flatMap() 区别在于每当源 Observable 发射一个新的数据项时,将取消订阅并停止监视之前那个数据项产生的 Observable,并开始监视当前发射的这个。

scan() 累加器,对原始Observable 发射的每项数据都应用一个函数,计算出函数的结果值,并填充回可观测序列,等待下一次发射的数据一起使用。

scan(R, Func2) 用初始值作为第一个发射的值

groupBy() 引用小鄧子的一段话来说是这样的:去这里看更详细的解释,会恍然大悟的:小鄧子-Architecting Android with RxJava

buffer() 将得到一个新的 Observable,这个 Observable 每次发射一组列表值而不是单个发射,你还可以指定它的 skip 值和 timespan 项数据

window() 类似于 buffer(),但它发射的是 Observable 而不是列表

cast() 将源 Observable 中每一项数据都转换成新的类型,转成了一个不同的 Class。

Combining

merge() 多个序列合并在一个最终发射的 Observable. mergeDelayError() 当所有的 Observable 都完成时,再处理有 error 的情况,发射 onError()

zip() 合并两个或多个 Observables 发射出的数据项,根据指定的函数 Func* 变换它们,并发射一个新值

join() 基于时间窗口将两个 Observables 发射的数据结合在一起,组成一个新的 Observable。它可以控制每个 Observable 产生结果的生命周期,在每个结果的生命周期内,可以与另一个 Observable 产生的结果按照一定的规则进行合并!


join

蓝线和粉色的线表示对应的Observable 上的元素的生命周期。Android RxJava使用介绍(四) RxJava的操作符

combineLatest() 像 zip() 的特殊形式,zip()作用于最近未打包的两个 Observables,相反 combineLatest() 作用于最近发射的数据项
combinelatest and(), then(), when(): 如下:
  1. Pattern2<O1, O2> pattern = JoinObservable.from(obserable1).and(obserable2);
  2. Plan0<O1> plan = pattern.then(this::updateTitle);
  3. JoinObservable.when(plan).toObservable().observeOn(…).subscribe(…);
复制代码

解释:两个发射序列 obserable1 和 obserable2 通过 and 链接。使用 pattern 对象创建 Plan 对象,然后使用 when...(好吧,我想不到使用场景...)


and_then_when

switch() 将一个发射多个 Observables 的 Observable 转换成另一个单独的 Observable,后者发射那些 Observables 最近发射的数据项,注:当源 Observable 发射一个新的 Observable 时,switch() 会立即取消订阅前一个发射数据的 Observable,然后订阅一个新的 Observable,并开始发射它的数据。

startWith() 与 concat() 对应,通过传一个参数来先发射一个数据序列

Part 2: Tips Tips1
  1. // Our sources (left as an exercise for the reader)
  2. Observable<Data> memory = ...;
  3. Observable<Data> disk = ...;
  4. Observable<Data> network = ...;
  5. // Retrieve the first source with data
  6. Observable<Data> source = Observable
  7. .concat(memory, disk, network)
  8. .first();
  9. //先取 memory 中的数据,如果有,就取出,然后停止检索队列;没有就取 disk 的数据,有就取出,然后停止检索队列;最后才是网络请求
复制代码
  1. //持久化数据or缓存数据
  2. Observable<Data> networkWithSave = network.doOnNext(new Action1<Data>() {
  3. @Override public void call(Data data) {
  4. saveToDisk(data);
  5. cacheInMemory(data);
  6. }
  7. });
  8. Observable<Data> diskWithCache = disk.doOnNext(new Action1<Data>() {
  9. @Override public void call(Data data) {
  10. cacheInMemory(data);
  11. }
  12. });
  13. //现在,如果你使用 networkWithSave 和 diskWithCache,数据将会在加载后自动保存
复制代码
  1. //处理陈旧数据
  2. Observable<Data> source = Observable
  3. .concat(memory, diskWithCache, networkWithSave)
  4. .first(new Func1<Data, Boolean>() {
  5. @Override public Boolean call(Data data) {
  6. return data.isUpToDate();//需要 update 的话,则筛选掉该数据源,检索下一个数据源
  7. }
  8. });//注:first() 和 takeFirst() 区别在于,如果没有符合的数据源,first() 会抛 NoSuchElementException 异常
复制代码
Tips2

.subsribeOn() 操作符可以改变Observable应该在哪个调度器上执行任务。

.observeOn() 操作符可以改变Observable将在哪个调度器上发送通知。

另外,默认情况下,链上的操作符将会在调用 .subsribeOn()的那个线程上执行任务。如下:

  1. Observable.just(1,2,3)
  2. .subscribeOn(Schedulers.newThread())
  3. .flatMap(/** 与UI线程无关的逻辑**//)//会在 subscribeOn() 指定的线程上执行任务
  4. .observeOn(AndroidSchedulers.mainThread())
  5. .subscribe();
复制代码
Tips3

Backpressure(背压): 事件产生的速度比消费快(在 producer-consumer(生产者-消费者) 模式中)。发生 overproucing 后,当链式结构不能承受数据压力时,就会抛出 MissingBackpressureException 异常。
最常见的 Backpressure 就是连续快速点击按钮....

Tips4

再重用操作符的方式上,使用 compose(),而不是 flatMap():


compose_flatmap Tips5

Schedulers:

将一个耗时的操作,通过 Scehdulers.io() 放到 I/O 线程中去处理

  1. public static void storeBitmap(Context context, Bitmap bitmap, String filename){
  2. Schedulers.io().createWorker().schedule(() -> {
  3. blockingStoreBitmap(context, bitmap, filename);
  4. })
  5. }
复制代码
Tips6

subject 可以同时是一个 Observable 也可以是一个 Observer,一个 Subject 可以订阅一个 Observable,就像一个观察者,并发射新数据,或者传递它接受到的数据,就像一个 Observable。see more

对于空的 subscribe() 意为仅仅是为了开启 Observable,而不用管已发出的值。

在 subscriber.onNext 或 subscriber.onCompleted() 前检测观察者的订阅情况,使代码更高效,因为如果没有观察者等待时我们就不生成没必要的数据项。就像这样:

  1. if (!subscriber.isUnsubscribed()){//避免生成不必要的数据项
  2. return;
  3. }
  4. subscriber.onNext();
  5. if (!subscriber.isUnsubscribed()){
  6. subscriber.onCompleted();
  7. }
复制代码
Tips7

我觉得这个 Tips 是最有用的

先祭出两个工具类

对于 SchedulersCompat 类,我们的目的,是为了写出这样的代码:

  1. .compose(SchedulersCompat.<SomeEntity>applyExecutorSchedulers());
复制代码

场景是这样的:work thread 中处理数据,然后 UI thread 中处理结果。当然,我们知道是要使用 subscribeOn() 和 observeOn() 进行处理。最常见的场景是,调server 的 API 接口取数据的时候,那么,那么多接口,反复写这两个操作符是蛋疼的,为了避免这种情况,我们可以通过 compse() 操作符来实现复用,上面这段代码就实现了这样的功能。

SchedulersCompat 类中有这么一段 Schedulers.from(ExecutorManager.eventExecutor),哇喔,这里ExecutorManager 类里维护了一个线程池!目的呢!避免线程反复创建,实现线程复用!!!这样,我就不需要每次都通过Schedulers.newThread()来实现了!!

如果你想了解更多,关于 compose()操作符,可以看这里:小鄧子-避免打断链式结构:使用.compose( )操作符

对于这个 Tips, 我给出一个项目实例:RxFace,这是我在做一个人脸识别的 demo 的时候所写的,用了 RxJava, retrofit, Okhttp。我在v1.1版本的时候增加通过compose()操作符复用 subscribeOn() 和 observeOn() 的逻辑。觉得还 OK 的话,可以点个 star 喔,哈哈

  1. /**
  2. * 这个类是 小鄧子 提供的!
  3. */
  4. public class SchedulersCompat {
  5. private static final Observable.Transformer computationTransformer =
  6. new Observable.Transformer() {
  7. @Override public Object call(Object observable) {
  8. return ((Observable) observable).subscribeOn(Schedulers.computation())
  9. .observeOn(AndroidSchedulers.mainThread());
  10. }
  11. };
  12. private static final Observable.Transformer ioTransformer = new Observable.Transformer() {
  13. @Override public Object call(Object observable) {
  14. return ((Observable) observable).subscribeOn(Schedulers.io())
  15. .observeOn(AndroidSchedulers.mainThread());
  16. }
  17. };
  18. private static final Observable.Transformer newTransformer = new Observable.Transformer() {
  19. @Override public Object call(Object observable) {
  20. return ((Observable) observable).subscribeOn(Schedulers.newThread())
  21. .observeOn(AndroidSchedulers.mainThread());
  22. }
  23. };
  24. private static final Observable.Transformer trampolineTransformer = new Observable.Transformer() {
  25. @Override public Object call(Object observable) {
  26. return ((Observable) observable).subscribeOn(Schedulers.trampoline())
  27. .observeOn(AndroidSchedulers.mainThread());
  28. }
  29. };
  30. private static final Observable.Transformer executorTransformer = new Observable.Transformer() {
  31. @Override public Object call(Object observable) {
  32. return ((Observable) observable).subscribeOn(Schedulers.from(ExecutorManager.eventExecutor))
  33. .observeOn(AndroidSchedulers.mainThread());
  34. }
  35. };
  36. /**
  37. * Don't break the chain: use RxJava's compose() operator
  38. */
  39. public static <T> Observable.Transformer<T, T> applyComputationSchedulers() {
  40. return (Observable.Transformer<T, T>) computationTransformer;
  41. }
  42. public static <T> Observable.Transformer<T, T> applyIoSchedulers() {
  43. return (Observable.Transformer<T, T>) ioTransformer;
  44. }
  45. public static <T> Observable.Transformer<T, T> applyNewSchedulers() {
  46. return (Observable.Transformer<T, T>) newTransformer;
  47. }
  48. public static <T> Observable.Transformer<T, T> applyTrampolineSchedulers() {
  49. return (Observable.Transformer<T, T>) trampolineTransformer;
  50. }
  51. public static <T> Observable.Transformer<T, T> applyExecutorSchedulers() {
  52. return (Observable.Transformer<T, T>) executorTransformer;
  53. }
  54. }
复制代码
  1. /**
  2. * 这个类也是 小鄧子 提供的!!
  3. */
  4. public class ExecutorManager {
  5. public static final int DEVICE_INFO_UNKNOWN = 0;
  6. public static ExecutorService eventExecutor;
  7. //private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
  8. private static final int CPU_COUNT = ExecutorManager.getCountOfCPU();
  9. private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
  10. private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
  11. private static final int KEEP_ALIVE = 1;
  12. private static final BlockingQueue<Runnable> eventPoolWaitQueue = new LinkedBlockingQueue<>(128);
  13. private static final ThreadFactory eventThreadFactory = new ThreadFactory() {
  14. private final AtomicInteger mCount = new AtomicInteger(1);
  15. public Thread newThread(@NonNull Runnable r) {
  16. return new Thread(r, "eventAsyncAndBackground #" + mCount.getAndIncrement());
  17. }
  18. };
  19. private static final RejectedExecutionHandler eventHandler =
  20. new ThreadPoolExecutor.CallerRunsPolicy();
  21. static {
  22. eventExecutor =
  23. new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS,
  24. eventPoolWaitQueue, eventThreadFactory, eventHandler);
  25. }
  26. /**
  27. * Linux中的设备都是以文件的形式存在,CPU也不例外,因此CPU的文件个数就等价与核数。
  28. * Android的CPU 设备文件位于/sys/devices/system/cpu/目录,文件名的的格式为cpud+。
  29. *
  30. * 引用:http://www.jianshu.com/p/f7add443cd32#,感谢 liangfeizc :)
  31. * https://github.com/facebook/device-year-class
  32. */
  33. public static int getCountOfCPU() {
  34. if (Build.VERSION.SDK_INT <= Build.VERSION_CODES.GINGERBREAD_MR1) {
  35. return 1;
  36. }
  37. int count;
  38. try {
  39. count = new File("/sys/devices/system/cpu/").listFiles(CPU_FILTER).length;
  40. } catch (SecurityException | NullPointerException e) {
  41. count = DEVICE_INFO_UNKNOWN;
  42. }
  43. return count;
  44. }
  45. private static final FileFilter CPU_FILTER = new FileFilter() {
  46. @Override public boolean accept(File pathname) {
  47. String path = pathname.getName();
  48. if (path.startsWith("cpu")) {
  49. for (int i = 3; i < path.length(); i++) {
  50. if (path.charAt(i) < '0' || path.charAt(i) > '9') {
  51. return false;
  52. }
  53. }
  54. return true;
  55. }
  56. return false;
  57. }
  58. };
  59. }
复制代码

来自: http://www.jianshu.com/p/ce228f517586

最新评论

小黑屋|在路上 ( 蜀ICP备15035742号-1 

;

GMT+8, 2025-7-9 08:37

Copyright 2015-2025 djqfx

返回顶部