当前位置: 首页 > news >正文

计算机网站建设目标自建站平台

计算机网站建设目标,自建站平台,免费看行情的软件大全免费,9国产精华最好的产品watermark 时间语义和 watermark 注意:数据进入flink的时间#xff1a;如果用这个作为时间语义就不存在问题#xff0c;但是开发中往往会用处理时间 作为时间语义这里就需要考虑延时的问题。 如上图#xff0c;数据从kafka中获取出来#xff0c;从多个分区中获取#xf…watermark 时间语义和 watermark 注意:数据进入flink的时间如果用这个作为时间语义就不存在问题但是开发中往往会用处理时间 作为时间语义这里就需要考虑延时的问题。 如上图数据从kafka中获取出来从多个分区中获取这时候时间肯定有乱序,这时候就需要使用事 件时间。 场景游戏连续过五关给予奖励 地铁里面玩游戏连过三关断网了二分钟过了八关。这时候是用处理时间还是事件时间呢 处理时间的优势牺牲一定的数据准确性没有延迟 package com.atguigu.apitest.window;/**import com.atguigu.apitest.beans.SensorReading; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.OutputTag;public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认为当前机器的cpu的最大核数//env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.getConfig().setAutoWatermarkInterval(100);// socket文本流DataStreamString inputStream env.socketTextStream(localhost, 7777);// 转换成SensorReading类型分配时间戳和watermarkDataStreamSensorReading dataStream inputStream.map(line - {String[] fields line.split(,);return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));})// 乱序数据设置时间戳和watermark.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorSensorReading(Time.seconds(2)) {Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});OutputTagSensorReading outputTag new OutputTagSensorReading(late) {};// 基于事件时间的开窗聚合统计15秒内温度的最小值SingleOutputStreamOperatorSensorReading minTempStream dataStream.keyBy(id).timeWindow(Time.seconds(15)).allowedLateness(Time.minutes(1)).sideOutputLateData(outputTag).minBy(temperature);minTempStream.print(minTemp);minTempStream.getSideOutput(outputTag).print(late);env.execute();} }sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718207,36.3 sensor_1,1547718211,32.8 sensor_1,1547718212,37.1注意第一个窗口是[1547718195,1547718210); sensor_1,1547718213,33 sensor_1,1547718224,32.1 sensor_1,1547718225,31.6 sensor_1,1547718226,21.2 sensor_1,1547718227,33.6第二个窗口大小第一个窗口是[1547718210,1547718225); 1.理想状态来一条数据处理一条每条数据代表对时间推进如图到5之后就将【05的窗口关闭并输出2.乱序状态原因网络延迟、分布式、分区导致乱序数据产生网络延迟和分布式处理造成的乱序都是几十毫秒和几百毫秒的范围的差距这将回造成大多数延迟数据集中在几十毫秒和几百毫秒的范围内3.解决方案将时间事件放慢 flink的三重保证1.设置watermaker将几百毫秒的数据全部输出2.先输出一个近似的结果但是不要关闭窗口后面延迟的时间还需要更新3.当延时时间到了窗口就关闭了兜底方案使用侧输出流保证数据不丢失注意数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据都已经到 达了因此window 的执行也是由 Watermark 触发的。 6 3 2 5 4 1 比如设置3秒的watermaker 到达5说明2秒之前的数据都到齐了后面23都可以输出 到达6说明3秒之前的数据都到齐了大于等于3秒的数据才能输出意义watermark 用来让程序自己平衡延迟和结果正确性如果设置太大延迟太高设置太 小数据就不准确需要通过具体的业务场景去平衡这个值; watermark 用来让程序自己平衡延迟和结果正确性如果设置太大延迟太高设置太小,乱序数据 没有搞定,数据就不准确需要通过具体的业务场景去平衡这个值;如何找到watermaker首先要了解乱序程度; 解决方案通过机器学习构建一个模型构建当前业务模型中的延迟状态的分布情况; 如图大部分的延时数据都20ms和80ms之间的范围中,这时候设置80ms就搞定大部分乱序数据; 这时候还有很少的数据如果对数据准确性要求比较高这时候就需要设置窗口迟到机制去保证 数据的准备性;最后还有网络延迟的数据还是没有输出这时候就需要添加侧输出流作为兜底方案。 watermark 生成问题 默认来一条生产一条watermaker如果短时间数据量比较大会造成watermaker都一样造成资 源浪费周期性添加watermaker:每隔一段时间更新一下watermaker 周期性时间缺点实时性不好数据过于分散会造成资源浪费如何选择看数据的分布过于集中使用周期性生成模式,数据稀疏使用默认的模型 状态编程  需求我们可以利用 Keyed state实现这样一个需求: 检测传感器的温度值如果连续的两个温度差值超过 10 度就输出报警 package com.atguigu.apitest.state;/*** Copyright (c) 2018-2028 尚硅谷 All Rights Reserved* p* Project: FlinkTutorial* Package: com.atguigu.apitest.state* Version: 1.0* p* Created by wushengran on 2020/11/10 16:33*/import com.atguigu.apitest.beans.SensorReading; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;/*** ClassName: StateTest3_KeyedStateApplicationCase* Description:* Author: wushengran on 2020/11/10 16:33* Version: 1.0*/ public class StateTest3_KeyedStateApplicationCase {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// socket文本流DataStreamString inputStream env.socketTextStream(localhost, 7777);// 转换成SensorReading类型DataStreamSensorReading dataStream inputStream.map(line - {String[] fields line.split(,);return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 定义一个flatmap操作检测温度跳变输出报警SingleOutputStreamOperatorTuple3String, Double, Double resultStream dataStream.keyBy(id).flatMap(new TempChangeWarning(10.0));resultStream.print();env.execute();}// 实现自定义函数类public static class TempChangeWarning extends RichFlatMapFunctionSensorReading, Tuple3String, Double, Double{// 私有属性温度跳变阈值private Double threshold;public TempChangeWarning(Double threshold) {this.threshold threshold;}// 定义状态保存上一次的温度值private ValueStateDouble lastTempState;Overridepublic void open(Configuration parameters) throws Exception {lastTempState getRuntimeContext().getState(new ValueStateDescriptorDouble(last-temp, Double.class));}Overridepublic void flatMap(SensorReading value, CollectorTuple3String, Double, Double out) throws Exception {// 获取状态Double lastTemp lastTempState.value();// 如果状态不为null那么就判断两次温度差值if( lastTemp ! null ){Double diff Math.abs( value.getTemperature() - lastTemp );if( diff threshold )out.collect(new Tuple3(value.getId(), lastTemp, value.getTemperature()));}// 更新状态lastTempState.update(value.getTemperature());}Overridepublic void close() throws Exception {lastTempState.clear();}} }sensor_1,1547718206,36.3 sensor_1,1547718206,37.9 sensor_1,1547718206,48 sensor_6,1547718201,15.4 sensor_6,1547718201,35 sensor_1,1547718226,36 状态后端 状态后端 1.本地的状态管理(如何存上下文配置怎么存怎么写) 2.做快照容错如何恢复数据 1. 测试环境MemoryStateBackend 2. 生产环境FsStateBackend 3. 数据非常大时候RocksDBStateBackend state.backend: filesystem //默认使用FsStateBackend tate.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints //配置一个checkpoint的hdfs的存储路径jobmanager.execution.failover-strategy: region //区域化重启state.backend.incremental: false //增量添加checkpoint
http://wiki.neutronadmin.com/news/30462/

相关文章:

  • 接单网站做火牛厦门中小企业建网站补助
  • 网站电子商务平台建设抖音测一测小程序怎么赚钱
  • 外包给网站建设注意事项许昌做网站公司哪家专业
  • 长治网站制作哪家好换友网站
  • 福建省建设厅网站 2013建设网站投标标书范本
  • 东莞网站优化制作网站双倍浮动
  • 怎么做个手机版的网站吗.tech域名的网站
  • 做网站需要买什么东西官方网站下载qq最新版
  • 便利的邯郸网站建设建设通网
  • 怎么买网站空间广告平面设计培训班学费一般多少
  • 广州模板建站系统石家庄做网站推广
  • 网站制作哪个软件字节跳动小程序开发平台
  • wordpress音乐站主题微信开发流程四步
  • 东莞莞城网站建设公司企炬网站
  • 太原seo代理商上海seo网络推广公司
  • 河北省建设招标网站做网站兰州
  • 做国际网站有补贴吗哈尔滨做网站多少钱
  • 建网站业务员wordpress 转发标题
  • 老师问我做网站用到什么创新技术网上商城网站开发报告
  • 沈阳做网站价格e福州首页
  • 成都网站开发收费网站建设计划图
  • 网站风格的设计原则网站数据库怎么建立
  • 电商网站建设精英用wordpress主题首页
  • 建设网站的项目策划书网站怎么排名
  • 做网站的企业广州wordpress怎么找到php文件路径
  • 做淘宝客网站哪个好用济南官网seo厂家
  • 网络营销的应用研究论文广告网站建设网站排名优化
  • 如何建设一个电影网站可能wordpress.org或服务器配置文件存在问题
  • 网站视觉优化的意义手机网站设计神器
  • 在哪做网站好wordpress版本要求