当前位置: 首页 > news >正文

做网站版头图片企业商务网站 建设方案

做网站版头图片,企业商务网站 建设方案,网络优化是做啥的,网站做授权登录【README】 0#xff09;本文编写了多个flink水位线watermark的代码例子#xff0c;加深对watermark的理解 #xff1b; 1#xff09;时间分类 Event Time#xff1a; 事件创建的时间#xff08;事件发生时间#xff09;#xff1b;Ingestion Time#xff1a;数据进…【README】 0本文编写了多个flink水位线watermark的代码例子加深对watermark的理解 1时间分类 Event Time 事件创建的时间事件发生时间Ingestion Time数据进入flink的实际Processing Time执行算子的本地机器时间 我们主要讨论的是 事件时间 2flink窗口分为 滚动窗口滑动窗口 本文使用了 滚动窗口 滚动窗口 只有1个参数窗口长度与窗口步长窗口创建频率相等滑动窗口有2个参数即窗口长度窗口步长可以手动设置可以相等也可以不等 3本文结合代码示例讲了 水位线 窗口窗口属性 lateness 延迟属性 窗口流的 siteOutputLateData 侧输出流旁路输出及其它们的作用 【1】水位线 1定义本文自定义总结非官方水位线 watermark指的是 flink底层在数据流中添加的带有时间戳的数据当这些水位线数据到达算子时如窗口算子算子会认为 小于水位线的业务数据都来了数据可以理解为 一条日志或温度传感器采集的温度信息 2作用 水位线可以用来处理无序数据流下文代码例子会给出 3如何产生水位线 指定水位线的时间戳如何获取 可以指定 水位线时间戳从业务数据抽象为javabean的某个属性获取指定水位线可以延迟多长时间即允许无序数据最多可以晚来多长时间超过这个时间会被丢弃 【1.1】事件迟到被丢弃 1建立一个 10s 滚动窗口算子每10s新开一个长度为10s的窗口水位线取温度bean的时间戳且延迟 0 秒如下 其中 窗口用于收集id号码即属于同一个窗口的元素的id会被收集到一起 public class WindowTest3_EventTimeWatermarkWindow3 {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket读取数据数据格式参见 sensorTimeWatermarkWindow.txt // DataStreamString textStream env.readTextFile(D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensorTimeWatermarkWindow.txt);// nc -lk 7777 DataStreamString textStream env.socketTextStream(192.168.163.201, 7778);// 转换为 SensorReader pojo类型DataStreamSensorReadingTimeWatermarkWindow sensorStream textStream.map(x - {String[] arr x.split(,);return new SensorReadingTimeWatermarkWindow(arr[0], arr[1], arr[2], new BigDecimal(arr[3]));});// 设置抽取时间戳水位线延迟2秒如当前时间戳为 20:00:10 水位线的时间是 20:00:08窗口是看水位线时间而不是时间时间SingleOutputStreamOperatorSensorReadingTimeWatermarkWindow streamWithWatermark sensorStream.assignTimestampsAndWatermarks(WatermarkStrategy.SensorReadingTimeWatermarkWindowforBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner((event, timestamp) - event.getTimestamp().getTime()));// 开窗聚合SingleOutputStreamOperatorString aggForWindowStream streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType).window(TumblingEventTimeWindows.of(Time.seconds(10))).aggregate(new AggregateFunctionSensorReadingTimeWatermarkWindow, String, String() {Overridepublic String createAccumulator() {return ;}Overridepublic String add(SensorReadingTimeWatermarkWindow sensorReadingTimeWatermarkWindow, String s) {return s , sensorReadingTimeWatermarkWindow.getId();}Overridepublic String getResult(String s) {return s;}Overridepublic String merge(String s, String acc1) {return s , acc1;}});// 打印aggForWindowStream.print(aggForWindowStream);// 执行env.execute(aggForWindowStream);} } 上述代码中的水位线的延迟时间为0s即水位线时间戳等于事件时间戳  元素抽象为 传感器信息bean如下 public class SensorReadingTimeWatermarkWindow {private String id;private String type;private Timestamp timestamp;private BigDecimal temperature;public SensorReadingTimeWatermarkWindow() {}public SensorReadingTimeWatermarkWindow(String id, String type, String timeStr, BigDecimal temperature) {this.id id;this.type type;this.temperature temperature;this.parseTimestamp(timeStr);}private void parseTimestamp(String timeStr) {SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);try {this.timestamp new Timestamp(simpleDateFormat.parse(timeStr).getTime());} catch (ParseException e) {this.timestamp new Timestamp(System.currentTimeMillis());}} } 接收的是 nc 客户端的socket文本流窗口算子计算结果如下 详情如下 1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9 - 1, 7, 8, 11 13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:16,36.9 16,sensor1,2022-04-17 22:07:23,36.9 - 12,15 【结果分析】 发现1当事件12id12出现时因水位线延迟时间为0所以水位线时间戳等于事件12的时间戳22:07:11这个时间戳大于窗口结束时间22:07:10第1个窗口被关闭并输出计算结果为【1,7,811】发现2 当事件16id16出现时因水位线延迟时间为0所以水位线时间戳等于事件16的时间戳22:07:23这个时间戳大于窗口结束时间22:07:20第2个窗口被关闭并输出计算结果为【12,15】发现3事件13没有更新水位线因为水位线必须单调递增事件12发生时的水位线是22:07:11事件13的时间戳是22:07:09所以事件13发生时不会更新水位线 问题来了 事件13去哪里了 被 flink 丢弃了因为事件13迟到了 如何理解事件迟到了 因为事件12 的时间戳为 22:07:11又水位线延迟0s所以水位线的 时间戳也是 22:07:11这大于窗口结束时间所以窗口关闭并计算结果窗口关闭后事件13才来因此被丢弃。 【补充】窗口范围是左闭右开如上图第1个窗口的范围是 [0,10)第2个窗口是 [10,20) 【1.2】 事件迟到但被正常处理 1修改上述水位线代码 设置延迟时间为5s重新录入上述数据结果如下 // 设置抽取时间戳水位线延迟2秒如当前时间戳为 20:00:10 水位线的时间是 20:00:08窗口是看水位线时间而不是事件时间 SingleOutputStreamOperatorSensorReadingTimeWatermarkWindow streamWithWatermark sensorStream.assignTimestampsAndWatermarks(WatermarkStrategy.SensorReadingTimeWatermarkWindowforBoundedOutOfOrderness(Duration.ofSeconds(5)) // 水位线延迟时间修改为 5s.withTimestampAssigner((event, timestamp) - event.getTimestamp().getTime())); 1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9 13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:16,36.9 - 1, 7, 8, 11, 13 16,sensor1,2022-04-17 22:07:23,36.9 21,sensor1,2022-04-17 22:07:20,36.922,sensor1,2022-04-17 22:07:25,36.9 - 12, 15 【结果分析】 发现1事件13事件21 不会更新水位线时间戳原因上文已经解释过了发现2当事件15id15出现时因水位线延迟时间为5s所以水位线等于事件15的时间戳减去5s 22:07:11这个时间戳大于窗口结束时间22:07:10第1个窗口被关闭并输出计算结果为【1,7,81113】发现3事件13没有被丢弃因为水位线延迟了5s窗口在事件15发生时才关闭所以可以探测到事件13这也阐述了为啥 flink水位线可以处理无序数据的原理flink的设计者的水位线idea真的很棒对比来看【1.1】中的例子事件13被丢弃发现4当事件22id22出现时因水位线延迟时间为5s所以水位线等于事件22的时间戳减去5s 22:07:20这个时间戳大于等于窗口结束时间22:07:20第2个窗口被关闭并输出计算结果为【12,15】大于等于窗口结束时间窗口就被关闭因为窗口范围是左开右闭【2】窗口的 lateness 延迟属性 此外窗口还有 lateness 属性表示延迟多长时间关闭窗口 如下面代码每10s 创建一个长度为12s的窗口 如果没有 lateness参数或其为0的话 就是 每10s 创建一个长度为10s的窗口 代码修改如下 SingleOutputStreamOperatorString aggForWindowStream streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 允许窗口延迟 2 秒后关闭窗口 窗口算子计算结果如下 详情如下 1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9 13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:15,36.9 - 1, 7, 8, 11, 1316,sensor1,2022-04-17 22:07:09,36.9 - 1, 7, 8, 11, 13, 16 17,sensor1,2022-04-17 22:07:16,36.9 18,sensor1,2022-04-17 22:07:09,36.9 - 1, 7, 8, 11, 13, 16, 18 19,sensor1,2022-04-17 22:07:17,36.9 窗口关闭 20,sensor1,2022-04-17 22:07:09,36.9 被丢弃 21,sensor1,2022-04-17 22:07:20,36.922,sensor1,2022-04-17 22:07:25,36.9 - 12, 15, 17, 19 【结果分析】 事件15发生时因水位线延迟5s所以水位线时间戳22:07:15-5s22:07:10等于第1个窗口的结束时间故第1个窗口计算结果为 【1, 7, 8, 11, 13】但窗口没有关闭因为lateness为2s延迟2秒关闭即当水位线大于等于 22:07:12 时窗口关闭事件16发生时第1个窗口因为 lateness2s 没有关闭又事件16时间戳22:07:09所以还是参与窗口1的计算输出结果【1, 7, 8, 11, 13, 16】事件17发生时时间戳22:07:16水位线时间戳22:07:11这小于带lateness2s的窗口1的关闭时间 22:07:12所以窗口1还是不会关闭事件18发生时时间戳22:07:09 因水位线单调递增故不变还是22:07:11事件18参与窗口1的计算结果为 【1, 7, 8, 11, 13, 16, 18】事件19发生时时间戳22:07:17水位线22:07:12等于带lateness2s的窗口1的关闭时间窗口1关闭事件20发生时时间戳22:07:09落入了窗口1的范围22:07:00~22:07:10但因窗口1已经关闭所以事件20被丢弃 通过以上示例本文应该是把窗口的lateness属性 讲清楚了 【问题】 事件20被丢弃的话 不满足业务场景对数据一致性的要求 因为服务1发送了10条数据到达服务2的时候却只有9条数据这不满足业务需求是开发团队不愿意看到的事情那如何找回这些被丢弃的事件呢通过旁路输出【3】如何收集迟到数据 从旁路输出side output获取迟到数据 通过 Flink 的 旁路输出 功能可以获得迟到数据的数据流。 首先需要在开窗后的 stream 上使用 sideOutputLateData(OutputTag) 表明需要把迟到数据存入 旁输出流。 代码修改如下添加旁路输出流侧输出流 // 侧输出流对于延迟的且没有进入窗口的数据放到侧输出流旁路输出流OutputTagSensorReadingTimeWatermarkWindow lateOutputTag new OutputTagSensorReadingTimeWatermarkWindow(late) {};// 开窗聚合SingleOutputStreamOperatorString aggForWindowStream streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 允许延迟 2 秒后关闭窗口.sideOutputLateData(lateOutputTag) // 无法进入窗口则进入侧输出流.aggregate(new AggregateFunctionSensorReadingTimeWatermarkWindow, String, String() {Overridepublic String createAccumulator() {return ;}Overridepublic String add(SensorReadingTimeWatermarkWindow sensorReadingTimeWatermarkWindow, String s) {return s , sensorReadingTimeWatermarkWindow.getId();}Overridepublic String getResult(String s) {return s;}Overridepublic String merge(String s, String acc1) {return s , acc1;}});// 打印窗口算子结果aggForWindowStream.print(aggForWindowStream);// 打印旁输出流aggForWindowStream.getSideOutput(lateOutputTag).print(lateOutputTag);// 执行env.execute(aggForWindowStream); 事件发生详情如下 1,sensor1,2022-04-17 22:07:01,36.17,sensor1,2022-04-17 22:07:02,36.78,sensor1,2022-04-17 22:07:04,36.811,sensor1,2022-04-17 22:07:07,36.912,sensor1,2022-04-17 22:07:11,36.9 13,sensor1,2022-04-17 22:07:09,36.915,sensor1,2022-04-17 22:07:15,36.9 - 1, 7, 8, 11, 1316,sensor1,2022-04-17 22:07:09,36.9 - 1, 7, 8, 11, 13, 16 17,sensor1,2022-04-17 22:07:16,36.9 18,sensor1,2022-04-17 22:07:09,36.9 - 1, 7, 8, 11, 13, 16, 18 19,sensor1,2022-04-17 22:07:17,36.9 窗口关闭 20,sensor1,2022-04-17 22:07:09,36.9 - lateOutputTag SensorReadingTimeWindow{id20, typesensor1, timestamp2022-04-17 22:07:09.0, temperature36.9} 结果分析 相比于【2】中代码示例 事件20被丢弃了而【3】中代码当事件20出现时由于窗口已经关闭但存在侧输出流旁路输出所以事件20 存入侧输出流解决了乱序数据迟到事件过长导致数据不一致的问题相反如果没有侧输出流则事件20会被丢弃
http://www.yutouwan.com/news/47747/

相关文章:

  • 网页设计网站简单静态模板哪个做简历的网站可以中英的
  • 广州番禺网站制作推广如何网站专题策划
  • 谁做的四虎网站是多少泉州网上房地产
  • c 做网站优点免费的wordpress企业模板
  • 粮油移动端网页设计素材网站seo诊断分析报告
  • 影楼做网站wordpress需要身份验证
  • 巨腾外贸网站建设公司外贸订单怎么找
  • 肥西县建设局资询网站中国室内设计公司排名榜
  • 开发什么网站免费行情软件app网站mnw直
  • 在兔展上怎么做网站页面应用商店网站源码
  • 死链对网站链轮的影响装饰工程包括哪些主要内容
  • 各网站推广做soho外贸网站
  • wordpress网站视频播放磁力蜘蛛种子搜索
  • 淄博企业网站建设价格简述网络营销的方法
  • 外贸网站如何seo推广开发网页游戏平台
  • 南通港闸区城乡建设局网站电商网站定制开发
  • 上海网站的优化公司wordpress加超链接
  • 公家网站模板什么是网站的入口
  • 网站字体大小合适wordpress posted on
  • 买网站需要注意什么商城网站建设预算要多少钱
  • 温州专业微网站制作文登做网站的公司
  • 四川网站建设博客app软件开发的费用设计
  • 做文案策划有些网站wordpress为什么不能显示域名
  • 蒲公英网站建设深圳办公室装修多少钱一个平方
  • 集团酒店网站建设网络管理与维护
  • 17一起来做网站北京网站优化流程
  • 自己做彩票网站合法吗广告设计用什么软件做
  • 有专门做辩论的网站吗上城区网站建设价格
  • 青岛网站建设排名aspx网站模板
  • 哪有做网站 的上海百姓网免费发布信息网