当前位置: 首页 > news >正文

台州知名网站广州的房地产网站建设

台州知名网站,广州的房地产网站建设,广东全网推广,标准化班组建设网站文章目录 0、demo数据1、源算子Source2、从集合中读取数据3、从文件中读取4、从Socket读取5、从Kafka读取6、从数据生成器读取数据7、Flink支持的数据类型8、Flink的类型提示#xff08;Type Hints#xff09; 0、demo数据 准备一个实体类WaterSensor#xff1a; Data All… 文章目录 0、demo数据1、源算子Source2、从集合中读取数据3、从文件中读取4、从Socket读取5、从Kafka读取6、从数据生成器读取数据7、Flink支持的数据类型8、Flink的类型提示Type Hints 0、demo数据 准备一个实体类WaterSensor Data AllArgsConstructor NoArgsConstructor public class WaterSensor{private String id; //水位传感器类型private Long ts; //传感器记录时间戳private Integer vc; //水位记录 } //注意所有属性的类型都是可序列化的如果属性类型是自定义类那要实现Serializable接口模块下准备个文件words.txt内容 hello flink hello world hello java1、源算子Source Flink可以从各种来源获取数据然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源data source而读取数据的算子就是源算子source operator。 Flink1.12以前添加数据源的方式是调用执行环境对象的addSource方法 DataStreamString stream env.addSource(...); //方法传入的参数是一个“源函数”source function需要实现SourceFunction接口Flink1.12开始的流批统一的Source框架下则是 DataStreamSourceString stream env.fromSource(…)2、从集合中读取数据 调用执行环境对象的fromCollection方法进行读取。这相当于将数据临时存储到内存中形成特殊的数据结构后作为数据源使用一般用于测试 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();ListInteger data Arrays.asList(1, 22, 33);DataStreamSourceInteger ds env.fromCollection(data);stream.print(); //直接打印env.execute(); } 还可以直接fromElements方法 DataStreamSourceInteger ds env.fromElements(1,22,33);3、从文件中读取 从文件中读是批处理中最常见的读取方式比如读取某个日志文件。首先需要引入文件连接器依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version /dependencypublic static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//第三个参数为自定义的sourceNameFileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(input/word.txt)).build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),file).print();env.execute(); } FileSource数据源对象的创建传参可以是目录也可以是文件可以相对、绝对路径也可从HDFS目录下读开头格式hdfs://…相对路径是从系统属性user.dir获取路径idea下是project的根目录standalone模式下是集群节点根目录之前的env.readTextFile方法被标记为过时是因为底层调用了addSource 4、从Socket读取 前面的文件和集合都是有界流而Socket常用于调试阶段模拟无界流 DataStreamString stream env.socketTextStream(localhost, 9527);# 对应的主机执行 nc -lk 95275、从Kafka读取 数据源是外部系统常需要导入对应的连接器的依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version /dependency 实例 public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop01:9092,hadoop02:9092,hadoop03:9092) //指定Kafka节点的端口和地址.setTopics(topic_1) //消费的Topic.setGroupId(code9527) //消费者组id//Flink程序做为Kafka的消费者要进行对象的反序列化setDeserializer对key和value都生效.setStartingOffsets(OffsetsInitializer.latest()) //指定Flink消费Kafka的策略.setValueOnlyDeserializer(new SimpleStringSchema()) //反序列化Value的反序列化器.build();DataStreamSourceString stream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafka-source);stream.print(Kafka);env.execute();} }//很多传参CtrlP看源码类型、CtrlH实现类自行分析Kafaka的消费者参数 earliest有offset就从offset继续消费没offset就从最早开始消费latest有offset就从offset继续消费没offset就从最新开始消费 Flink下的KafkaSourceoffset消费策略有个初始化器OffsetInitializer默认是earliest earliest一定从最早消费latest一定从最新消费 注意和Kafka自身的区别。 6、从数据生成器读取数据 Flink从1.11开始提供了一个内置的DataGen 连接器主要是用于生成一些随机数来调试。1.17版本提供了新写法导入依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version /dependencypublic class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//env.setParallelism(1);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number:value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), datagenerator).print();env.execute();} } new数据生成器源对象有四个参数 第一个为GeneratorFunction接口key为Long型value为需要map转换后的类型。需要实现map方法输入方法固定是Long类型第二个为自动生成数字的最大值long型到这个值就停止生成第三个为限速策略比如每秒生成几个第四个为返回的数据类型Types.xxTypes类是Flink包下的 嘶并行度默认为CPU核心数了输出算子6个子任务且是每个并行度上是各自自增的先按总数/并行度划分再各自执行比如最大值100并行度2那一个从0开始另一个从50到99。数字打印出来看着有点乱了改下并行度 env.setParallelism(1);可以看到程序结束了相当于有界流了想模拟无界流可以第二个参数传Long.MAX_VALUE这就一直输出了 7、Flink支持的数据类型 Flink使用类型信息TypeInformation来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性并为每个数据类型生成特定的序列化器、反序列化器和比较器 对于Java和Scale常见的数据类型Flink都支持在Types工具类中可以看到 Flink支持所有自定义的Java类和Scala类但要符合以下要求 类是公有public的有一个无参的构造方法所有属性都是可访问的即公有public或privategetter、setter类中所有属性的类型都是可以序列化的 不满足以上要求的类会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒无法获取它们内部的属性它们也不是由Flink本身序列化的而是由Kryo序列化的。 8、Flink的类型提示Type Hints Flink还具有一个类型提取系统可以分析函数的输入和返回类型自动获取类型信息从而获得对应的序列化器和反序列化器。但是由于Java中泛型擦除的存在在某些特殊情况下比如Lambda表达式中自动提取的信息是不够精细的需要我们手动显示提供类型信息。 之前的word count流处理程序我们在将String类型的每个词转换成word count二元组后就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式系统只能推断出返回的是Tuple2类型而无法得到Tuple2String, Long。只有显式地告诉系统当前的返回类型才能正确地解析出完整数据。 //.... .map(word - Tuple2.of(word, 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG));
http://www.yutouwan.com/news/260389/

相关文章:

  • 网站运营经验分享ppt模板哪些购物网站做的比较简洁有品质
  • 网站开发定位网络服务商机构域名
  • 精选聊城做网站的公司长沙公共资源交易中心官网
  • 网站开发需要经过的几个主要阶段二建官网报名入口
  • 数据分析网站开发wordpress建网店
  • 道路建设网站专题广告公司名字怎么起
  • 福州企业网站建设关键词检索
  • 网站建设公司专业网站研发开发个人简历(电子版)
  • 嘉兴企业网站建设系统国家信用企业信息系统
  • 官方网站制作思路百度app安装免费下载
  • 访问国外网站太慢中国网站
  • 付费网站推广网络营销以什么为基础
  • html网站建设源码门业东莞网站建设技术支持
  • 环保局网站建设方案福州市建设工程造价管理网站
  • 武安企业做网站推广网站建设晋丰
  • 邯郸营销型网站淘宝优秀软文范例100字
  • 宁夏建设网站的公司电话asp网站的安全性
  • 国外设计模板网站jsp网站建设项目
  • 什么做网站站群潜江资讯网58同城
  • 合肥网站建设的价格湖南省做网站那个企业便宜
  • 开发一个企业网站报价wordpress twilight saga 主题
  • 好文案网站wordpress登录不上
  • 唯品会网站架构织梦网络公司网站源码
  • 网站体验分享wordpress 更换编辑器
  • 如何做拍卖网站扬州做机床公司网站
  • 零基础学做衣服的网站服务好的高端网站建设公司
  • 教育网站制作定制网站搭建中114514
  • 举报网站建设情况 汇报招聘页面模板
  • 什么是优化型网站手机网站生成代码
  • 信用网站建设成效上海网站建设推广服务