青岛网站专业制作,做电商网站需要会些什么条件,网站开发合同样本,网站模版编辑器在教学和指导RxJava以及撰写本书之后 #xff0c;我注意到某些领域尤其成问题。 我决定发布一些简短的提示#xff0c;以解决最常见的陷阱。 这是第一部分。 Observable和Flowable本质上是惰性的。 这意味着无论您在Flowable放置了多么繁琐或长时间运行的逻辑#xff0c;仅… 在教学和指导RxJava以及撰写本书之后 我注意到某些领域尤其成问题。 我决定发布一些简短的提示以解决最常见的陷阱。 这是第一部分。 Observable和Flowable本质上是惰性的。 这意味着无论您在Flowable放置了多么繁琐或长时间运行的逻辑仅当有人订阅时它才会被评估。 并且还有某人订阅的次数。 下面的代码段对此进行了说明 private static String slow() throws InterruptedException {logger.info(Running);TimeUnit.SECONDS.sleep(1);return abc;
}//...Flowable flo Flowable.fromCallable(this::slow);
logger.info(Created);
flo.subscribe();
flo.subscribe();
logger.info(Done); 这样的Observable或Flowable将不可避免地打印 19:37:57.368 [main] - Created
19:37:57.379 [main] - Running
19:37:58.383 [main] - Running
19:37:59.388 [main] - Done 请注意您需要两次支付sleep()的价格sleep()两次订阅。 此外所有逻辑都在客户端 main 线程中运行除非通过subscriptionOn subscribeOn()请求或异步流隐式可用否则RxJava中没有隐式线程。 问题是我们是否可以热切地强制运行订阅逻辑以便每当有人订阅该流时就已经对其进行了预先计算或至少开始了计算 完全渴望评估 最明显但有缺陷的解决方案是急于计算流返回的任何内容并简单地将其包装为固定的Flowable FlowableString eager() {final String slow slow();return Flowable.just(slow);
} 不幸的是这大大破坏了RxJava的目的。 首先像subscribeOn()这样的运算符将不再起作用并且无法将计算卸载到其他线程。 更糟糕的是即使eager()返回了Flowable但按照定义它将始终阻止客户端线程。 很难推理组合和管理此类流。 通常即使需要进行急切的评估也应避免使用这种模式而应选择延迟加载。 使用 下一个示例仅使用cache()运算符 FlowableString eager3() throws InterruptedException {final FlowableString cached Flowable.fromCallable(this::slow).cache();cached.subscribe();return cached;
} 这个想法很简单用惰性Flowable包装计算并缓存它。 cache()运算符所做的是它会记住第一次订阅时发出的所有事件以便在出现第二个Subscriber 它将接收相同的事件缓存序列。 但是cache()运算符像大多数其他运算符一样是惰性的因此我们必须第一次强制订阅。 调用subscribe()将预填充缓存此外如果第二个订户出现在slow()计算完成之前它将同样等待它而不是第二次启动它。 此解决方案有效但请记住由于未涉及Scheduler subscribe()实际上将被阻止。 如果要在后台预填充Flowable 请尝试subscribeOn() FlowableString eager3() throws InterruptedException {final FlowableString cached Flowable.fromCallable(this::slow).subscribeOn(justDontAlwaysUse_Schedulers.io()).cache();cached.subscribe();return cached;
} 是的在生产系统上使用Schedulers.io()存在问题且难以维护因此请避免使用自定义线程池。 错误处理 令人遗憾的是吞噬RxJava中的异常非常容易。 如果slow()方法失败这就是我们上一个示例中可能发生的情况。 异常不会完全被吞没但是默认情况下如果没有人对此感兴趣它将在System.err上打印堆栈跟踪。 同样未处理的异常也包装在OnErrorNotImplementedException 。 如果您执行任何形式的集中式日志记录则不太方便很可能会丢失。 您可以使用doOnError()操作进行日志记录但它仍然通过例外下游RxJava认为未处理的为好一次包装与OnErrorNotImplementedException 。 因此让我们在subscribe()实现onError回调 FlowableString eager3() throws InterruptedException {final FlowableString cached Flowable.fromCallable(this::slow).cache();cached.subscribe(x - {/* ignore */},e - logger.error(Prepopulation error, e));return cached;
} 我们不想处理实际事件而只是处理subscribe()错误。 此时您可以安全地返回此类Flowable 。 急切且有希望的是只要您订阅了它数据就已经可用。 注意例如Hystrix的observe()方法也很急切而懒惰的toObservable()相反。 这是你的选择。 翻译自: https://www.javacodegeeks.com/2017/08/eager-subscription-rxjava-faq.html