美食网站的建设目的,北京有名气的设计事务所,做网站设计哪家好,企业网站建设讲解一、什么是分流
所谓“分流”#xff0c;就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream#xff0c;定义一些筛选条件#xff0c;将符合条件的数据拣选出来放到对应的流里。
二、基于filter算子的简单实现分流
其实根据条件筛选数据的需求…一、什么是分流
所谓“分流”就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream定义一些筛选条件将符合条件的数据拣选出来放到对应的流里。
二、基于filter算子的简单实现分流
其实根据条件筛选数据的需求本身非常容易实现只要针对同一条流多次独立调用.filter()方法进行筛选就可以得到拆分之后的流了。 案例需求读取一个整数数字流将数据流划分为奇数流和偶数流。
package com.flink.DataStream.SplitStream;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkSplitStreamByFilter {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, 8081));//.getExecutionEnvironment();//TODO 设置全局并行度为2streamExecutionEnvironment.setParallelism(2);DataStreamSourceString dataStreamSource streamExecutionEnvironment.socketTextStream(localhost, 8888);//TODO 先将输入流转为Integer类型SingleOutputStreamOperatorInteger mapResult dataStreamSource.map((input) - {int i Integer.parseInt(input);return i;});//TODO 使用匿名函数分流偶数流SingleOutputStreamOperatorInteger ds1 mapResult.filter(new FilterFunctionInteger() {Overridepublic boolean filter(Integer a) throws Exception {return a % 2 0;}});//TODO 使用lamda表达式分流奇数流SingleOutputStreamOperatorInteger ds2 mapResult.filter((a) - a % 2 1);ds1.print(偶数流);ds2.print(奇数流);streamExecutionEnvironment.execute();}
}执行结果
奇数流:1 1
偶数流:2 2
偶数流:1 2
偶数流:2 4
奇数流:1 3
奇数流:2 1Process finished with exit code 130 (interrupted by signal 2: SIGINT)这种实现非常简单但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的却重复写了三次。而且这段代码背后的含义是将原始数据流 stream 复制三份然后对每一份分别做筛选这明显是不够高效的。我们自然想到能不能不用复制流直接用一个算子就把它们都拆分开呢
三、使用测输出流
关于处理函数中侧输出流的用法我们已经在 7.5 节做了详细介绍。简单来说只需要调用上下文 ctx 的.output()方法就可以输出任意类型的数据了。而侧输出流的标记和提取都离不开一个“输出标签”OutputTag指定了侧输出流的 id 和类型。