设计外贸商城网站建设,浙江省建设质量工程协会网站,制作短视频的软件app,在线制作头像模板水位线特点
插入到数据流中的一个标记#xff0c;可以认为是一个特殊的数据主要内容是一个时间戳水位线是基于数据的时间戳生成的#xff0c;即事件时间水位线必须单调递增水位线可以通过设置延迟#xff0c;来保证正确处理乱序数据一个水位线#xff0c;表示事件时间已经…水位线特点
插入到数据流中的一个标记可以认为是一个特殊的数据主要内容是一个时间戳水位线是基于数据的时间戳生成的即事件时间水位线必须单调递增水位线可以通过设置延迟来保证正确处理乱序数据一个水位线表示事件时间已经达到了时间戳t水位线是Flink流处理中保证结果正确性的核心机制
窗口
错误理解窗口是一个固定位置的框数据流源源不断地流过来到某个时间窗口该关闭了就停止收集数据触发计算并窗口关闭输出结果。
Flink中窗口是动态创建的当有落在这个窗口区间范围的数据达到时才创建对应的窗口。事实上触发计算和窗口关闭两个行为可以分开。
总体原则
水位线出现表示这个时间之前的数据已经全部到齐之后再也不会出现了不过要保证绝对正确就必须等足够长的时间这会带来更高的延迟。水位线是流处理中对低延迟和结果正确性的一个权衡机制。
水位线生成方案
水位线的生成位置越靠近数据源越好
WatermarkStrategy:水位线策略对象 1. 水位线生成器 WatermarkGenerator - onEvent() 给每条数据生成水位线 - onPeriodicEmit():周期性生成水位线 2. 时间戳分配器 TimestampAssigner - extractTimestamp()
有序流水位线生成的代码如下
public class Flink01_UserDefineWaterMarkStrategy {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//设置生成水位线的周期env.getConfig().setAutoWatermarkInterval(1000);//tom,/home,1000SingleOutputStreamOperatorEvent ds env.socketTextStream(hadoop102, 8888).map(line - {String[] words line.split(,);return new Event(words[0].trim(),words[1].trim(),Long.valueOf(words[2].trim()));});ds.print(input);ds.assignTimestampsAndWatermarks(new MyWatermarkStrategy());ds.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyWatermarkStrategy implements WatermarkStrategyEvent{/*** 创建水位线生成器* param context* return*/Overridepublic WatermarkGeneratorEvent createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyWatermarkGenerator();}public static class MyWatermarkGenerator implements WatermarkGeneratorEvent{private Long maxTs Long.MIN_VALUE;/*** 每一条数据调用一次用于生成一次水位线* param event* param eventTimestamp* param output*/Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {//有序流每条数据生成水位线
// System.out.println(有序流每条数据生成水位线》eventTimestamp);
// output.emitWatermark(new Watermark(eventTimestamp));maxTs Math.max(maxTs, eventTimestamp);}/*** 周期性生成水位线* 默认周期是200ms* param output*/Overridepublic void onPeriodicEmit(WatermarkOutput output) {//有序流周期性生成水位线System.out.println(有序流周期性生成水位线》maxTs);output.emitWatermark(new Watermark(maxTs));}}/*** 创建时间戳分配器用于从数据中提取时间戳* param context* return*/Overridepublic TimestampAssignerEvent createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new MyTimestampAssigner();}}public static class MyTimestampAssigner implements TimestampAssignerEvent{/*** 从数据中提取时间戳* param element The element that the timestamp will be assigned to.* param recordTimestamp The current internal timestamp of the element, or a negative value, if* no timestamp has been assigned yet.* return*/Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTs();}}}