当前位置: 首页 > news >正文

如何提高网站优化网店代运营公司方案

如何提高网站优化,网店代运营公司方案,wordpress 怎么设置,长沙网站排名分析星光下的赶路人star的个人主页 我的敌手就是我自己#xff0c;我要他美好到能使我满意的程度 文章目录 1、处理函数1.1 基本处理函数#xff08;ProcessFunction#xff09;1.1.1 处理函数的功能和使用1.1.2 ProcessFunction解析1.1.3 处理函数的分类 1.2 按键分区处理函数我要他美好到能使我满意的程度 文章目录 1、处理函数1.1 基本处理函数ProcessFunction1.1.1 处理函数的功能和使用1.1.2 ProcessFunction解析1.1.3 处理函数的分类 1.2 按键分区处理函数KeyedProcessFunction1.2.1 定时器Timer和定时服务TimeService1.2.2 KeyedProcessFunction案例 1.3 窗口处理函数1.3.1 窗口处理函数的使用1.3.2 ProcessWindowFunction解析 1.4 应用案例---topN1.4.1 使用ProcessAllWindowFunction1.4.2 使用KeyedProcessFunction 1.5 侧输出流Side Output 1、处理函数 之前所介绍的流处理API无论是基本的转换、聚合还是更为复杂的窗口操作其实都是基于DataStream进行转换的所以可以统称为DataStream API。 在Flink更底层我们可以不定义任何具体的算子比如mapfilter或者window而只是提炼出一个统一的“处理”process操作——它是所有转换算子的一个概括性的表达可以自定义处理逻辑所以这一层接口就被叫作“处理函数”process function。 1.1 基本处理函数ProcessFunction 1.1.1 处理函数的功能和使用 我们之前学习的转换算子一般只是针对某种具体操作来定义的能够拿到的信息比较有限。如果我们想要访问事件的时间戳或者当前的水位线信息都是完全做不到的。跟时间相关的操作目前我们只会用窗口来处理。而在很多应用需求中要求我们对时间有更精细的控制需要能够获取水位线甚至要“把控时间”、定义什么时候做什么事这就不是基本的时间窗口能够实现的了。 这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”TimerService我们可以通过它访问流中的事件event、时间戳timestamp、水位线watermark甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类所以拥有富函数类的所有特性同样可以访问状态state和其他运行时信息。此外处理函数还可以直接将数据输出到侧输出流side output中。所以处理函数是最为灵活的处理方法可以实现各种自定义的业务逻辑。 处理函数的使用与基本的转换操作类似只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数用来定义处理逻辑。 stream.process(new MyProcessFunction()这里ProcessFunction不是接口而是一个抽象类继承了AbstractRichFunctionMyProcessFunction是它的一个具体实现。所以所有的处理函数都是富函数RichFunction富函数可以调用的东西这里同样都可以调用。 1.1.2 ProcessFunction解析 在源码中我们可以看到抽象类ProcessFunction继承了AbstractRichFunction有两个泛型类型参数I表示Input也就是输入的数据类型O表示Output也就是处理完成之后输出的数据类型。 内部单独定义了两个方法一个是必须要实现的抽象方法.processElement()另一个是非抽象方法.onTimer()。 public abstract class ProcessFunctionI, O extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, CollectorO out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, CollectorO out) throws Exception {}...}1、抽象方法processElement() 用于“处理元素”定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次参数包括三个输入数据值value上下文ctx以及“收集器”Collectorout。方法没有返回值处理之后的输出数据是通过收集器out来定义的。 -value当前流中的输入元素也就是正在处理的数据类型和流中数据类型一致。 ctx类型是ProcessFunction中定义的内部抽象类Context表示当前运行的上下文可以获取到当前的时间戳并提供了用于查询时间和注册定时器的“定时服务”TimerService以及可以将输出发送到“侧输出流”side output的方法output()。out :“收集器”类型为Collector用于返回输出数据。使用方式与flatMap算子中的收集器完全一样直接调用out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用也可以不调用。 通过几个参数的分析不难发现ProcessFunction可以轻松实现flatMap、map、filter这样的基本转换功能而通过富函数提供的获取上下文方法.getRuntimeContext()也可以自定义状态state进行处理这也就能实现聚合操作的功能了。 2、非抽象方法onTimer() 这个方法只有在注册好的定时器触发的时候才会调用而定时器是通过“定时服务”TimerService来注册的。打个比方注册定时器timer就是设了一个闹钟到了设定时间就会响而.onTimer()中定义的就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”callback方法通过时间的进展来触发在事件时间语义下就是由水位线watermark来触发了。 定时方法.onTimer()也有三个参数时间戳timestamp上下文ctx以及收集器out。这里的timestamp是指设定好的触发时间事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器所以也可以调用定时服务TimerService以及任意输出处理之后的数据。 既然有.onTimer()方法做定时触发我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果这其实就实现了窗口window的功能。所以说ProcessFunction其实可以实现一切功能。 注意在Flink中只有“按键分区流”KeyedStream才支持设置定时器的操作。 1.1.3 处理函数的分类 我们知道DataStream在调用一些转换方法之后有可能生成新的流类型例如调用.keyBy()之后得到KeyedStream进而再调用.window()之后得到WindowedStream。对于不同类型的流其实都可以直接调用.process()方法进行自定义处理这时传入的参数就都叫作处理函数。当然它们尽管本质相同都是可以访问状态和时间信息的底层API可彼此之间也会有所差异。 Flink提供了8个不同的处理函数 1ProcessFunction 最基本的处理函数基于DataStream直接调用.process()时作为参数传入。 2KeyedProcessFunction 对流按键分区后的处理函数基于KeyedStream调用.process()时作为参数传入。要想使用定时器比如基于KeyedStream。 3ProcessWindowFunction 开窗之后的处理函数也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。 4ProcessAllWindowFunction 同样是开窗之后的处理函数基于AllWindowedStream调用.process()时作为参数传入。 5CoProcessFunction 合并connect两条流之后的处理函数基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作我们会在后续章节详细介绍。 6ProcessJoinFunction 间隔连接interval join两条流之后的处理函数基于IntervalJoined调用.process()时作为参数传入。 7BroadcastProcessFunction 广播连接流处理函数基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream是一个未keyBy的普通DataStream与一个广播流BroadcastStream做连接conncet之后的产物。关于广播流的相关操作我们会在后续章节详细介绍。 8KeyedBroadcastProcessFunction 按键分区的广播连接流处理函数同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是这时的广播连接流是一个KeyedStream与广播流BroadcastStream做连接之后的产物。 1.2 按键分区处理函数KeyedProcessFunction 在上节中提到只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下我们都是先做了keyBy分区之后再去定义处理操作代码中更加常见的处理函数是KeyedProcessFunction。 1.2.1 定时器Timer和定时服务TimeService 在.onTimer()方法中可以实现定时处理的逻辑而它能触发的前提就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能是通过上下文中提供的“定时服务”来实现的。 定时服务与当前运行的环境有关。前面已经介绍过ProcessFunction的上下文Context中提供了.timerService()方法可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口包含以下六个方法 // 获取当前的处理时间 long currentProcessingTime();// 获取当前的水位线事件时间 long currentWatermark();// 注册处理时间定时器当处理时间超过time时触发 void registerProcessingTimeTimer(long time);// 注册事件时间定时器当水位线超过time时触发 void registerEventTimeTimer(long time);// 删除触发时间为time的处理时间定时器 void deleteProcessingTimeTimer(long time);// 删除触发时间为time的处理时间定时器 void deleteEventTimeTimer(long time);六个方法可以分成两大类基于处理时间和基于事件时间。而对应的操作主要有三个获取当前时间注册定时器以及删除定时器。需要注意尽管处理函数中都可以直接访问TimerService不过只有基于KeyedStream的处理函数才能去调用注册和删除定时器的方法未作按键分区的DataStream不支持定时器操作只能获取当前时间。 TimerService会以键key和时间戳为标准对定时器进行去重也就是说对于每个key和时间戳最多只有一个定时器如果注册了多次onTimer()方法也将只被调用一次。 1.2.2 KeyedProcessFunction案例 基于keyBy之后的KeyedStream直接调用.process()方法这时需要传入的参数就是KeyedProcessFunction的实现类。 public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// TODO Process:keyedSingleOutputStreamOperatorString process sensorKS.process(new KeyedProcessFunctionString, WaterSensor, String() {/*** 来一条数据调用一次* param value* param ctx* param out* throws Exception*/Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {//获取当前数据的keyString currentKey ctx.getCurrentKey();// TODO 1.定时器注册TimerService timerService ctx.timerService();// 1、事件时间的案例Long currentEventTime ctx.timestamp(); // 数据中提取出来的事件时间timerService.registerEventTimeTimer(5000L);System.out.println(当前key currentKey ,当前时间 currentEventTime ,注册了一个5s的定时器);// 2、处理时间的案例 // long currentTs timerService.currentProcessingTime(); // timerService.registerProcessingTimeTimer(currentTs 5000L); // System.out.println(当前key currentKey ,当前时间 currentTs ,注册了一个5s后的定时器);// 3、获取 process的 当前watermark // long currentWatermark timerService.currentWatermark(); // System.out.println(当前数据 value ,当前watermark currentWatermark);// 注册定时器 处理时间、事件时间 // timerService.registerProcessingTimeTimer(); // timerService.registerEventTimeTimer();// 删除定时器 处理时间、事件时间 // timerService.deleteEventTimeTimer(); // timerService.deleteProcessingTimeTimer();// 获取当前时间进展 处理时间-当前系统时间 事件时间-当前watermark // long currentTs timerService.currentProcessingTime(); // long wm timerService.currentWatermark();}/*** TODO 2.时间进展到定时器注册的时间调用该方法* param timestamp 当前时间进展就是定时器被触发时的时间* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey ctx.getCurrentKey();System.out.println(key currentKey 现在时间是 timestamp 定时器触发);}});process.print();env.execute();} }1.3 窗口处理函数 除了KeyedProcessFunction另外一大类常用的处理函数就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。我们之前已经简单地使用过窗口处理函数了。 1.3.1 窗口处理函数的使用 进行窗口计算我们可以直接调用现成的简单聚合方法sum/max/min也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数ReduceFunction/AggregateFucntion而对于更加复杂、需要窗口信息和额外状态的一些场景我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。 窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似也是基于WindowedStream直接调用方法就可以只不过这时调用的是.process()。 stream.keyBy( t - t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())1.3.2 ProcessWindowFunction解析 ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点 public abstract class ProcessWindowFunctionIN, OUT, KEY, W extends Window extends AbstractRichFunction {...public abstract void process(KEY key, Context context, IterableIN elements, CollectorOUT out) throws Exception;public void clear(Context context) throws Exception {}public abstract class Context implements java.io.Serializable {...} }ProcessWindowFunction依然是一个继承了AbstractRichFunction的抽象类它有四个类型参数 INinput数据流中窗口任务的输入数据类型。OUToutput窗口任务进行计算之后的输出数据类型。KEY数据中键key的类型。W窗口的类型是Window的子类型。一般情况下我们定义时间窗口W就是TimeWindow。 ProcessWindowFunction里面处理数据的核心方法.process()。方法包含四个参数。key窗口做统计计算基于的键也就是之前keyBy用来分区的字段。context当前窗口进行计算的上下文它的类型就是ProcessWindowFunction内部定义的抽象类Context。elements窗口收集到用来计算的所有数据这是一个可迭代的集合类型。 -out用来发送数据输出计算结果的收集器类型为Collector。 可以明显看出这里的参数不再是一个输入数据而是窗口中所有数据的集合。而上下文context所包含的内容也跟其他处理函数有所差别 public abstract class Context implements java.io.Serializable {public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract X void output(OutputTagX outputTag, X value);}除了可以通过.output()方法定义侧输出流不变外其他部分都有所变化。这里不再持有TimerService对象只能通过currentProcessingTime()和currentWatermark()来获取当前时间所以失去了设置定时器的功能另外由于当前不是只处理一个数据所以也不再提供.timestamp()方法。与此同时也增加了一些获取其他信息的方法比如可以通过.window()直接获取到当前的窗口对象也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。注意这里的“窗口状态”是自定义的不包括窗口本身已经有的状态针对当前key、当前窗口有效而“全局状态”同样是自定义的状态针对当前key的所有窗口有效。 所以我们会发现ProcessWindowFunction中除了.process()方法外并没有.onTimer()方法而是多出了一个.clear()方法。从名字就可以看出这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态那么必须在.clear()方法中进行显式地清除避免内存溢出。 至于另一种窗口处理函数ProcessAllWindowFunction它的用法非常类似。区别在于它基于的是AllWindowedStream相当于对没有keyBy的数据流直接开窗并调用.process()方法 stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessAllWindowFunction())1.4 应用案例—topN 案例需求实时统计一段时间内的出现次数最多的水位。例如统计最近10秒钟内出现次数最多的两个水位并且每5秒钟更新一次。我们知道这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据按照不同的水位进行统计而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。 1.4.1 使用ProcessAllWindowFunction 思路一一种最简单的想法是我们干脆不区分不同水位而是将所有访问数据都收集起来统一进行统计计算。所以可以不做keyBy直接基于DataStream开窗然后使用全窗口函数ProcessAllWindowFunction来进行处理。 在窗口中可以用一个HashMap来保存每个水位的出现次数只要遍历窗口中的所有数据自然就能得到所有水位的出现次数。最后把HashMap转成一个列表ArrayList然后进行排序、取出前两名输出就可以了。 代码具体实现如下 public class ProcessAllWindowTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));// 最近10秒 窗口长度 每5秒输出 滑动步长// TODO 思路一 所有数据到一起 用hashmap存 keyvcvaluecount值sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new MyTopNPAWF()).print();env.execute();}public static class MyTopNPAWF extends ProcessAllWindowFunctionWaterSensor, String, TimeWindow {Overridepublic void process(Context context, IterableWaterSensor elements, CollectorString out) throws Exception {// 定义一个hashmap用来存keyvcvaluecount值MapInteger, Integer vcCountMap new HashMap();// 1.遍历数据, 统计 各个vc出现的次数for (WaterSensor element : elements) {Integer vc element.getVc();if (vcCountMap.containsKey(vc)) {// 1.1 key存在不是这个key的第一条数据直接累加vcCountMap.put(vc, vcCountMap.get(vc) 1);} else {// 1.2 key不存在初始化vcCountMap.put(vc, 1);}}// 2.对 count值进行排序: 利用List来实现排序ListTuple2Integer, Integer datas new ArrayList();for (Integer vc : vcCountMap.keySet()) {datas.add(Tuple2.of(vc, vcCountMap.get(vc)));}// 对List进行排序根据count值 降序datas.sort(new ComparatorTuple2Integer, Integer() {Overridepublic int compare(Tuple2Integer, Integer o1, Tuple2Integer, Integer o2) {// 降序 后 减 前return o2.f1 - o1.f1;}});// 3.取出 count最大的2个 vcStringBuilder outStr new StringBuilder();outStr.append(\n);// 遍历 排序后的 List取出前2个 考虑可能List不够2个的情况 》 List中元素的个数 和 2 取最小值for (int i 0; i Math.min(2, datas.size()); i) {Tuple2Integer, Integer vcCount datas.get(i);outStr.append(Top (i 1) \n);outStr.append(vc vcCount.f0 \n);outStr.append(count vcCount.f1 \n);outStr.append(窗口结束时间 DateFormatUtils.format(context.window().getEnd(), yyyy-MM-dd HH:mm:ss.SSS) \n);outStr.append(\n);}out.collect(outStr.toString());}} }1.4.2 使用KeyedProcessFunction 思路二在上一小节的实现过程中我们没有进行按键分区直接将所有数据放在一个分区上进行了开窗操作。这相当于将并行度强行设置为1在实际应用中是要尽量避免的所以Flink官方也并不推荐使用AllWindowedStream进行处理。另外我们在全窗口函数中定义了HashMap来统计vc的出现次数计算过程是要先收集齐所有数据、然后再逐一遍历更新HashMap这显然不够高效。 基于这样的想法我们可以从两个方面去做优化一是对数据进行按键分区分别统计vc的出现次数二是进行增量聚合得到结果最后再做排序输出。所以我们可以使用增量聚合函数AggregateFunction进行浏览量的统计然后结合ProcessWindowFunction排序输出来实现Top N的需求。 具体实现可以分成两步先对每个vc统计出现次数然后再将统计结果收集起来排序输出最终结果。由于最后的排序还是基于每个时间窗口的输出的统计结果中要包含窗口信息我们可以输出包含了vc、出现次数count以及窗口结束时间的Tuple3。之后先按窗口结束时间分区然后用KeyedProcessFunction来实现。 用KeyedProcessFunction来收集数据做排序这时面对的是窗口聚合之后的数据流而窗口已经不存在了我们需要确保能够收集齐所有数据所以应该在窗口结束时间基础上再“多等一会儿”。具体实现上可以采用一个延迟触发的事件时间定时器。基于窗口的结束时间来设定延迟其实并不需要等太久——因为我们是靠水位线的推进来触发定时器而水位线的含义就是“之前的数据都到齐了”。所以我们只需要设置1毫秒的延迟就一定可以保证这一点。 而在等待过程中之前已经到达的数据应该缓存起来我们这里用一个自定义的HashMap来进行存储key为窗口的标记value为List。之后每来一条数据就把它添加到当前的HashMap中并注册一个触发时间为窗口结束时间加1毫秒windowEnd 1的定时器。待到水位线到达这个时间定时器触发我们可以保证当前窗口所有vc的统计结果Tuple3都到齐了于是从HashMap中取出进行排序输出。 具体代码实现如下 public class KeyedProcessFunctionTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));// 最近10秒 窗口长度 每5秒输出 滑动步长/*** TODO 思路二 使用 KeyedProcessFunction实现* 1、按照vc做keyby开窗分别count* 》 增量聚合计算 count* 》 全窗口对计算结果 count值封装 带上 窗口结束时间的 标签* 》 为了让同一个窗口时间范围的计算结果到一起去** 2、对同一个窗口范围的count值进行处理 排序、取前N个* 》 按照 windowEnd做keyby* 》 使用process 来一条调用一次需要先存分开存用HashMap,keywindowEnd,valueList* 》 使用定时器对 存起来的结果 进行 排序、取前N个*/// 1. 按照 vc 分组、开窗、聚合增量计算全量打标签// 开窗聚合后就是普通的流没有了窗口信息需要自己打上窗口的标记 windowEndSingleOutputStreamOperatorTuple3Integer, Integer, Long windowAgg sensorDS.keyBy(sensor - sensor.getVc()).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new VcCountAgg(),new WindowResult());// 2. 按照窗口标签窗口结束时间keyby保证同一个窗口时间范围的结果到一起去。排序、取TopNwindowAgg.keyBy(r - r.f2).process(new TopN(2)).print();env.execute();}public static class VcCountAgg implements AggregateFunctionWaterSensor, Integer, Integer {Overridepublic Integer createAccumulator() {return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {return accumulator 1;}Overridepublic Integer getResult(Integer accumulator) {return accumulator;}Overridepublic Integer merge(Integer a, Integer b) {return null;}}/*** 泛型如下* 第一个输入类型 增量函数的输出 count值Integer* 第二个输出类型 Tuple3(vccountwindowEnd) ,带上 窗口结束时间 的标签* 第三个key类型 vcInteger* 第四个窗口类型*/public static class WindowResult extends ProcessWindowFunctionInteger, Tuple3Integer, Integer, Long, Integer, TimeWindow {Overridepublic void process(Integer key, Context context, IterableInteger elements, CollectorTuple3Integer, Integer, Long out) throws Exception {// 迭代器里面只有一条数据next一次即可Integer count elements.iterator().next();long windowEnd context.window().getEnd();out.collect(Tuple3.of(key, count, windowEnd));}}public static class TopN extends KeyedProcessFunctionLong, Tuple3Integer, Integer, Long, String {// 存不同窗口的 统计结果keywindowEndvaluelist数据private MapLong, ListTuple3Integer, Integer, Long dataListMap;// 要取的Top数量private int threshold;public TopN(int threshold) {this.threshold threshold;dataListMap new HashMap();}Overridepublic void processElement(Tuple3Integer, Integer, Long value, Context ctx, CollectorString out) throws Exception {// 进入这个方法只是一条数据要排序得到齐才行 》 存起来不同窗口分开存// 1. 存到HashMap中Long windowEnd value.f2;if (dataListMap.containsKey(windowEnd)) {// 1.1 包含vc不是该vc的第一条直接添加到List中ListTuple3Integer, Integer, Long dataList dataListMap.get(windowEnd);dataList.add(value);} else {// 1.1 不包含vc是该vc的第一条需要初始化listListTuple3Integer, Integer, Long dataList new ArrayList();dataList.add(value);dataListMap.put(windowEnd, dataList);}// 2. 注册一个定时器 windowEnd1ms即可// 同一个窗口范围应该同时输出只不过是一条一条调用processElement方法只需要延迟1ms即可ctx.timerService().registerEventTimeTimer(windowEnd 1);}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);// 定时器触发同一个窗口范围的计算结果攒齐了开始 排序、取TopNLong windowEnd ctx.getCurrentKey();// 1. 排序ListTuple3Integer, Integer, Long dataList dataListMap.get(windowEnd);dataList.sort(new ComparatorTuple3Integer, Integer, Long() {Overridepublic int compare(Tuple3Integer, Integer, Long o1, Tuple3Integer, Integer, Long o2) {// 降序 后 减 前return o2.f1 - o1.f1;}});// 2. 取TopNStringBuilder outStr new StringBuilder();outStr.append(\n);// 遍历 排序后的 List取出前 threshold 个 考虑可能List不够2个的情况 》 List中元素的个数 和 2 取最小值for (int i 0; i Math.min(threshold, dataList.size()); i) {Tuple3Integer, Integer, Long vcCount dataList.get(i);outStr.append(Top (i 1) \n);outStr.append(vc vcCount.f0 \n);outStr.append(count vcCount.f1 \n);outStr.append(窗口结束时间 vcCount.f2 \n);outStr.append(\n);}// 用完的List及时清理节省资源dataList.clear();out.collect(outStr.toString());}} }1.5 侧输出流Side Output 处理函数还有另外一个特有功能就是将自定义的数据放入“侧输出流”side output输出。这个概念我们并不陌生之前在讲到窗口处理迟到数据时最后一招就是输出到侧输出流。而这种处理方式的本质其实就是处理函数的侧输出流功能。 我们之前讲到的绝大多数转换算子输出的都是单一流流里的数据类型只能有一种。而侧输出流可以认为是“主流”上分叉出的“支流”所以可以由一条流产生出多条流而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。 具体应用时只要在处理函数的.processElement()或者.onTimer()方法中调用上下文的.output()方法就可以了。 DataStreamInteger stream env.fromSource(...);OutputTagString outputTag new OutputTagString(side-output) {};SingleOutputStreamOperatorLong longStream stream.process(new ProcessFunctionInteger, Long() {Overridepublic void processElement( Integer value, Context ctx, CollectorInteger out) throws Exception {// 转换成Long输出到主流中out.collect(Long.valueOf(value));// 转换成String输出到侧输出流中ctx.output(outputTag, side-output: String.valueOf(value));} });这里output()方法需要传入两个参数第一个是一个“输出标签”OutputTag用来标识侧输出流一般会在外部统一声明第二个就是要输出的数据。 我们可以在外部先将OutputTag声明出来 OutputTagString outputTag new OutputTagString(side-output) {};如果想要获取这个侧输出流可以基于处理之后的DataStream直接调用.getSideOutput()方法传入对应的OutputTag这个方式与窗口API中获取侧输出流是完全一样的。 DataStreamString stringStream longStream.getSideOutput(outputTag);案例需求对每个传感器水位超过10的输出告警信息 public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));OutputTagString warnTag new OutputTag(warn, Types.STRING);SingleOutputStreamOperatorWaterSensor process sensorDS.keyBy(sensor - sensor.getId()).process(new KeyedProcessFunctionString, WaterSensor, WaterSensor() {Overridepublic void processElement(WaterSensor value, Context ctx, CollectorWaterSensor out) throws Exception {// 使用侧输出流告警if (value.getVc() 10) {ctx.output(warnTag, 当前水位 value.getVc() ,大于阈值10);}// 主流正常 发送数据out.collect(value);}});process.print(主流);process.getSideOutput(warnTag).printToErr(warn);env.execute();} }您的支持是我创作的无限动力 希望我能为您的未来尽绵薄之力 如有错误谢谢指正若有收获谢谢赞美
http://wiki.neutronadmin.com/news/144763/

相关文章:

  • 包车哪个网站做的最好网站找什么公司做
  • 那些免费网站做推广比较好石家庄建设网站公司
  • python可以做网站吗做网站哪家好 要钱
  • 做网站推广优化哪家好网站集约化 建设方案
  • 什么网站可以做问卷网站群建设的必要性
  • 记事本做网站如何排版昆明 网站设计
  • 使用redis做视频网站缓存哈尔滨仿站定制模板建站
  • 会计题库网站怎么做网页编辑用什么软件
  • 腾讯云主机网站建设教程工业企业在线平台
  • 泉州建设银行网站检测网站是否做了301
  • 网站锚点链接怎么做做封面图的网站
  • 公益建设网站的作用企业网站html
  • 深圳松岗 网站建设wordpress又拍
  • 网站版面设计注意事项网站提示危险怎么办
  • 网站建设找好景科技怒火一刀代理平台
  • 备案成功后怎么做网站线上宣传推广方式
  • 做企业网站用什么字体网页网页设计制作公司
  • 网站备案 接入商名称苏州外贸网站建设优化推广
  • 网站还没完成 能备案吗网店代运营犯法吗
  • 网站更换服务器需要重新备案吗房产网上查询系统
  • 九成seo东莞网站seo公司哪家大
  • 大连网站设计制作方案怎么做个网站演示
  • 营销型的网站企业评价一个网站设计的好坏
  • 上海景泰建设股份有限公司网站常州专业网站建设公司哪家好
  • 手机ftp传网站文件网站整体设计意图及其功能
  • 天凡建设股份有限公司网站企业市场网络推广方案
  • 网站建设案例简介怎么写html5做网站
  • 学校资源网站建设源码出售网站怎么做
  • 安徽省建设干部学校网站首页wordpress wshk
  • 网站文章编辑器公司部门设置及职责划分