当前位置: 首页 > news >正文

邯郸哪有做网站的公司重庆公司网站建设价格

邯郸哪有做网站的公司,重庆公司网站建设价格,wordpress参考文献,深圳定制网站建设目录 1、添加pom依赖 2、API使用说明 3、这是一个完整的入门案例 4、Kafka消息应该如何解析 4.1、只获取Kafka消息的value部分 ​4.2、获取完整Kafka消息(key、value、Metadata) 4.3、自定义Kafka消息解析器 5、起始消费位点应该如何设置 ​5.1、earliest() 5.2、lat…目录 1、添加pom依赖 2、API使用说明 3、这是一个完整的入门案例 4、Kafka消息应该如何解析 4.1、只获取Kafka消息的value部分 ​4.2、获取完整Kafka消息(key、value、Metadata) 4.3、自定义Kafka消息解析器 5、起始消费位点应该如何设置 ​5.1、earliest() 5.2、latest() 5.3、timestamp() 6、Kafka分区扩容了该怎么办 —— 动态分区检查 7、在加载KafkaSource时提取事件时间添加水位线 7.1、使用内置的单调递增的水位线生成器 kafka timestamp 为事件时间 7.2、使用内置的单调递增的水位线生成器 kafka 消息中的 ID字段 为事件时间 1、添加pom依赖 我们可以使用Flink官方提供连接Kafka的工具flink-connector-kafka 该工具实现了一个消费者FlinkKafkaConsumer可以用它来读取kafka的数据 如果想使用这个通用的Kafka连接工具需要引入jar依赖 !-- 引入 kafka连接器依赖-- dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion1.17.0/version /dependency 2、API使用说明 官网链接Apache Kafka 连接器 语法说明:  // 1.初始化 KafkaSource 实例 KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers) // 必填指定broker连接信息 (为保证高可用,建议多指定几个节点) .setTopics(input-topic) // 必填指定要消费的topic.setGroupId(my-group) // 必填指定消费者的groupid(不存在时会自动创建).setValueOnlyDeserializer(new SimpleStringSchema()) // 必填指定反序列化器(用来解析kafka消息数据转换为flink数据类型).setStartingOffsets(OffsetsInitializer.earliest()) // 可选指定启动任务时的消费位点不指定时将默认使用 OffsetsInitializer.earliest().build(); // 2.通过 fromSource KafkaSource 获取 DataStreamSource env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source); 3、这是一个完整的入门案例 开发语言java1.8 flink版本flink1.17.0 public class ReadKafka {public static void main(String[] args) throws Exception {newAPI();}public static void newAPI() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(worker01:9092) // 必填指定broker连接信息 (为保证高可用,建议多指定几个节点).setTopics(20230810) // 必填指定要消费的topic.setGroupId(FlinkConsumer) // 必填指定消费者的groupid(不存在时会自动创建).setValueOnlyDeserializer(new SimpleStringSchema()) // 必填指定反序列化器(用来解析kafka消息数据).setStartingOffsets(OffsetsInitializer.earliest()) // 可选指定启动任务时的消费位点不指定时将默认使用 OffsetsInitializer.earliest().build();env.fromSource(source,WatermarkStrategy.noWatermarks(),Kafka Source).print();// 3.触发程序执行env.execute();} }4、Kafka消息应该如何解析 代码中需要提供一个反序列化器Deserializer来对 Kafka 的消息进行解析 反序列化器的功能 将Kafka ConsumerRecords转换为Flink处理的数据类型(Java/Scala对象) 反序列化器通过  setDeserializer(KafkaRecordDeserializationSchema.of(反序列化器类型)) 指定 下面介绍两种常用Kafka消息解析器 KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)) 1、返回完整的Kafka消息将JSON字符串反序列化为ObjectNode对象 2、可以选择是否返回Kafak消息的Metadata信息true-返回false-不返回 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) 1、只返回Kafka消息中的value部分  4.1、只获取Kafka消息的value部分 4.2、获取完整Kafka消息(key、value、Metadata) kafak消息格式 key   {nation:蜀国} value  {ID:整数} public static void ParseMessageJSONKeyValue() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092) // 必填指定broker连接信息 (为保证高可用,建议多指定几个节点).setTopics(9527) // 必填指定要消费的topic.setGroupId(FlinkConsumer) // 必填指定消费者的groupid(不存在时会自动创建)// 必填指定反序列化器(将kafak消息解析为ObjectNodejson对象).setDeserializer(KafkaRecordDeserializationSchema.of(// includeMetadata (true:返回Kafak元数据信息 false:不返回)new JSONKeyValueDeserializationSchema(true))).setStartingOffsets(OffsetsInitializer.latest()) // 可选指定启动任务时的消费位点不指定时将默认使用 OffsetsInitializer.earliest().build();env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source).print();// 3.触发程序执行env.execute();}运行结果     常见报错  Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic 9527, partition 0, leaderEpoch 0, offset 1064, CreateTime 1691668775938, serialized key size 4, serialized value size 9, headers RecordHeaders(headers [], isReadOnly false), key [B5e9eaab8, value [B67390400).at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)... 14 more Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token xxxx: was expecting (JSON String, Number, Array, Object or token null, true or false)at [Source: (byte[])xxxx; line: 1, column: 5] 报错原因 出现这个报错一般是使用flink读取fafka时使用JSONKeyValueDeserializationSchema 来解析消息时kafka消息中的key 或者 value 内容不符合json格式而造成的解析错误 例如下面这个格式就会造成解析错误  key1000value你好 那应该怎么解决呢 1、如果有权限修改Kafka消息格式可以将Kafka消息keyvalue内容修改为Json格式 2、如果没有权限修改Kafka消息格式(比如线上环境修改比较困难)可以重新实现 JSONKeyValueDeserializationSchema类根据所需格式来解析Kafka消息(可以参考源码) 4.3、自定义Kafka消息解析器 生产中对Kafka消息及解析的格式总是各种各样的当flink预定义的解析器满足不了业务需求时可以通过自定义kafka消息解析器来完成业务的支持 例如当使用 MyJSONKeyValueDeserializationSchema 获取Kafka元数据时只返回了 offset、topic、partition 三个字段信息现在需要kafka生产者写入数据时的timestamp就可以通过自定义kafka消息解析器来完成 代码示例 // TODO 自定义Kafka消息解析器在 metadata 中增加 timestamp字段 public class MyJSONKeyValueDeserializationSchema implements KafkaDeserializationSchemaObjectNode{private static final long serialVersionUID 1509391548173891955L;private final boolean includeMetadata;private ObjectMapper mapper;public MyJSONKeyValueDeserializationSchema(boolean includeMetadata) {this.includeMetadata includeMetadata;}Overridepublic void open(DeserializationSchema.InitializationContext context) throws Exception {mapper JacksonMapperFactory.createObjectMapper();}Overridepublic ObjectNode deserialize(ConsumerRecordbyte[], byte[] record) throws Exception {ObjectNode node mapper.createObjectNode();if (record.key() ! null) {node.set(key, mapper.readValue(record.key(), JsonNode.class));}if (record.value() ! null) {node.set(value, mapper.readValue(record.value(), JsonNode.class));}if (includeMetadata) {node.putObject(metadata).put(offset, record.offset()).put(topic, record.topic()).put(partition, record.partition())// 添加 timestamp 字段.put(timestamp,record.timestamp());}return node;}Overridepublic boolean isEndOfStream(ObjectNode nextElement) {return false;}Overridepublic TypeInformationObjectNode getProducedType() {return getForClass(ObjectNode.class);}}运行结果 5、起始消费位点应该如何设置 起始消费位点说明 起始消费位点是指 启动flink任务时应该从哪个位置开始读取Kafka的消息    下面介绍下常用的三个设置     OffsetsInitializer.earliest()  从最早位点开始消 这里的最早指的是Kafka消息保存的时长(默认为7天生成环境各公司略有不同) 该这设置为默认设置当不指定OffsetsInitializer.xxx时默认为earliest()  OffsetsInitializer.latest()    从最末尾位点开始消费 这里的最末尾指的是flink任务启动时间点之后生产的消息 OffsetsInitializer.timestamp(时间戳) 从时间戳大于等于指定时间戳毫秒的数据开始消费 下面用案例说明下三种设置的效果kafak生成10条数据如下 5.1、earliest() 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(23230811).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最早位置开始消费该设置为默认设置.setStartingOffsets(OffsetsInitializer.earliest()).build(); 运行结果 5.2、latest() 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(23230811).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build(); 运行结果 5.3、timestamp() 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(23230811).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从指定时间戳后开始消费.setStartingOffsets(OffsetsInitializer.timestamp(1691722791273L)).build(); 运行结果 6、Kafka分区扩容了该怎么办 —— 动态分区检查 在flink1.13的时候如果Kafka分区扩容了只有通过重启flink任务才能消费到新增分区的数据小编就曾遇到过上游业务部门的kafka分区扩容了并没有通知下游使用方导致实时指标异常甚至丢失了数据。 在flink1.17的时候可以通过开启动态分区检查来实现不用重启flink任务就能消费到新增分区的数据 开启分区检查(默认不开启) KafkaSource.builder().setProperty(partition.discovery.interval.ms, 10000); // 每 10 秒检查一次新分区 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(9527).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest())// 开启动态分区检查默认不开启.setProperty(partition.discovery.interval.ms, 10000) // 每 10 秒检查一次新分区.build(); 7、在加载KafkaSource时提取事件时间添加水位线 可以在 fromSource(source,WatermarkStrategy,sourceName) 时提取事件时间和制定水位线生成策略 注意当不指定事件时间提取器时Kafka Source 使用 Kafka 消息中的时间戳作为事件时间 7.1、使用内置的单调递增的水位线生成器 kafka timestamp 为事件时间 代码示例 // 在读取Kafka消息时提取事件时间插入水位线public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(9527).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(source,// 使用内置的单调递增的水位线生成器默认使用 kafka的timestamp作为事件时间WatermarkStrategy.forMonotonousTimestamps(),Kafka Source)// 通过 ProcessFunction 查看提取的事件时间和水位线信息.process(new ProcessFunctionObjectNode, String() {Overridepublic void processElement(ObjectNode kafkaJson, ProcessFunctionObjectNode, String.Context ctx, CollectorString out) throws Exception {// 当前处理时间long currentProcessingTime ctx.timerService().currentProcessingTime();// 当前水位线long currentWatermark ctx.timerService().currentWatermark();StringBuffer record new StringBuffer();record.append(\n);record.append(kafkaJson \n);record.append(currentProcessingTime currentProcessingTime \n);record.append(currentWatermark currentWatermark \n);record.append(kafka-ID Long.parseLong(kafkaJson.get(value).get(ID).toString()) \n);record.append(kafka-timestamp Long.parseLong(kafkaJson.get(metadata).get(timestamp).toString()) \n);out.collect(record.toString());}}).print();// 3.触发程序执行env.execute();}运行结果 7.2、使用内置的单调递增的水位线生成器 kafka 消息中的 ID字段 为事件时间 代码示例 // 在读取Kafka消息时提取事件时间插入水位线public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(9527).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(source,// 使用内置的单调递增的水位线生成器使用 kafka消息中的ID字段作为事件时间WatermarkStrategy.ObjectNodeforMonotonousTimestamps()// 提取 Kafka消息中的 ID字段作为 事件时间.withTimestampAssigner((json, timestamp) - Long.parseLong(json.get(value).get(ID).toString())),Kafka Source)// 通过 ProcessFunction 查看提取的事件时间和水位线信息.process(new ProcessFunctionObjectNode, String() {Overridepublic void processElement(ObjectNode kafkaJson, ProcessFunctionObjectNode, String.Context ctx, CollectorString out) throws Exception {// 当前处理时间long currentProcessingTime ctx.timerService().currentProcessingTime();// 当前水位线long currentWatermark ctx.timerService().currentWatermark();StringBuffer record new StringBuffer();record.append(\n);record.append(kafkaJson \n);record.append(currentProcessingTime currentProcessingTime \n);record.append(currentWatermark currentWatermark \n);record.append(kafka-ID Long.parseLong(kafkaJson.get(value).get(ID).toString()) \n);record.append(kafka-timestamp Long.parseLong(kafkaJson.get(metadata).get(timestamp).toString()) \n);out.collect(record.toString());}}).print();// 3.触发程序执行env.execute();}运行结果
http://wiki.neutronadmin.com/news/172962/

相关文章:

  • 葫芦岛网站建设找思路建设银行通控件网站
  • 查询网站是否过期星子网今天最新新闻
  • 网站开发培训设计企业数字化管理
  • 博客推广那个网站列好做优秀企业网站
  • 资讯类网站源码wordpress 配置ftp
  • 自学网站建设要看什么书农资网络销售平台
  • 玉溪市网站建设如何学习wordpress
  • 微信网站开发用什么语言怎样建设企业网站 用于宣传
  • 网站开发技术岗位职责wordpress 忘记用户名密码破解
  • 咨询网站公司建设计划书如何做某网站的移动客户端开发
  • 信息可视化网站提供免费建网站的网
  • 网站建设的功能特点有哪些网站开发学什么数据库
  • 灵璧做网站的公司安徽省建设安全协会网站
  • 什么是网站开发山西建设工程执业注册中心网站
  • 宣城网站建设公司软件项目流程八个阶段
  • 企业网站开发技术公众号开发教程零基础
  • 动易网站系统qq空间个人网站
  • 东莞市做网站的公司哪家好企业建站程序哪个好
  • 永泰县住房和城乡建设局网站做源码网站赚钱吗
  • 网页设计与网站建设课程设计h5动态页面怎么做的
  • c2c网站类型祁阳县住房和城乡规划建设局网站
  • 做男装比较好的网站有哪些做货代在哪些网站能找到客户
  • 网站编辑的工作职能有哪些大邯郸网站
  • 那个网站可以兼职做效果图wordpress 自建主题
  • 网站建设参考网站的说明三亚发布最新消息
  • 新吴区网站建设温州网站设计力推亿企帮
  • 高密哪里做网站好网站后台别人制作
  • an网站建设网站建设方案 市场分析
  • 做设计做网站义乌专业做网站的
  • 各网站收录做淘宝首页初学ps视频网站