网站建设网页设计公司,网站服务器容器,做买家秀的网站,郑州网站建设方案phpRxJava 2.x中共有三个无缝相似的运算符#xff1a; flatMap() #xff0c; concatMap()和concatMapEager() 。 它们都接受相同的参数-从原始流的单个项目到任意类型的#xff08;子#xff09;流的函数。 换句话说#xff0c;如果您有FlowableT则可以为任意R类型提… RxJava 2.x中共有三个无缝相似的运算符 flatMap() concatMap()和concatMapEager() 。 它们都接受相同的参数-从原始流的单个项目到任意类型的子流的函数。 换句话说如果您有FlowableT则可以为任意R类型提供从T到FlowableR的函数。 应用任何这些运算符后您最终得到FlowableR 。 那么它们有何不同 样例项目 首先让我们构建一个示例应用程序。 我们将使用Retrofit2 HTTP客户端包装器该包装器具有RxJava2的内置插件。 我们的任务是利用GeoNames API来查找世界上任何城市的人口。 该界面如下所示 public interface GeoNames {FlowableLong populationOf(String city);} 该接口的实现由Retrofit自动生成向下滚动以查看胶粘源代码。 暂时假设我们有一个函数该函数采用具有城市名称的String并异步返回具有该城市人口的单元素流。 还要假设我们有固定的城市要查找 FlowableString cities Flowable.just(Warsaw, Paris, London, Madrid
); 我们的目标是获取每个城市的人口。 带有concatMap()的示例应用程序如下所示 cities.concatMap(geoNames::populationOf).subscribe(response - log.info(Population: {}, response)); 在看到结果之前让我们研究一下concatMap()在做什么。 对于每个上游事件 城市 它都调用一个函数该函数用子流替换该事件。 在我们的情况下它是Long的一元流 FlowableLong 。 因此与所有运算符进行比较之后我们最终得到的是Long流 FlowableFlowableLong 流。 当我们分析操作员为展平此类嵌套流所做的操作时就会出现真正的区别。 concatMap()将首先订阅第一concatMap()流 FlowableLong代表华沙的人口。 订阅实际上是指进行物理HTTP调用。 仅当第一concatMap()流完成时在我们的情况下发出单个Long并发出完成信号 concatMap()才会继续。 继续意味着订阅第二个子流并等待其完成。 最后一个子流完成时结果流完成。 这导致了随后的潮流170213921385517556900和3255944。因此恰好是华沙巴黎伦敦和马德里的人口。 输出顺序完全可以预测。 但是它也是完全顺序的。 完全没有并发发生只有在第一个HTTP结束时才进行第二个HTTP调用。 RxJava所增加的复杂性根本没有回报 23:33:33.531 | Rx-1 | -- GET .../searchJSON?qWarsaw http/1.1
23:33:33.656 | Rx-1 | -- 200 OK .../searchJSON?qWarsaw (123ms)
23:33:33.674 | Rx-1 | Population: 1702139
23:33:33.676 | Rx-1 | -- GET .../searchJSON?qParis http/1.1
23:33:33.715 | Rx-1 | -- 200 OK .../searchJSON?qParis (38ms)
23:33:33.715 | Rx-1 | Population: 2138551
23:33:33.716 | Rx-1 | -- GET .../searchJSON?qLondon http/1.1
23:33:33.754 | Rx-1 | -- 200 OK .../searchJSON?qLondon (37ms)
23:33:33.754 | Rx-1 | Population: 7556900
23:33:33.755 | Rx-1 | -- GET .../searchJSON?qMadrid http/1.1
23:33:33.795 | Rx-1 | -- 200 OK .../searchJSON?qMadrid (40ms)
23:33:33.796 | Rx-1 | Population: 3255944 如您所见没有多线程发生请求是顺序的彼此等待。 从技术上讲并非所有这些都必须在同一线程中发生但是它们绝不会重叠并且可以利用并发性。 最大的好处是可以保证结果事件的顺序一旦我们进入flatMap() 就不会那么明显了…… flatMap()代码几乎完全相同 cities.flatMap(geoNames::populationOf).subscribe(response - log.info(Population: {}, response)); 就像之前一样我们从Long流开始 FlowableFlowableLong 。 但是 flatMap()运算符不是一次又一次地订阅每个子流而是急切地一次订阅所有子流。 这意味着我们看到在不同线程中同时启动多个HTTP请求 00:10:04.919 | Rx-2 | -- GET .../searchJSON?qParis http/1.1
00:10:04.919 | Rx-1 | -- GET .../searchJSON?qWarsaw http/1.1
00:10:04.919 | Rx-3 | -- GET .../searchJSON?qLondon http/1.1
00:10:04.919 | Rx-4 | -- GET .../searchJSON?qMadrid http/1.1
00:10:05.449 | Rx-3 | -- 200 OK .../searchJSON (529ms)
00:10:05.462 | Rx-3 | Population: 7556900
00:10:05.477 | Rx-1 | -- 200 OK .../searchJSON (557ms)
00:10:05.478 | Rx-1 | Population: 1702139
00:10:05.751 | Rx-4 | -- 200 OK .../searchJSON (831ms)
00:10:05.752 | Rx-4 | Population: 3255944
00:10:05.841 | Rx-2 | -- 200 OK .../searchJSON (922ms)
00:10:05.843 | Rx-2 | Population: 2138551 当任何基础子流中的任何一个发出任何值时它将立即向下游传递给订户。 这意味着我们现在可以在事件发生时即时处理事件。 请注意结果流是乱序的。 我们收到的第一个事件是7556900恰好是伦敦的人口在第一流中排名第二。 与concatMap()相反 flatMap()无法保留顺序因此以“随机”顺序发出值。 好吧不是真正随机的我们只是在它们可用时立即接收值。 在此特定执行中首先是针对伦敦的HTTP响应但绝对不能保证。 这导致一个有趣的问题。 我们有各种各样的人口价值流和最初的城市流。 但是输出流可以是事件的任意排列并且我们不知道哪个人口对应哪个城市。 我们将在后续文章中解决此问题。 concatMapEager()似乎带来了两全其美并发性和输出事件的有保证顺序 cities.concatMapEager(geoNames::populationOf).subscribe(response - log.info(Population: {}, response)); 在了解了concatMap()和flatMap()功能之后了解concatMapEager()相当简单。 急切地让流concatMapEager()流 duh 同时预订所有子流。 但是此运算符可确保首先传播第一个子流的结果即使它不是要完成的第一个子流也是如此。 一个示例将Swift揭示这意味着什么 00:34:18.371 | Rx-2 | -- GET .../searchJSON?qParis http/1.1
00:34:18.371 | Rx-3 | -- GET .../searchJSON?qLondon http/1.1
00:34:18.371 | Rx-4 | -- GET .../searchJSON?qMadrid http/1.1
00:34:18.371 | Rx-1 | -- GET .../searchJSON?qWarsaw http/1.1
00:34:18.517 | Rx-3 | -- 200 OK .../searchJSON?qLondon (143ms)
00:34:18.563 | Rx-1 | -- 200 OK .../searchJSON?qWarsaw (189ms)
00:34:18.565 | Rx-1 | Population: 1702139
00:34:20.460 | Rx-2 | -- 200 OK .../searchJSON?qParis (2086ms)
00:34:20.460 | Rx-4 | -- 200 OK .../searchJSON?qMadrid (2086ms)
00:34:20.461 | Rx-2 | Population: 2138551
00:34:20.462 | Rx-2 | Population: 7556900
00:34:20.462 | Rx-2 | Population: 3255944 我们立即启动四个HTTP请求。 从日志输出中我们可以清楚地看到伦敦的居民首先被返回。 但是订户没有收到它因为华沙尚未到来。 巧合的是华沙排名第二因此华沙人口可以在下游传递给订户。 不幸的是伦敦人口必须等待更多因为首先我们需要巴黎人口。 巴黎紧随其后是马德里完成后所有剩余结果都将传递到下游。 请注意即使人口充足伦敦的人口也必须等待休眠直到华沙和巴黎完成。 那么concatMapEager()是最好的并发运算符吗 不完全的。 想象一下我们有一个数千个城市的列表每一个城市我们都获取一张1MB的图片。 使用concatMap()我们可以依次即缓慢concatMap()下载图片。 使用flatMap()可以同时下载图片并在图片到达时尽快进行处理。 现在 concatMapEager()呢 在最坏的情况下我们可以使用concatMapEager()缓存999张图片因为来自第一个城市的图片恰好是最慢的。 即使我们已经拥有99.9的结果但由于我们执行严格的排序因此我们无法对其进行处理。 使用哪个运算符 flatMap()应该是您的首选武器。 它允许与流行为进行有效的并发。 但是要准备好接收乱序的结果。 仅当提供的转换速度如此之快顺序处理不是问题时 concatMap()才能很好地工作。 concatMapEager()非常方便但是要注意内存消耗。 同样在最坏的情况下您可能最终会闲置等待很少的响应。 附录配置Retrofit2客户端 实际上我们在本文中始终使用的GeoNames服务接口如下所示 public interface GeoNames {GET(/searchJSON)SingleSearchResult search(Query(q) String query,Query(maxRows) int maxRows,Query(style) String style,Query(username) String username);default FlowableLong populationOf(String city) {return search(city, 1, LONG, s3cret).map(SearchResult::getGeonames).map(g - g.get(0)).map(Geoname::getPopulation).toFlowable();}} 非默认方法的实现由Retrofit2自动生成。 请注意为简单起见 populationOf()返回一个元素的FlowableLong 。 但是要完全拥抱此API的本质在现实世界中其他实现将更为合理。 首先 SearchResult类返回结果的有序列表省略了获取器/设置器 class SearchResult {private ListGeoname geonames new ArrayList();
}class Geoname {private double lat;private double lng;private Integer geonameId;private Long population;private String countryCode;private String name;
} 毕竟世界上有许多华沙和伦敦 。 我们默默假设列表将包含至少一个元素而第一个是正确的匹配。 更合适的实现应返回所有匹配甚至返回更好的MaybeLong类型以反映没有匹配项 default MaybeLong populationOf(String city) {return search(city, 1, LONG, nurkiewicz).flattenAsFlowable(SearchResult::getGeonames).map(Geoname::getPopulation).firstElement();
} 粘合代码如下所示。 首先Jackson的设置以便解析来自API的响应 import com.fasterxml.jackson.databind.ObjectMapper;private ObjectMapper objectMapper() {return new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
} FAIL_ON_UNKNOWN_PROPERTIES通常是您想要的。 否则您必须映射JSON响应中的所有字段并且当API生产者引入新的或向后兼容的字段时代码将中断。 然后我们设置OkHttpClient 由Retrofit在下面使用 import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;private OkHttpClient client() {HttpLoggingInterceptor interceptor new HttpLoggingInterceptor();interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);return new OkHttpClient.Builder().addInterceptor(interceptor).build();
} 有时您可以跳过OkHttp客户端的配置但是我们添加了日志拦截器。 默认情况下OkHttp使用java.util.logging日志记录因此为了使用体面的日志记录框架我们必须在开始时就安装网桥 import org.slf4j.bridge.SLF4JBridgeHandler;static {SLF4JBridgeHandler.removeHandlersForRootLogger();SLF4JBridgeHandler.install();
} 最后进行改造 import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.jackson.JacksonConverterFactory;GeoNames createClient() {return new Retrofit.Builder().client(client()).baseUrl(http://api.geonames.org).addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())).addConverterFactory(JacksonConverterFactory.create(objectMapper())).build().create(GeoNames.class);
} 调用createClient()将产生GeoNames接口的动态实现。 我们使用了以下依赖项 compile io.reactivex.rxjava2:rxjava:2.0.6compile com.squareup.retrofit2:adapter-rxjava2:2.3.0
compile com.squareup.retrofit2:converter-jackson:2.0.1
compile com.squareup.okhttp3:logging-interceptor:3.8.0compile ch.qos.logback:logback-classic:1.1.7
compile org.slf4j:slf4j-api:1.7.21
compile org.slf4j:jul-to-slf4j:1.7.21翻译自: https://www.javacodegeeks.com/2017/08/flatmap-vs-concatmap-vs-concatmapeager-rxjava-faq.html