当前位置: 首页 > news >正文

台州知名网站广州的房地产网站建设

台州知名网站,广州的房地产网站建设,广东全网推广,标准化班组建设网站文章目录 0、demo数据1、源算子Source2、从集合中读取数据3、从文件中读取4、从Socket读取5、从Kafka读取6、从数据生成器读取数据7、Flink支持的数据类型8、Flink的类型提示#xff08;Type Hints#xff09; 0、demo数据 准备一个实体类WaterSensor#xff1a; Data All… 文章目录 0、demo数据1、源算子Source2、从集合中读取数据3、从文件中读取4、从Socket读取5、从Kafka读取6、从数据生成器读取数据7、Flink支持的数据类型8、Flink的类型提示Type Hints 0、demo数据 准备一个实体类WaterSensor Data AllArgsConstructor NoArgsConstructor public class WaterSensor{private String id; //水位传感器类型private Long ts; //传感器记录时间戳private Integer vc; //水位记录 } //注意所有属性的类型都是可序列化的如果属性类型是自定义类那要实现Serializable接口模块下准备个文件words.txt内容 hello flink hello world hello java1、源算子Source Flink可以从各种来源获取数据然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源data source而读取数据的算子就是源算子source operator。 Flink1.12以前添加数据源的方式是调用执行环境对象的addSource方法 DataStreamString stream env.addSource(...); //方法传入的参数是一个“源函数”source function需要实现SourceFunction接口Flink1.12开始的流批统一的Source框架下则是 DataStreamSourceString stream env.fromSource(…)2、从集合中读取数据 调用执行环境对象的fromCollection方法进行读取。这相当于将数据临时存储到内存中形成特殊的数据结构后作为数据源使用一般用于测试 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();ListInteger data Arrays.asList(1, 22, 33);DataStreamSourceInteger ds env.fromCollection(data);stream.print(); //直接打印env.execute(); } 还可以直接fromElements方法 DataStreamSourceInteger ds env.fromElements(1,22,33);3、从文件中读取 从文件中读是批处理中最常见的读取方式比如读取某个日志文件。首先需要引入文件连接器依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version /dependencypublic static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//第三个参数为自定义的sourceNameFileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(input/word.txt)).build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),file).print();env.execute(); } FileSource数据源对象的创建传参可以是目录也可以是文件可以相对、绝对路径也可从HDFS目录下读开头格式hdfs://…相对路径是从系统属性user.dir获取路径idea下是project的根目录standalone模式下是集群节点根目录之前的env.readTextFile方法被标记为过时是因为底层调用了addSource 4、从Socket读取 前面的文件和集合都是有界流而Socket常用于调试阶段模拟无界流 DataStreamString stream env.socketTextStream(localhost, 9527);# 对应的主机执行 nc -lk 95275、从Kafka读取 数据源是外部系统常需要导入对应的连接器的依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version /dependency 实例 public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop01:9092,hadoop02:9092,hadoop03:9092) //指定Kafka节点的端口和地址.setTopics(topic_1) //消费的Topic.setGroupId(code9527) //消费者组id//Flink程序做为Kafka的消费者要进行对象的反序列化setDeserializer对key和value都生效.setStartingOffsets(OffsetsInitializer.latest()) //指定Flink消费Kafka的策略.setValueOnlyDeserializer(new SimpleStringSchema()) //反序列化Value的反序列化器.build();DataStreamSourceString stream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafka-source);stream.print(Kafka);env.execute();} }//很多传参CtrlP看源码类型、CtrlH实现类自行分析Kafaka的消费者参数 earliest有offset就从offset继续消费没offset就从最早开始消费latest有offset就从offset继续消费没offset就从最新开始消费 Flink下的KafkaSourceoffset消费策略有个初始化器OffsetInitializer默认是earliest earliest一定从最早消费latest一定从最新消费 注意和Kafka自身的区别。 6、从数据生成器读取数据 Flink从1.11开始提供了一个内置的DataGen 连接器主要是用于生成一些随机数来调试。1.17版本提供了新写法导入依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version /dependencypublic class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//env.setParallelism(1);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number:value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), datagenerator).print();env.execute();} } new数据生成器源对象有四个参数 第一个为GeneratorFunction接口key为Long型value为需要map转换后的类型。需要实现map方法输入方法固定是Long类型第二个为自动生成数字的最大值long型到这个值就停止生成第三个为限速策略比如每秒生成几个第四个为返回的数据类型Types.xxTypes类是Flink包下的 嘶并行度默认为CPU核心数了输出算子6个子任务且是每个并行度上是各自自增的先按总数/并行度划分再各自执行比如最大值100并行度2那一个从0开始另一个从50到99。数字打印出来看着有点乱了改下并行度 env.setParallelism(1);可以看到程序结束了相当于有界流了想模拟无界流可以第二个参数传Long.MAX_VALUE这就一直输出了 7、Flink支持的数据类型 Flink使用类型信息TypeInformation来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性并为每个数据类型生成特定的序列化器、反序列化器和比较器 对于Java和Scale常见的数据类型Flink都支持在Types工具类中可以看到 Flink支持所有自定义的Java类和Scala类但要符合以下要求 类是公有public的有一个无参的构造方法所有属性都是可访问的即公有public或privategetter、setter类中所有属性的类型都是可以序列化的 不满足以上要求的类会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒无法获取它们内部的属性它们也不是由Flink本身序列化的而是由Kryo序列化的。 8、Flink的类型提示Type Hints Flink还具有一个类型提取系统可以分析函数的输入和返回类型自动获取类型信息从而获得对应的序列化器和反序列化器。但是由于Java中泛型擦除的存在在某些特殊情况下比如Lambda表达式中自动提取的信息是不够精细的需要我们手动显示提供类型信息。 之前的word count流处理程序我们在将String类型的每个词转换成word count二元组后就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式系统只能推断出返回的是Tuple2类型而无法得到Tuple2String, Long。只有显式地告诉系统当前的返回类型才能正确地解析出完整数据。 //.... .map(word - Tuple2.of(word, 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG));
http://wiki.neutronadmin.com/news/260389/

相关文章:

  • 深圳网站建设深圳企业网站建设软件下载平台哪个好
  • 网站关键字如何选择wordpress无法编辑
  • 个人网站建设的参考文献做淘客网站怎么建要购买数据库吗
  • 网站推广的渠道有哪些网站建设价目
  • 上海自助建站平台建设销售网站的好处
  • 快速搭建网站推荐专业动漫如何制作
  • 做网站最简单的网站开发成都
  • 学校网站建设领导小组网站开发与推广计划书
  • 平舆网站建设推荐几个安全免费的网站
  • 网址网页网站的区别??官方网站查询叉车证
  • 汕头网站建设sthke主要的网站开发技术
  • 青岛公司做网站天长市建设局网站
  • 南昌电商购物网站开发湖南学校网站建设
  • 工信部网站备案注销wordpress 树
  • 建设银行什么网站可买手表全球十大网站排名
  • 河南省建设厅广州网站优化排名推广
  • 完成网站的建设工作总结wordpress用户邮箱验证码
  • 校园网站规划与建设心得如何在腾讯云做网站
  • 网站导航网站开发网站设计在线培训
  • 做网站可以赚多少钱it网站开发
  • 网站开发 一般用什么语言表白网站是怎么做的
  • wordpress一步步建企业网站外贸销售怎么找客户
  • 做国内打不开的网站吗徐州seo排名公司
  • 阿里云安装wordpress数据库错误网站seo如何做
  • 销售类网站开发架构阳朔县建设规划局网站
  • 珠海做网站的公司网站建设需要哪些技术
  • 中山网站建设是什么安徽城乡建设厅网站
  • 网站制作top用阿里云怎么建网站
  • 哪些网站会盗取湛江市住房和城乡建设网站
  • 潍坊网站建设外包一哥优购物官方网站