当前位置: 首页 > news >正文

重庆铜梁网站建设费用买了域名怎么做网站

重庆铜梁网站建设费用,买了域名怎么做网站,怎么做监控网站,微信小程序注册费用背景#xff1a; 广播状态可以用于规则表或者配置表的实时更新#xff0c;本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流…背景 广播状态可以用于规则表或者配置表的实时更新本文就是用一个欺诈检测的flink作业作为例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流程主流程中使用了两个Broadcast广播的状态这两个Broadcast广播的状态是独立的 // 这里面包含规则广播状态的两次使用方法,分别在DynamicKeyFunction处理函数和DynamicAlertFunction处理函数,注意这两个处理函数中的广播状态是独立的也就是需要分别维度不能共享// Processing pipeline setupDataStreamAlert alerts transactions.connect(rulesStream).process(new DynamicKeyFunction()).uid(DynamicKeyFunction).name(Dynamic Partitioning Function).keyBy((keyed) - keyed.getKey()).connect(rulesStream).process(new DynamicAlertFunction()).uid(DynamicAlertFunction).name(Dynamic Rule Evaluation Function);2.BroadcastProcessFunction的处理这里面会维护这个算子本身的广播状态并把所有的事件扩散发送到下一个算子 public class DynamicKeyFunctionextends BroadcastProcessFunctionTransaction, Rule, KeyedTransaction, String, Integer {Overridepublic void open(Configuration parameters) {}// 这里会把每个事件结合上广播状态中的每个规则生成N条记录流转到下一个算子Overridepublic void processElement(Transaction event, ReadOnlyContext ctx, CollectorKeyedTransaction, String, Integer out)throws Exception {ReadOnlyBroadcastStateInteger, Rule rulesState ctx.getBroadcastState(Descriptors.rulesDescriptor);forkEventForEachGroupingKey(event, rulesState, out);}// 独立维护广播状态,可以在广播状态中新增删除或者清空广播状态Overridepublic void processBroadcastElement(Rule rule, Context ctx, CollectorKeyedTransaction, String, Integer out) throws Exception {log.info({}, rule);BroadcastStateInteger, Rule broadcastState ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);if (rule.getRuleState() RuleState.CONTROL) {handleControlCommand(rule.getControlType(), broadcastState);}}}static void handleRuleBroadcast(Rule rule, BroadcastStateInteger, Rule broadcastState)throws Exception {switch (rule.getRuleState()) {case ACTIVE:case PAUSE:broadcastState.put(rule.getRuleId(), rule);break;case DELETE:broadcastState.remove(rule.getRuleId());break;}}3.KeyedBroadcastProcessFunction的处理,这里面也是会维护这个算子本身的广播状态此外还有键值分区状态特别注意的是在处理广播元素时可以用applyToKeyedState方法对所有的键值分区状态应用某个方法对于ontimer方法依然可以访问键值分区状态和广播状态 /** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* License); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an AS IS BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.ververica.field.dynamicrules.functions;import static com.ververica.field.dynamicrules.functions.ProcessingUtils.addToStateValuesSet; import static com.ververica.field.dynamicrules.functions.ProcessingUtils.handleRuleBroadcast;import com.ververica.field.dynamicrules.Alert; import com.ververica.field.dynamicrules.FieldsExtractor; import com.ververica.field.dynamicrules.Keyed; import com.ververica.field.dynamicrules.Rule; import com.ververica.field.dynamicrules.Rule.ControlType; import com.ververica.field.dynamicrules.Rule.RuleState; import com.ververica.field.dynamicrules.RuleHelper; import com.ververica.field.dynamicrules.RulesEvaluator.Descriptors; import com.ververica.field.dynamicrules.Transaction; import java.math.BigDecimal; import java.util.*; import java.util.Map.Entry; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.accumulators.SimpleAccumulator; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.util.Collector;/** Implements main rule evaluation and alerting logic. */ Slf4j public class DynamicAlertFunctionextends KeyedBroadcastProcessFunctionString, KeyedTransaction, String, Integer, Rule, Alert {private static final String COUNT COUNT_FLINK;private static final String COUNT_WITH_RESET COUNT_WITH_RESET_FLINK;private static int WIDEST_RULE_KEY Integer.MIN_VALUE;private static int CLEAR_STATE_COMMAND_KEY Integer.MIN_VALUE 1;private transient MapStateLong, SetTransaction windowState;private Meter alertMeter;private MapStateDescriptorLong, SetTransaction windowStateDescriptor new MapStateDescriptor(windowState,BasicTypeInfo.LONG_TYPE_INFO,TypeInformation.of(new TypeHintSetTransaction() {}));Overridepublic void open(Configuration parameters) {windowState getRuntimeContext().getMapState(windowStateDescriptor);alertMeter new MeterView(60);getRuntimeContext().getMetricGroup().meter(alertsPerSecond, alertMeter);}// 键值分区状态和广播状态联合处理在这个方法中可以更新键值分区状态然后广播状态只能读取Overridepublic void processElement(KeyedTransaction, String, Integer value, ReadOnlyContext ctx, CollectorAlert out)throws Exception {long currentEventTime value.getWrapped().getEventTime();addToStateValuesSet(windowState, currentEventTime, value.getWrapped());long ingestionTime value.getWrapped().getIngestionTimestamp();ctx.output(Descriptors.latencySinkTag, System.currentTimeMillis() - ingestionTime);Rule rule ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());if (noRuleAvailable(rule)) {log.error(Rule with ID {} does not exist, value.getId());return;}if (rule.getRuleState() Rule.RuleState.ACTIVE) {Long windowStartForEvent rule.getWindowStartFor(currentEventTime);long cleanupTime (currentEventTime / 1000) * 1000;ctx.timerService().registerEventTimeTimer(cleanupTime);SimpleAccumulatorBigDecimal aggregator RuleHelper.getAggregator(rule);for (Long stateEventTime : windowState.keys()) {if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {aggregateValuesInState(stateEventTime, aggregator, rule);}}BigDecimal aggregateResult aggregator.getLocalValue();boolean ruleResult rule.apply(aggregateResult);ctx.output(Descriptors.demoSinkTag,Rule rule.getRuleId() | value.getKey() : aggregateResult.toString() - ruleResult);if (ruleResult) {if (COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {evictAllStateElements();}alertMeter.markEvent();out.collect(new Alert(rule.getRuleId(), rule, value.getKey(), value.getWrapped(), aggregateResult));}}}//维护广播状态新增/删除或者整个清空,值得注意的是处理广播元素时可以对所有的键值分区状态应用某个函数比如这里当收到某个属于控制消息的广播消息时使用applyToKeyedState方法把所有的键值分区状态都清空Overridepublic void processBroadcastElement(Rule rule, Context ctx, CollectorAlert out)throws Exception {log.info({}, rule);BroadcastStateInteger, Rule broadcastState ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);updateWidestWindowRule(rule, broadcastState);if (rule.getRuleState() RuleState.CONTROL) {handleControlCommand(rule, broadcastState, ctx);}}private void handleControlCommand(Rule command, BroadcastStateInteger, Rule rulesState, Context ctx) throws Exception {ControlType controlType command.getControlType();switch (controlType) {case EXPORT_RULES_CURRENT:for (Map.EntryInteger, Rule entry : rulesState.entries()) {ctx.output(Descriptors.currentRulesSinkTag, entry.getValue());}break;case CLEAR_STATE_ALL:ctx.applyToKeyedState(windowStateDescriptor, (key, state) - state.clear());break;case CLEAR_STATE_ALL_STOP:rulesState.remove(CLEAR_STATE_COMMAND_KEY);break;case DELETE_RULES_ALL:IteratorEntryInteger, Rule entriesIterator rulesState.iterator();while (entriesIterator.hasNext()) {EntryInteger, Rule ruleEntry entriesIterator.next();rulesState.remove(ruleEntry.getKey());log.info(Removed Rule {}, ruleEntry.getValue());}break;}}private boolean isStateValueInWindow(Long stateEventTime, Long windowStartForEvent, long currentEventTime) {return stateEventTime windowStartForEvent stateEventTime currentEventTime;}private void aggregateValuesInState(Long stateEventTime, SimpleAccumulatorBigDecimal aggregator, Rule rule) throws Exception {SetTransaction inWindow windowState.get(stateEventTime);if (COUNT.equals(rule.getAggregateFieldName())|| COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {for (Transaction event : inWindow) {aggregator.add(BigDecimal.ONE);}} else {for (Transaction event : inWindow) {BigDecimal aggregatedValue FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);aggregator.add(aggregatedValue);}}}private boolean noRuleAvailable(Rule rule) {// This could happen if the BroadcastState in this CoProcessFunction was updated after it was// updated and used in DynamicKeyFunctionif (rule null) {return true;}return false;}private void updateWidestWindowRule(Rule rule, BroadcastStateInteger, Rule broadcastState)throws Exception {Rule widestWindowRule broadcastState.get(WIDEST_RULE_KEY);if (rule.getRuleState() ! Rule.RuleState.ACTIVE) {return;}if (widestWindowRule null) {broadcastState.put(WIDEST_RULE_KEY, rule);return;}if (widestWindowRule.getWindowMillis() rule.getWindowMillis()) {broadcastState.put(WIDEST_RULE_KEY, rule);}}// ontimer方法中可以访问/更新键值分区状态读取广播状态此外ontimer方法和processElement方法以及processBroadcastElement方法是同步的不需要考虑并发访问的问题Overridepublic void onTimer(final long timestamp, final OnTimerContext ctx, final CollectorAlert out)throws Exception {Rule widestWindowRule ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);OptionalLong cleanupEventTimeWindow Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);OptionalLong cleanupEventTimeThreshold cleanupEventTimeWindow.map(window - timestamp - window);cleanupEventTimeThreshold.ifPresent(this::evictAgedElementsFromWindow);}private void evictAgedElementsFromWindow(Long threshold) {try {IteratorLong keys windowState.keys().iterator();while (keys.hasNext()) {Long stateEventTime keys.next();if (stateEventTime threshold) {keys.remove();}}} catch (Exception ex) {throw new RuntimeException(ex);}}private void evictAllStateElements() {try {IteratorLong keys windowState.keys().iterator();while (keys.hasNext()) {keys.next();keys.remove();}} catch (Exception ex) {throw new RuntimeException(ex);}} } ps: ontimer方法和processElement方法是同步访问的没有并发的问题所以不需要考虑同时更新键值分区状态的线程安全问题 参考文献 https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/
http://wiki.neutronadmin.com/news/289685/

相关文章:

  • 西安网站开发公司哪家强wordpress .htaccess 规则
  • 如何在百度上做网站推广wordpress栏目id顺序
  • 雅虎做网站推广工程公司注册条件
  • 青海找人做网站多少钱亿万网站
  • 赣州那里有做网站的公司海安网站建设
  • 推进政务服务网站一体化建设石家庄建筑网
  • 工业和信息化部网站备案系统怎么登录商标注册查询设计类型 vi设计生成
  • 网站内容规范中信建设有限责任公司洪波
  • 廊坊网络公司网站wordpress用户调用
  • 网上医疗和医院网站建设制作拉扎斯网络科技上海有限公司
  • 一个ip地址上可以做几个网站无锡网站建设制作开发
  • 中国效能建设网站关键词上首页的有效方法
  • 内蒙营销型网站建设上海房地产网站官网
  • 做的网站怎么让别人也能看到下载wix做的网站
  • 电子商务网站建设可运用的技术述建设一个网站的具体步骤
  • 沧州哪里做网站免费软件园
  • 做直播网站一定要idc吗重庆建网站一般多少钱
  • 服务好的高端网站建设科技企业网站模板
  • 网站建设写seo综合查询网站源码
  • 盘锦网站网站建设沈阳网上房地产
  • 建设一个网站的基本步骤网站loading动画效果
  • 手机网站怎么dw做邢台123招聘信息网
  • 河南住房和城乡建设厅网站特种wordpress 裁剪图片上传
  • 电影网站的建设目标搜索引擎优化到底是优化什么
  • 网站运行平台包括wordpress 自定义内容类型
  • 网站推广策划方案毕业设计算命购物网站建设
  • 模拟ip访问网站建设银行关方网站
  • 途牛旅行网站建设策划书wordpress自带的简码
  • 深圳有哪些网站是做餐饮沙龙的wordpress中文主程序优化
  • 做seo用哪种建站程序最好引用网站资料怎么注明