黄岩做网站公司电话,做网站登入见面,新泰市住房和城乡建设局网站,wordpress如何实现精确查询lift()变换原理
这些变换虽然功能各有不同#xff0c;但实质上都是针对事件序列的处理和再发送。而在RxJava的内部#xff0c;它们是基于同一个基础的变换方法#xff1a;lift()。
首先看一下lift() 的内部实现#xff08;仅显示了部分主要逻辑代码):
public R …lift()变换原理
这些变换虽然功能各有不同但实质上都是针对事件序列的处理和再发送。而在RxJava的内部它们是基于同一个基础的变换方法lift()。
首先看一下lift() 的内部实现仅显示了部分主要逻辑代码):
public R ObservableR lift(Operator? extends R, ? super T operator) {return Observable.create(new OnSubscribeR() {Overridepublic void call(Subscriber subscriber) {Subscriber newSubscriber operator.call(subscriber);newSubscriber.onStart();onSubscribe.call(newSubscriber);}});
}方法用于将当前的 Observable 对象转换成另一种类型的 Observable 对象。它接受一个 Operator 参数用于定义转换的规则。返回的是一个新的 Observable 对象。
它创建了一个新的 Observable 对象并且将 operator 对象作用于当前 Observable 对象的订阅过程中。
在 Observable.create 方法中通过创建一个匿名内部类实现了 OnSubscribe 接口的 call 方法。在 call 方法中首先通过调用 operator.call(subscriber)将原始的 Subscriber 对象转换成一个新的 Subscriber 对象 newSubscriber。然后调用 newSubscriber.onStart() 方法进行一些初始化操作。最后调用 onSubscribe.call(newSubscriber)将转换后的 newSubscriber 对象传递给原始的 onSubscribe 对象进行订阅操作。
类似于这个图(别的地方扒下来的) RxJava不建议开发者自定义Operator来直接使用lift()而是建议尽量使用已有的lift()包装方法如map()、flatMap()等进行组合来实现需求因为直接使用lift()非常容易发生一些难以发现的错误。 线程控制Scheduler
在不指定线程的情况下RxJava遵循的是线程不变的原则即在哪个线程调用subscribe()方法就在哪个线程生产事件在哪个线程生产事件就在哪个线程消费事件。也就是说事件的发出和消费都是在同一个线程的。观察者模式本身的目的就是『后台处理前台回调』的异步机制因此异步对于RxJava是至关重要的。而要实现异步则需要用到RxJava的另一个概念Scheduler。
Scheduler简介
在RxJava中,Scheduler相当于线程控制器通过使用 Scheduler 可以实现事件的异步处理和线程切换。Scheduler 可以指定事件发送和处理所在的线程从而实现异步的操作RxJava 提供了多种类型的 Scheduler
Schedulers.immediate(): 直接在当前线程运行相当于不指定线程。这是默认的Scheduler。Schedulers.newThread(): 总是启用新线程并在新线程执行操作。Schedulers.io(): I/O 操作读写文件、读写数据库、网络信息交互等所使用的Scheduler。行为模式和newThread()差不多区别在于io() 的内部实现是是用一个无数量上限的线程池可以重用空闲的线程因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中可以避免创建不必要的线程。Schedulers.computation(): 计算所使用的Scheduler。这个计算指的是CPU密集型计算即不会被I/O等操作限制性能的操作例如图形的计算。这个Scheduler 使用的固定的线程池大小为CPU核数。不要把I/O操作放在computation()中否则I/O操作的等待时间会浪费CPU。另外Android还有一个专用的AndroidSchedulers.mainThread()它指定的操作将在Android主线程运行。
有了这几个Scheduler就可以使用subscribeOn()和observeOn()两个方法来对线程进行控制了。subscribeOn()指定subscribe()所发生的线程即Observable.OnSubscribe()被激活时所处的线程或者叫做事件产生的线程。observeOn()指定Subscriber所运行在的线程或者叫做事件消费的线程。
Observable.just(Hello).subscribeOn(Schedulers.io()) // 在 IO 线程发送事件.map(str - str World).observeOn(AndroidSchedulers.mainThread()) // 在主线程中处理事件.subscribe(str - {// 更新 UItextView.setText(str);}, throwable - {// 处理错误Log.e(TAG, Error: throwable.getMessage());});上面这段代码中subscribeOn(Schedulers.io())的指定会让创建的事件的内容Hello 、World !将会在IO线程发出而由于observeOn(AndroidScheculers.mainThread()) 的指定因此subscriber()方法设置后的回调中内容的打印将发生在主线程中。事实上这种在subscribe()之前写上两句subscribeOn(Scheduler.io())和observeOn(AndroidSchedulers.mainThread())的使用方式非常常见它适用于多数的***后台线程取数据主线程显示***的程序策略。
Scheduler的原理
我们可以多切换几次线程因为observeOn()指定的是Subscriber的线程而这个Subscriber并不是subscribe() 参数中的Subscriber而是observeOn()执行时的当前Observable所对应的Subscriber即它直接对应的Subscriber。换句话说observeOn() 指定的是它之后的操作所在的线程。所以想要多次切换线程只要在每个想要切换线程的位置调用一次observeOn()即可。
Observable.just(Hello).subscribeOn(Schedulers.io()) // 在 IO 线程执行.observeOn(Schedulers.computation()) // 切换到计算线程执行.map(s - s World).observeOn(AndroidSchedulers.mainThread()) // 切换到主线程执行.subscribe(s - {// 更新 UItextView.setText(s);});如上通过observeOn()的多次调用程序实现了线程的多次切换。 不过不同于observeOn(),subscribeOn()的位置放在哪里都可以但它是只能调用一次的。 subscribeOn()和observeOn()的内部实现也是用的lift()。 具体看图不同颜色的箭头表示不同的线程,subscribeOn()原理图 observeOn()原理图: 从图中可以看出,subscribeOn()和observeOn()都做了线程切换的工作图中的schedule...部位。不同的是,subscribeOn()的线程切换发生在OnSubscribe中即在它通知上一级 OnSubscribe时这时事件还没有开始发送因此subscribeOn()的线程控制可以从事件发出的开端就造成影响而observeOn()的线程切换则发生在它内建的Subscriber中即发生在它即将给下一级Subscriber发送事件时因此observeOn()控制的是它后面的线程。
用一张图来扒的解释当多个subscribeOn()和observeOn()混合使用时线程调度是怎么发生的 图中共有5处含有对事件的操作。由图中可以看出①和②两处受第一个subscribeOn()影响运行在红色线程③和④处受第一个observeOn()的影响运行在绿色线程⑤处受第二个 onserveOn()影响运行在紫色线程而第二个subscribeOn()由于在通知过程中线程就被第一个subscribeOn() 截断因此对整个流程并没有任何影响。这里也就回答了前面的问题当使用了多个subscribeOn()的时候只有第一个subscribeOn()起作用。
在前面讲Subscriber的时候提到过Subscriber的onStart()可以用作流程开始前的初始化。然而onStart()由于在subscribe()发生时就被调用了因此不能指定线程而是只能执行在subscribe()被调用时的线程。这就导致如果onStart()中含有对线程有要求的代码例如在界面上显示一个ProgressBar这必须在主线程执行将会有线程非法的风险因为有时你无法预测subscribe()将会在什么线程执行。
而与Subscriber.onStart()相对应的有一个方法Observable.doOnSubscribe()。它和Subscriber.onStart()同样是在subscribe()调用后而且在事件发送前执行但区别在于它可以指定线程。默认情况下,doOnSubscribe()执行在subscribe()发生的线程而如果在doOnSubscribe()之后有subscribeOn()的话它将执行在离它最近的subscribeOn()所指定的线程。
示例代码
Observable.create(onSubscribe).subscribeOn(Schedulers.io()).doOnSubscribe(new Action0() {Overridepublic void call() {progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行}}).subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);Agera
之前Google发布agera它在Github上的介绍是:Reactive Programming for Android可以进行了解。它为 Android 应用程序提供了一种简单且灵活的方式来处理数据流和事件驱动的编程模型。很轻量化很适合安卓。
但是缺点也很明显与 RxJava 相比Agera 的功能相对较为有限操作符和功能较少。对于一些复杂的数据流操作和并发处理可能需要额外的工作量来实现