科技感的网站,中国最好网站建设公司,免费企业,网站后台管理系统开发文章目录 0、demo数据1、源算子Source2、从集合中读取数据3、从文件中读取4、从Socket读取5、从Kafka读取6、从数据生成器读取数据7、Flink支持的数据类型8、Flink的类型提示#xff08;Type Hints#xff09; 0、demo数据
准备一个实体类WaterSensor#xff1a;
Data
All… 文章目录 0、demo数据1、源算子Source2、从集合中读取数据3、从文件中读取4、从Socket读取5、从Kafka读取6、从数据生成器读取数据7、Flink支持的数据类型8、Flink的类型提示Type Hints 0、demo数据
准备一个实体类WaterSensor
Data
AllArgsConstructor
NoArgsConstructor
public class WaterSensor{private String id; //水位传感器类型private Long ts; //传感器记录时间戳private Integer vc; //水位记录
}
//注意所有属性的类型都是可序列化的如果属性类型是自定义类那要实现Serializable接口模块下准备个文件words.txt内容
hello flink
hello world
hello java1、源算子Source
Flink可以从各种来源获取数据然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源data source而读取数据的算子就是源算子source operator。 Flink1.12以前添加数据源的方式是调用执行环境对象的addSource方法
DataStreamString stream env.addSource(...);
//方法传入的参数是一个“源函数”source function需要实现SourceFunction接口Flink1.12开始的流批统一的Source框架下则是
DataStreamSourceString stream env.fromSource(…)2、从集合中读取数据
调用执行环境对象的fromCollection方法进行读取。这相当于将数据临时存储到内存中形成特殊的数据结构后作为数据源使用一般用于测试
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();ListInteger data Arrays.asList(1, 22, 33);DataStreamSourceInteger ds env.fromCollection(data);stream.print(); //直接打印env.execute();
}
还可以直接fromElements方法
DataStreamSourceInteger ds env.fromElements(1,22,33);3、从文件中读取
从文件中读是批处理中最常见的读取方式比如读取某个日志文件。首先需要引入文件连接器依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version
/dependencypublic static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//第三个参数为自定义的sourceNameFileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(input/word.txt)).build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),file).print();env.execute();
}
FileSource数据源对象的创建传参可以是目录也可以是文件可以相对、绝对路径也可从HDFS目录下读开头格式hdfs://…相对路径是从系统属性user.dir获取路径idea下是project的根目录standalone模式下是集群节点根目录之前的env.readTextFile方法被标记为过时是因为底层调用了addSource
4、从Socket读取
前面的文件和集合都是有界流而Socket常用于调试阶段模拟无界流
DataStreamString stream env.socketTextStream(localhost, 9527);# 对应的主机执行
nc -lk 95275、从Kafka读取
数据源是外部系统常需要导入对应的连接器的依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version
/dependency
实例
public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop01:9092,hadoop02:9092,hadoop03:9092) //指定Kafka节点的端口和地址.setTopics(topic_1) //消费的Topic.setGroupId(code9527) //消费者组id//Flink程序做为Kafka的消费者要进行对象的反序列化setDeserializer对key和value都生效.setStartingOffsets(OffsetsInitializer.latest()) //指定Flink消费Kafka的策略.setValueOnlyDeserializer(new SimpleStringSchema()) //反序列化Value的反序列化器.build();DataStreamSourceString stream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafka-source);stream.print(Kafka);env.execute();}
}//很多传参CtrlP看源码类型、CtrlH实现类自行分析Kafaka的消费者参数
earliest有offset就从offset继续消费没offset就从最早开始消费latest有offset就从offset继续消费没offset就从最新开始消费
Flink下的KafkaSourceoffset消费策略有个初始化器OffsetInitializer默认是earliest
earliest一定从最早消费latest一定从最新消费
注意和Kafka自身的区别。
6、从数据生成器读取数据
Flink从1.11开始提供了一个内置的DataGen 连接器主要是用于生成一些随机数来调试。1.17版本提供了新写法导入依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version
/dependencypublic class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//env.setParallelism(1);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number:value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), datagenerator).print();env.execute();}
}
new数据生成器源对象有四个参数
第一个为GeneratorFunction接口key为Long型value为需要map转换后的类型。需要实现map方法输入方法固定是Long类型第二个为自动生成数字的最大值long型到这个值就停止生成第三个为限速策略比如每秒生成几个第四个为返回的数据类型Types.xxTypes类是Flink包下的 嘶并行度默认为CPU核心数了输出算子6个子任务且是每个并行度上是各自自增的先按总数/并行度划分再各自执行比如最大值100并行度2那一个从0开始另一个从50到99。数字打印出来看着有点乱了改下并行度
env.setParallelism(1);可以看到程序结束了相当于有界流了想模拟无界流可以第二个参数传Long.MAX_VALUE这就一直输出了
7、Flink支持的数据类型 Flink使用类型信息TypeInformation来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性并为每个数据类型生成特定的序列化器、反序列化器和比较器 对于Java和Scale常见的数据类型Flink都支持在Types工具类中可以看到 Flink支持所有自定义的Java类和Scala类但要符合以下要求
类是公有public的有一个无参的构造方法所有属性都是可访问的即公有public或privategetter、setter类中所有属性的类型都是可以序列化的
不满足以上要求的类会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒无法获取它们内部的属性它们也不是由Flink本身序列化的而是由Kryo序列化的。
8、Flink的类型提示Type Hints
Flink还具有一个类型提取系统可以分析函数的输入和返回类型自动获取类型信息从而获得对应的序列化器和反序列化器。但是由于Java中泛型擦除的存在在某些特殊情况下比如Lambda表达式中自动提取的信息是不够精细的需要我们手动显示提供类型信息。
之前的word count流处理程序我们在将String类型的每个词转换成word count二元组后就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式系统只能推断出返回的是Tuple2类型而无法得到Tuple2String, Long。只有显式地告诉系统当前的返回类型才能正确地解析出完整数据。
//....
.map(word - Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));