当前位置: 首页 > news >正文

科技感的网站中国最好网站建设公司

科技感的网站,中国最好网站建设公司,免费企业,网站后台管理系统开发文章目录 0、demo数据1、源算子Source2、从集合中读取数据3、从文件中读取4、从Socket读取5、从Kafka读取6、从数据生成器读取数据7、Flink支持的数据类型8、Flink的类型提示#xff08;Type Hints#xff09; 0、demo数据 准备一个实体类WaterSensor#xff1a; Data All… 文章目录 0、demo数据1、源算子Source2、从集合中读取数据3、从文件中读取4、从Socket读取5、从Kafka读取6、从数据生成器读取数据7、Flink支持的数据类型8、Flink的类型提示Type Hints 0、demo数据 准备一个实体类WaterSensor Data AllArgsConstructor NoArgsConstructor public class WaterSensor{private String id; //水位传感器类型private Long ts; //传感器记录时间戳private Integer vc; //水位记录 } //注意所有属性的类型都是可序列化的如果属性类型是自定义类那要实现Serializable接口模块下准备个文件words.txt内容 hello flink hello world hello java1、源算子Source Flink可以从各种来源获取数据然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源data source而读取数据的算子就是源算子source operator。 Flink1.12以前添加数据源的方式是调用执行环境对象的addSource方法 DataStreamString stream env.addSource(...); //方法传入的参数是一个“源函数”source function需要实现SourceFunction接口Flink1.12开始的流批统一的Source框架下则是 DataStreamSourceString stream env.fromSource(…)2、从集合中读取数据 调用执行环境对象的fromCollection方法进行读取。这相当于将数据临时存储到内存中形成特殊的数据结构后作为数据源使用一般用于测试 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();ListInteger data Arrays.asList(1, 22, 33);DataStreamSourceInteger ds env.fromCollection(data);stream.print(); //直接打印env.execute(); } 还可以直接fromElements方法 DataStreamSourceInteger ds env.fromElements(1,22,33);3、从文件中读取 从文件中读是批处理中最常见的读取方式比如读取某个日志文件。首先需要引入文件连接器依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version /dependencypublic static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//第三个参数为自定义的sourceNameFileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(input/word.txt)).build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),file).print();env.execute(); } FileSource数据源对象的创建传参可以是目录也可以是文件可以相对、绝对路径也可从HDFS目录下读开头格式hdfs://…相对路径是从系统属性user.dir获取路径idea下是project的根目录standalone模式下是集群节点根目录之前的env.readTextFile方法被标记为过时是因为底层调用了addSource 4、从Socket读取 前面的文件和集合都是有界流而Socket常用于调试阶段模拟无界流 DataStreamString stream env.socketTextStream(localhost, 9527);# 对应的主机执行 nc -lk 95275、从Kafka读取 数据源是外部系统常需要导入对应的连接器的依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version /dependency 实例 public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop01:9092,hadoop02:9092,hadoop03:9092) //指定Kafka节点的端口和地址.setTopics(topic_1) //消费的Topic.setGroupId(code9527) //消费者组id//Flink程序做为Kafka的消费者要进行对象的反序列化setDeserializer对key和value都生效.setStartingOffsets(OffsetsInitializer.latest()) //指定Flink消费Kafka的策略.setValueOnlyDeserializer(new SimpleStringSchema()) //反序列化Value的反序列化器.build();DataStreamSourceString stream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafka-source);stream.print(Kafka);env.execute();} }//很多传参CtrlP看源码类型、CtrlH实现类自行分析Kafaka的消费者参数 earliest有offset就从offset继续消费没offset就从最早开始消费latest有offset就从offset继续消费没offset就从最新开始消费 Flink下的KafkaSourceoffset消费策略有个初始化器OffsetInitializer默认是earliest earliest一定从最早消费latest一定从最新消费 注意和Kafka自身的区别。 6、从数据生成器读取数据 Flink从1.11开始提供了一个内置的DataGen 连接器主要是用于生成一些随机数来调试。1.17版本提供了新写法导入依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version /dependencypublic class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//env.setParallelism(1);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number:value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), datagenerator).print();env.execute();} } new数据生成器源对象有四个参数 第一个为GeneratorFunction接口key为Long型value为需要map转换后的类型。需要实现map方法输入方法固定是Long类型第二个为自动生成数字的最大值long型到这个值就停止生成第三个为限速策略比如每秒生成几个第四个为返回的数据类型Types.xxTypes类是Flink包下的 嘶并行度默认为CPU核心数了输出算子6个子任务且是每个并行度上是各自自增的先按总数/并行度划分再各自执行比如最大值100并行度2那一个从0开始另一个从50到99。数字打印出来看着有点乱了改下并行度 env.setParallelism(1);可以看到程序结束了相当于有界流了想模拟无界流可以第二个参数传Long.MAX_VALUE这就一直输出了 7、Flink支持的数据类型 Flink使用类型信息TypeInformation来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性并为每个数据类型生成特定的序列化器、反序列化器和比较器 对于Java和Scale常见的数据类型Flink都支持在Types工具类中可以看到 Flink支持所有自定义的Java类和Scala类但要符合以下要求 类是公有public的有一个无参的构造方法所有属性都是可访问的即公有public或privategetter、setter类中所有属性的类型都是可以序列化的 不满足以上要求的类会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒无法获取它们内部的属性它们也不是由Flink本身序列化的而是由Kryo序列化的。 8、Flink的类型提示Type Hints Flink还具有一个类型提取系统可以分析函数的输入和返回类型自动获取类型信息从而获得对应的序列化器和反序列化器。但是由于Java中泛型擦除的存在在某些特殊情况下比如Lambda表达式中自动提取的信息是不够精细的需要我们手动显示提供类型信息。 之前的word count流处理程序我们在将String类型的每个词转换成word count二元组后就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式系统只能推断出返回的是Tuple2类型而无法得到Tuple2String, Long。只有显式地告诉系统当前的返回类型才能正确地解析出完整数据。 //.... .map(word - Tuple2.of(word, 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG));
http://wiki.neutronadmin.com/news/87659/

相关文章:

  • 湛江网站建设公司centos wordpress ftp
  • 陶瓷 网站模板信息流网站建设
  • 网站后台空白商业网站策划书模板范文
  • 浙江省的网站建设公司有哪些网站建设的五类成员
  • 好看的企业网站源码网站建设公司扬州
  • 做场景秀的网站如何推广外贸型网站
  • 建站小程序编辑器闪亮登场用python做电商网站
  • 怎么制作微信购物网站山东大学信息服务平台
  • 婚庆网站建设策划案深圳房地产网站建设
  • 商城网站有什么好处黑色网站模板
  • 建设银行网站app一个大佬做的本子网站
  • 网站开发公司怎么选择网站开发者模式下载视频
  • 网站外包多少钱兰州市做网站的企业有哪些
  • 山东城市建设厅网站wordpress pdf 打印
  • 南京h5网站开发莱芜金点子电子版
  • 室内设计素材网站大全境外网站可以备案吗
  • 株洲建设企业网站oa系统网站建设
  • 深圳网站建设网站推广方案设计欣赏网
  • 做网站如何链接邮箱谷歌浏览器对做网站有什么好处
  • 用软件做模板下载网站中油即时通信电脑版
  • 可以刮刮卡的网站无锡网站营销公司
  • 全站仪建站流程什么是域名为什么需要它
  • 建设网站优点免费建立平台网站
  • 软路由系统如何做网站wordpress执行生命周期
  • 自己可以建设网站吗文化建设宣传标语
  • 帆客建设网站宝塔面板加wordpress建站
  • 用文本文档做网页wordpress 内存优化
  • flash网站模板下载做投票链接网站
  • 厦门网站建设建网站网站首页包含的内容怎么做
  • 自己建网站网站优化外包价格