当前位置: 首页 > news >正文

网站安全建设进展情况汇报itmc电子商务网店运营推广

网站安全建设进展情况汇报,itmc电子商务网店运营推广,东莞专业做网站建设服务,做中国菜的外国网站Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置如何处理更新结果、时态表、流上的join、流上的确定性以及查询配置 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例1 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例2 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例3 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例4 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 17、Flink 之Table API: Table API 支持的操作1 17、Flink 之Table API: Table API 支持的操作2 18、Flink的SQL 支持的操作和语法 19、Flink 的Table API 和 SQL 中的内置函数及示例1 19、Flink 的Table API 和 SQL 中的自定义函数及示例2 19、Flink 的Table API 和 SQL 中的自定义函数及示例3 20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL可以直接提交 SQL 任务到集群上 22、Flink 的table api与sql之创建表的DDL 24、Flink 的table api与sql之Catalogs介绍、类型、java api和sql实现ddl、java api和sql操作catalog-1 24、Flink 的table api与sql之Catalogsjava api操作数据库、表-2 24、Flink 的table api与sql之Catalogsjava api操作视图-3 24、Flink 的table api与sql之Catalogsjava api操作分区与函数-4 26、Flink 的SQL之概览与入门示例 27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例1 27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例2 27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例3 27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例4 27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例5 27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例6 27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例7 28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE1 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE2 30、Flink SQL之SQL 客户端通过kafka和filesystem的例子介绍了配置文件使用-表、视图等 32、Flink table api和SQL 之用户自定义 Sources Sinks实现及详细示例 41、Flink之Hive 方言介绍及详细示例 42、Flink 的table api与sql之Hive Catalog 43、Flink之Hive 读写及详细验证示例 44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的 文章目录 Flink 系列文章5、聚合函数1、示例 6、表值聚合函数1、示例1- 计算topN2、示例2 - emitUpdateWithRetract 方法使用老版本Planner可用 本文介绍了标量聚合函数和表值聚合函数的自定义实现分别以具体的示例进行展示。特别需要提醒的是表值聚合函数自定义实现时针对emitValue和emitUpdateWithRetract方法的不同版本实现要求该处在其官网上没有特别的说明会导致运行异常具体原因及解决办法在示例2emitUpdateWithRetract中有说明。 本文依赖flink集群能正常使用。 本文分为2个部分即标量聚合函数以及表值聚合函数的自定义实现。 本文的示例如无特殊说明则是在Flink 1.17版本中运行。 5、聚合函数 自定义聚合函数UDAGG是把一个表一行或者多行每行可以有一列或者多列聚合成一个标量值。 上面的图片展示了一个聚合的例子。 假设有一个关于饮料的表。表里面有三个字段分别是 id、name、price表里有 5 行数据。 假设需要找到所有饮料里最贵的饮料的价格即执行一个 max() 聚合。 需要遍历所有 5 行数据而结果就只有一个数值。 自定义聚合函数是通过扩展 AggregateFunction 来实现的。AggregateFunction 的工作过程如下。 首先它需要一个 accumulator它是一个数据结构存储了聚合的中间结果。通过调用 AggregateFunction 的 createAccumulator() 方法创建一个空的 accumulator。 接下来对于每一行数据会调用 accumulate() 方法来更新 accumulator。 当所有的数据都处理完了之后通过调用 getValue 方法来计算和返回最终的结果。 下面几个方法是每个 AggregateFunction 必须要实现的 createAccumulator()accumulate()getValue() Flink 的类型推导在遇到复杂类型的时候可能会推导出错误的结果比如那些非基本类型和普通的 POJO 类型的复杂类型。所以跟 ScalarFunction 和 TableFunction 一样AggregateFunction 也提供了 AggregateFunction#getResultType() 和 AggregateFunction#getAccumulatorType() 来分别指定返回值类型和 accumulator 的类型两个函数的返回值类型也都是 TypeInformation。 除了上面的方法还有几个方法可以选择实现。这些方法有些可以让查询更加高效而有些是在某些特定场景下必须要实现的。 例如如果聚合函数用在会话窗口当两个会话窗口合并的时候需要 merge 他们的 accumulator的话merge() 方法就是必须要实现的。 AggregateFunction 的以下方法在某些场景下是必须实现的 retract() 在 bounded OVER 窗口中是必须实现的。merge() 在许多批式聚合和会话以及滚动窗口聚合中是必须实现的。除此之外这个方法对于优化也很多帮助。例如两阶段聚合优化就需要所有的 AggregateFunction 都实现 merge 方法。resetAccumulator() 在许多批式聚合中是必须实现的。 AggregateFunction 的所有方法都必须是 public 的不能是 static 的而且名字必须跟上面写的一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个函数是在抽象类 AggregateFunction 中定义的而其他函数都是约定的方法。如果要定义一个聚合函数你需要扩展 org.apache.flink.table.functions.AggregateFunction并且实现一个或者多个accumulate 方法。accumulate 方法可以重载每个方法的参数类型不同并且支持变长参数。 AggregateFunction 的所有方法的详细文档说明如下。 /*** Base class for user-defined aggregates and table aggregates.* 用户定义聚合和表聚合的基类。** param T the type of the aggregation result.* param ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute an aggregation result.* 聚合累加器的类型。累加器用于保存计算聚合结果所需的聚合值。*/ public abstract class UserDefinedAggregateFunctionT, ACC extends UserDefinedFunction {/*** Creates and init the Accumulator for this (table)aggregate function.* 创建和初始化aggregate function 的Accumulator 方法** return the accumulator with the initial value*/public ACC createAccumulator(); // MANDATORY/*** Returns the TypeInformation of the (table)aggregate functions result.* 返回aggregate function的结果类型TypeInformation ** return The TypeInformation of the (table)aggregate functions result or null if the result* type should be automatically inferred.* 返回aggregate function的结果类型TypeInformation 如果结果为null则会自动推导类型*/public TypeInformationT getResultType null; // PRE-DEFINED/*** Returns the TypeInformation of the (table)aggregate functions accumulator.* 返回aggregate functions accumulator 的类型TypeInformation ** return The TypeInformation of the (table)aggregate functions accumulator or null if the* accumulator type should be automatically inferred.*/public TypeInformationACC getAccumulatorType null; // PRE-DEFINED }/*** Base class for aggregation functions.** param T the type of the aggregation result* param ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute an aggregation result.* AggregateFunction represents its state using accumulator, thereby the state of the* AggregateFunction must be put into the accumulator.* acc aggregation accumulator的类型accumulator 用来保存计算聚合结果所需的聚合值。* AggregateFunction使用累加器表示其状态从而表示AggregateFunction必须放入累加器*/ public abstract class AggregateFunctionT, ACC extends UserDefinedAggregateFunctionT, ACC {/** Processes the input values and update the provided accumulator instance. The method* accumulate can be overloaded with different custom types and arguments. An AggregateFunction* requires at least one accumulate() method.* 处理输入值并更新提供的累加器实例。方法accumulate 可以用不同的自定义类型和参数重载。* 聚合函数至少需要一个accumulate方法。** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY/*** Retracts the input values from the accumulator instance. The current design assumes the* inputs are the values that have been previously accumulated. The method retract can be* overloaded with different custom types and arguments. This function must be implemented for* datastream bounded over aggregate.* 收回累加器实例中的输入值。当前设计假设输入是先前累积的值。收回方法可以是重载了不同的自定义类型和参数。* 此功能在datastream的有界流基于over aggregate必须被实现。** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL/*** Merges a group of accumulator instances into one accumulator instance. This function must be* implemented for datastream session window grouping aggregate and bounded grouping aggregate.* 将一组accumulator 实例合并为一个accumulator 实例。* 该函数在datastream session window的分组聚合 和 有界流的分组聚合必须实现。* param accumulator the accumulator which will keep the merged aggregate results. It should* be noted that the accumulator may contain the previous aggregated* results. Therefore user should not replace or clean this instance in the* custom merge method. 累加器用于保存合并后的聚合结果。* 应该注意的是累加器可以包含先前的聚合结果。* 因此用户不应在自定义合并方法中替换或清除此实例。* param its an {link java.lang.Iterable} pointed to a group of accumulators that will be* merged.*/public void merge(ACC accumulator, java.lang.IterableACC its); // OPTIONAL/*** Called every time when an aggregation result should be materialized.* The returned value could be either an early and incomplete result* (periodically emitted as data arrive) or the final result of the aggregation.* 每次应该具体化materialized聚合结果时调用。* 返回的值可能是早期且不完整的结果随着数据的到达而定期发出也可能是聚合的最终结果。** param accumulator the accumulator which contains the current aggregated results* return the aggregation result*/public T getValue(ACC accumulator); // MANDATORY/*** Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for* bounded grouping aggregate.* 重置此[[AggregateFunction]]的累加器。必须为有界分组聚合实现此函数。** param accumulator the accumulator which needs to be reset*/public void resetAccumulator(ACC accumulator); // OPTIONAL/*** Returns true if this AggregateFunction can only be applied in an OVER window.* 如果此AggregateFunction只能在OVER窗口中应用则返回true。** return true if the AggregateFunction requires an OVER window, false otherwise.*/public Boolean requiresOver false; // PRE-DEFINED }1、示例 该示例包含以下三个功能 定义一个聚合函数来计算某一列的加权平均在 TableEnvironment 中注册函数在查询中使用函数 为了计算加权平均值accumulator 需要存储加权总和以及数据的条数。 在例子里定义了一个类 Aalan_WeightedAvgAccum 来作为 accumulator。Flink 的 checkpoint 机制会自动保存 accumulator在失败时进行恢复以此来保证精确一次的语义。 例子的WeightedAvgAggregateFunction自定义聚合函数的 accumulate 方法有三个输入参数。 第一个是 Aalan_WeightedAvgAccum accumulator 另外两个是用户自定义的输入输入的值 ivaluebalance 和 输入的权重 iweightage。 尽管 retract()、merge()、resetAccumulator() 这几个方法在大多数聚合类型中都不是必须实现的样例中提供了他们的实现。 在 Scala 样例中也是用的是 Java 的基础类型并且定义了 getResultType() 和 getAccumulatorType()因为 Flink 的类型推导对于 Scala 的类型推导做的不是很好。 import static org.apache.flink.table.api.Expressions.$;import java.util.Arrays; import java.util.Iterator; import java.util.List;import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.types.Row;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;/*** author alanchan**/ public class TestUDAGGDemo {// 加权平均累加器bean加上名称以示区别避免混淆public static class Aalan_WeightedAvgAccum {public long sum 0;public int count 0;}// 聚合函数的自定义实现计算加权平均public static class WeightedAvgAggregateFunction extends AggregateFunctionLong, Aalan_WeightedAvgAccum {/*** 创建和初始化aggregate function 的Accumulator 方法*/Overridepublic Aalan_WeightedAvgAccum createAccumulator() {return new Aalan_WeightedAvgAccum();}/*** 每次应该具体化materialized聚合结果时调用。 返回的值可能是早期且不完整的结果随着数据的到达而定期发出也可能是聚合的最终结果。*/Overridepublic Long getValue(Aalan_WeightedAvgAccum acc) {if (acc.count 0) {return null;} else {return acc.sum / acc.count;}}/*** 处理输入值并更新提供的累加器实例。方法accumulate 可以用不同的自定义类型和参数重载。聚合函数至少需要一个accumulate方法。* * param acc 累加器bean,包含当前汇总结果的累加器* param iValue 输入的需要的累加值* param iWeight 输入的需要累加的值的权重*/public void accumulate(Aalan_WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum iValue * iWeight;acc.count iWeight;}/*** 收回累加器实例中的输入值。当前设计假设输入是先前累积的值。收回方法可以是重载了不同的自定义类型和参数。 此功能在datastream的有界流基于over* aggregate必须被实现。* * param acc 累加器bean,包含当前汇总结果的累加器* param iValue 输入的需要的累加值* param iWeight 输入的需要累加的值的权重*/public void retract(Aalan_WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum - iValue * iWeight;acc.count - iWeight;}/*** 将一组accumulator 实例合并为一个accumulator 实例。 该函数在datastream session window的分组聚合 和* 有界流的分组聚合必须实现。* * param acc 累加器用于保存合并后的聚合结果。 应该注意的是累加器可以包含先前的聚合结果。 因此用户不应在自定义合并方法中替换或清除此实例。* param it 指向将被合并的一组累加器的Iterable。*/public void merge(Aalan_WeightedAvgAccum acc, IterableAalan_WeightedAvgAccum it) {IteratorAalan_WeightedAvgAccum iter it.iterator();while (iter.hasNext()) {Aalan_WeightedAvgAccum a iter.next();acc.count a.count;acc.sum a.sum;}}/*** 重置此[[AggregateFunction]]的累加器。必须为有界分组聚合实现此函数。* * param acc*/public void resetAccumulator(Aalan_WeightedAvgAccum acc) {acc.count 0;acc.sum 0L;}}DataNoArgsConstructorAllArgsConstructorpublic static class User {private long id;private String name;private int age;private long balance;private Long rowtime;}final static ListUser userList Arrays.asList(new User(1L, alan, 18, 20, 1698742358391L), new User(2L, alan, 19, 25, 1698742359396L),new User(3L, alan, 25, 30, 1698742360407L), new User(4L, alanchan, 28, 35, 1698742361409L), new User(5L, alanchan, 29, 35, 1698742362424L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 将聚合函数注册为函数tenv.registerFunction(alan_weightavgAF, new WeightedAvgAggregateFunction());DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name), $(age), $(balance), $(rowtime));tenv.createTemporaryView(user_view, users);// 使用函数String sql SELECT name, alan_weightavgAF(balance, age) AS avgPoints FROM user_view GROUP BY name;Table result tenv.sqlQuery(sql);DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print(); // 16 (true,I[alanchan, 35]) // 2 (true,I[alan, 20]) // 2 (false,-U[alan, 20]) // 2 (true,U[alan, 22]) // 2 (false,-U[alan, 22]) // 2 (true,U[alan, 25])env.execute();}}6、表值聚合函数 自定义表值聚合函数UDTAGG可以把一个表一行或者多行每行有一列或者多列聚合成另一张表结果中可以有多行多列。 上图展示了一个表值聚合函数的例子。 假设有一个饮料的表这个表有 3 列分别是 id、name 和 price一共有 5 行。 假设需要找到价格最高的两个饮料类似于 top2() 表值聚合函数。 需要遍历所有 5 行数据结果是有 2 行数据的一个表。 用户自定义表值聚合函数是通过扩展 TableAggregateFunction 类来实现的。 一个 TableAggregateFunction 的工作过程如下。 首先它需要一个 accumulator这个 accumulator 负责存储聚合的中间结果。 通过调用 TableAggregateFunction 的 createAccumulator 方法来构造一个空的 accumulator。 接下来对于每一行数据会调用 accumulate 方法来更新 accumulator。 当所有数据都处理完之后调用 emitValue 方法来计算和返回最终的结果。 下面几个 TableAggregateFunction 的方法是必须要实现的 createAccumulator()accumulate() Flink 的类型推导在遇到复杂类型的时候可能会推导出错误的结果比如那些非基本类型和普通的 POJO 类型的复杂类型。所以类似于 ScalarFunction 和 TableFunctionTableAggregateFunction 也提供了 TableAggregateFunction#getResultType() 和 TableAggregateFunction#getAccumulatorType() 方法来指定返回值类型和 accumulator 的类型这两个方法都需要返回 TypeInformation。 除了上面的方法还有几个其他的方法可以选择性的实现。有些方法可以让查询更加高效而有些方法对于某些特定场景是必须要实现的。比如在会话窗口当两个会话窗口合并时会合并两个 accumulator中使用聚合函数时必须要实现merge() 方法。 下面几个 TableAggregateFunction 的方法在某些特定场景下是必须要实现的 retract() 在 bounded OVER 窗口中的聚合函数必须要实现。merge() 在许多批式聚合和以及流式会话和滑动窗口聚合中是必须要实现的。resetAccumulator() 在许多批式聚合中是必须要实现的。emitValue() 在批式聚合以及窗口聚合中是必须要实现的。 下面的 TableAggregateFunction 的方法可以提升流式任务的效率 emitUpdateWithRetract() 在 retract 模式下该方法负责发送被更新的值。 emitValue 方法会发送所有 accumulator 给出的结果。拿 TopN 来说emitValue 每次都会发送所有的最大的 n 个值。这在流式任务中可能会有一些性能问题。为了提升性能用户可以实现 emitUpdateWithRetract 方法。这个方法在 retract 模式下会增量的输出结果比如有数据更新了我们必须要撤回老的数据然后再发送新的数据。如果定义了 emitUpdateWithRetract 方法那它会优先于 emitValue 方法被使用因为一般认为 emitUpdateWithRetract 会更加高效因为它的输出是增量的。 TableAggregateFunction 的所有方法都必须是 public 的、非 static 的而且名字必须跟上面提到的一样。createAccumulator、getResultType 和 getAccumulatorType 这三个方法是在抽象父类 TableAggregateFunction 中定义的而其他的方法都是约定的方法。要实现一个表值聚合函数你必须扩展 org.apache.flink.table.functions.TableAggregateFunction并且实现一个或者多个accumulate 方法。accumulate 方法可以有多个重载的方法也可以支持变长参数。 TableAggregateFunction 的所有方法的详细文档说明如下其中部分与AggregateFunction 类似的方法不再赘述。 /*** Base class for user-defined aggregates and table aggregates.** param T the type of the aggregation result.* param ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute an aggregation result.*/ public abstract class UserDefinedAggregateFunctionT, ACC extends UserDefinedFunction {/*** Creates and init the Accumulator for this (table)aggregate function.** return the accumulator with the initial value*/public ACC createAccumulator(); // MANDATORY/*** Returns the TypeInformation of the (table)aggregate functions result.** return The TypeInformation of the (table)aggregate functions result or null if the result* type should be automatically inferred.*/public TypeInformationT getResultType null; // PRE-DEFINED/*** Returns the TypeInformation of the (table)aggregate functions accumulator.** return The TypeInformation of the (table)aggregate functions accumulator or null if the* accumulator type should be automatically inferred.*/public TypeInformationACC getAccumulatorType null; // PRE-DEFINED }/*** Base class for table aggregation functions.** param T the type of the aggregation result* param ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute a table aggregation result.* TableAggregateFunction represents its state using accumulator, thereby the state of* the TableAggregateFunction must be put into the accumulator.*/ public abstract class TableAggregateFunctionT, ACC extends UserDefinedAggregateFunctionT, ACC {/** Processes the input values and update the provided accumulator instance. The method* accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction* requires at least one accumulate() method.** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY/*** Retracts the input values from the accumulator instance. The current design assumes the* inputs are the values that have been previously accumulated. The method retract can be* overloaded with different custom types and arguments. This function must be implemented for* datastream bounded over aggregate.** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL/*** Merges a group of accumulator instances into one accumulator instance. This function must be* implemented for datastream session window grouping aggregate and bounded grouping aggregate.** param accumulator the accumulator which will keep the merged aggregate results. It should* be noted that the accumulator may contain the previous aggregated* results. Therefore user should not replace or clean this instance in the* custom merge method.* param its an {link java.lang.Iterable} pointed to a group of accumulators that will be* merged.*/public void merge(ACC accumulator, java.lang.IterableACC its); // OPTIONAL/*** Called every time when an aggregation result should be materialized. The returned value* could be either an early and incomplete result (periodically emitted as data arrive) or* the final result of the aggregation.* 每次应该具体化聚合结果时调用。返回的值可能是早期且不完整的结果随着数据的到达而定期发出也可能是聚合的最终结果。** param accumulator the accumulator which contains the current* aggregated results* param out the collector used to output data*/public void emitValue(ACC accumulator, CollectorT out); // OPTIONAL/*** Called every time when an aggregation result should be materialized. The returned value* could be either an early and incomplete result (periodically emitted as data arrive) or* the final result of the aggregation.** Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.* This method outputs data incrementally in retract mode, i.e., once there is an update, we* have to retract old records before sending new updated ones. The emitUpdateWithRetract* method will be used in preference to the emitValue method if both methods are defined in the* table aggregate function, because the method is treated to be more efficient than emitValue* as it can outputvalues incrementally.* 每次应该具体化聚合结果时调用。返回的值可能是早期且不完整的结果随着数据的到达而定期发出也可能是聚合的最终结果。* 与emitValue不同emitUpdateWithRetract用于发出已更新的值。* 该方法以收回模式递增地输出数据在发送新的更新记录之前我们必须收回旧记录。* 如果表聚合函数中定义了两个方法则emitUpdateWithRetract方法将优先于emitValue方法* 因为该方法被认为比emitValue更有效因为它可以增量输出值。* * param accumulator the accumulator which contains the current* aggregated results* param out the retractable collector used to output data. Use collect method* to output(add) records and use retract method to retract(delete)* records.*/public void emitUpdateWithRetract(ACC accumulator, RetractableCollectorT out); // OPTIONAL/*** Collects a record and forwards it. The collector can output retract messages with the retract* method. Note: only use it in {code emitRetractValueIncrementally}.*/public interface RetractableCollectorT extends CollectorT {/*** Retract a record.** param record The record to retract.*/void retract(T record);} } 1、示例1- 计算topN 下面的例子展示了如何 定义一个 TableAggregateFunction 来计算给定列的最大的 3 个值在 TableEnvironment 中注册函数在 Table API 查询中使用函数当前只在 Table API 中支持 TableAggregateFunction 为了计算最大的 3 个值accumulator 需要保存当前看到的最大的 3 个值。 在例子中定义了类 TopAccum 来作为 accumulator。Flink 的 checkpoint 机制会自动保存 accumulator并且在失败时进行恢复来保证精确一次的语义。 TopTableAggregateFunction 表值聚合函数TableAggregateFunction的 accumulate() 方法有两个输入 第一个是 TopAccum accumulator 另一个是用户定义的输入输入的值 v。 尽管 merge() 方法在大多数聚合类型中不是必须的也在样例中提供了它的实现。 在 Scala 样例中也使用的是 Java 的基础类型并且定义了 getResultType() 和 getAccumulatorType() 方法因为 Flink 的类型推导对于 Scala 的类型推导支持的不是很好。 import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays; import java.util.List;import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.types.Row; import org.apache.flink.util.Collector;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;/*** author alanchan**/ public class TestUDTAGGDemo {/*** Accumulator for Top3**/Datapublic static class TopAccum {public Integer first;public Integer second;public Integer third;}public static class TopTableAggregateFunction extends TableAggregateFunctionTuple2Integer, Integer, TopAccum {Overridepublic TopAccum createAccumulator() {TopAccum acc new TopAccum();acc.first Integer.MIN_VALUE;acc.second Integer.MIN_VALUE;acc.third Integer.MIN_VALUE;return acc;}public void accumulate(TopAccum acc, Integer v) {if (v acc.first) {acc.third acc.second;acc.second acc.first;acc.first v;} else if (v acc.second) {acc.third acc.second;acc.second v;} else if (v acc.third) {acc.third v;}}public void merge(TopAccum acc, java.lang.IterableTopAccum iterable) {for (TopAccum otherAcc : iterable) {accumulate(acc, otherAcc.first);accumulate(acc, otherAcc.second);accumulate(acc, otherAcc.third);}}public void emitValue(TopAccum acc, CollectorTuple2Integer, Integer out) { // System.out.println(acc:acc);// emit the value and rankif (acc.first ! Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.first, 1));}if (acc.second ! Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.second, 2));}if (acc.third ! Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.third, 3));}}}DataNoArgsConstructorAllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static ListUser userList Arrays.asList(new User(1L, alan, 18, 20, 1698742358391L), new User(2L, alan, 19, 25, 1698742359396L),new User(3L, alan, 25, 30, 1698742360407L), new User(4L, alanchan, 28, 35, 1698742361409L), new User(5L, alanchan, 29, 35, 1698742362424L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 将聚合函数注册为函数tenv.registerFunction(top, new TopTableAggregateFunction());DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name), $(age), $(balance), $(rowtime));// 使用函数Table result usersTable.groupBy($(name)).flatAggregate(call(top, $(balance))).select($(name), $(f0).as(balance), $(f1).as(rank));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print(); // 2 (true,I[alan, 20, 1]) // 16 (true,I[alanchan, 35, 1]) // 2 (false,-D[alan, 20, 1]) // 16 (false,-D[alanchan, 35, 1]) // 2 (true,I[alan, 25, 1]) // 16 (true,I[alanchan, 35, 1]) // 2 (true,I[alan, 20, 2]) // 16 (true,I[alanchan, 35, 2]) // 2 (false,-D[alan, 25, 1]) // 2 (false,-D[alan, 20, 2]) // 2 (true,I[alan, 30, 1]) // 2 (true,I[alan, 25, 2]) // 2 (true,I[alan, 20, 3])env.execute();}}2、示例2 - emitUpdateWithRetract 方法使用老版本Planner可用 下面的例子展示了如何使用 emitUpdateWithRetract 方法来只发送更新的数据。 为了只发送更新的结果accumulator 保存了上一次的最大的3个值也保存了当前最大的3个值。 如果 TopN 中的 n 非常大这种既保存上次的结果也保存当前的结果的方式不太高效。 一种解决这种问题的方式是把输入数据直接存储到 accumulator 中然后在调用 emitUpdateWithRetract 方法时再进行计算。 需要特别说明的是 下面的示例需要使用到useOldPlanner对应的planner的maven依赖见下文 !-- flink执行计划,这是1.9版本之前的--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_2.11/artifactIdversion1.13.6/version/dependency如果flink的版本比较高的话下面的示例将不能运行因为新版本的Builder没有useOldPlanner()方法了已经移除。不能构造EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); //新版本该方法已经被移除Deprecatedpublic Builder useOldPlanner() {this.plannerClass OLD_PLANNER_FACTORY;this.executorClass OLD_EXECUTOR_FACTORY;return this;}如果使用OldPlanner的话emitValue和emitUpdateWithRetract仅需定义一个就可以了并且emitUpdateWithRetract的优先级大于emitValue。但是在Blink Planner里只看有没有定义emitValue。 也即在Blink Planner中只能使用emitValue不能使用emitUpdateWithRetract。 否则会报如下异常 Exception in thread “main” org.apache.flink.table.api.ValidationException: Could not find an implementation method ‘emitValue’ in class ‘org.tablesql.udf.TestUDTAGGDemo2$TopNTableAggregateFunction’ for function ‘TopNTableAggregateFunction’ that matches the following signature: void emitValue(org.tablesql.udf.TestUDTAGGDemo2.TopNAccum, org.apache.flink.util.Collector) 具体示例如下 import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays; import java.util.List;import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.types.Row;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;/*** author alanchan**/ public class TestUDTAGGDemo2 {Datapublic static class TopNAccum {public Integer first;public Integer second;public Integer third;public Integer oldFirst;public Integer oldSecond;public Integer oldThird;}/*** 自定义聚合函数实现* * author alanchan**/public static class TopNTableAggregateFunction extends TableAggregateFunctionTuple2Integer, Integer, TopNAccum {Overridepublic TopNAccum createAccumulator() {TopNAccum topNAccum new TopNAccum();topNAccum.first Integer.MIN_VALUE;topNAccum.second Integer.MIN_VALUE;topNAccum.third Integer.MIN_VALUE;topNAccum.oldFirst Integer.MIN_VALUE;topNAccum.oldSecond Integer.MIN_VALUE;topNAccum.oldThird Integer.MIN_VALUE;return topNAccum;}public void accumulate(TopNAccum acc, Integer v) {if (v acc.first) {acc.third acc.second;acc.second acc.first;acc.first v;} else if (v acc.second) {acc.third acc.second;acc.second v;} else if (v acc.third) {acc.third v;}}public void emitUpdateWithRetract(TopNAccum acc, RetractableCollectorTuple2Integer, Integer out) {System.out.println(emitUpdateWithRetract----acc: acc);if (!acc.first.equals(acc.oldFirst)) {// if there is an update, retract old value then emit new value.if (acc.oldFirst ! Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldFirst, 1));}out.collect(Tuple2.of(acc.first, 1));acc.oldFirst acc.first;}if (!acc.second.equals(acc.oldSecond)) {// if there is an update, retract old value then emit new value.if (acc.oldSecond ! Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldSecond, 2));}out.collect(Tuple2.of(acc.second, 2));acc.oldSecond acc.second;}if (!acc.third.equals(acc.oldThird)) {// if there is an update, retract old value then emit new value.if (acc.oldThird ! Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldThird, 3));}out.collect(Tuple2.of(acc.third, 3));acc.oldThird acc.third;}}}DataNoArgsConstructorAllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static ListUser userList Arrays.asList(new User(1L, alan, 18, 20, 1698742358391L), new User(2L, alan, 19, 25, 1698742359396L),new User(3L, alan, 25, 30, 1698742360407L), new User(11L, alan, 28, 31, 1698742358391L), new User(12L, alan, 29, 32, 1698742359396L),new User(13L, alan, 35, 35, 1698742360407L), new User(23L, alan, 45, 36, 1698742360407L), new User(14L, alanchan, 28, 15, 1698742361409L), new User(15L, alanchan, 29, 16, 1698742362424L), new User(24L, alanchan, 30, 20, 1698742361409L),new User(25L, alanchan, 31, 22, 1698742362424L), new User(34L, alanchan, 32, 24, 1698742361409L), new User(35L, alanchan, 33, 26, 1698742362424L),new User(44L, alanchan, 34, 28, 1698742361409L), new User(55L, alanchan, 35, 35, 1698742362424L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1);EnvironmentSettings settings EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamTableEnvironment tenv StreamTableEnvironment.create(env, settings);// 将聚合函数注册为函数tenv.registerFunction(topN, new TopNTableAggregateFunction());DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name), $(age), $(balance), $(rowtime));// 使用函数Table result usersTable.groupBy($(name)).flatAggregate(call(topN, $(balance))).select($(name), $(f0).as(balance), $(f1).as(rank));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();env.execute();}} 运行结果如下 emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first20, second-2147483648, third-2147483648, oldFirst-2147483648, oldSecond-2147483648, oldThird-2147483648) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first15, second-2147483648, third-2147483648, oldFirst-2147483648, oldSecond-2147483648, oldThird-2147483648) 14 (true,I[alan, 20, 1]) 9 (true,I[alanchan, 15, 1]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first25, second20, third-2147483648, oldFirst20, oldSecond-2147483648, oldThird-2147483648) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first16, second15, third-2147483648, oldFirst15, oldSecond-2147483648, oldThird-2147483648) 14 (false,I[alan, 20, 1]) 14 (true,I[alan, 25, 1]) 14 (true,I[alan, 20, 2]) 9 (false,I[alanchan, 15, 1]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first30, second25, third20, oldFirst25, oldSecond20, oldThird-2147483648) 14 (false,I[alan, 25, 1]) 9 (true,I[alanchan, 16, 1]) 14 (true,I[alan, 30, 1]) 14 (false,I[alan, 20, 2]) 9 (true,I[alanchan, 15, 2]) 14 (true,I[alan, 25, 2]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first20, second16, third15, oldFirst16, oldSecond15, oldThird-2147483648) 14 (true,I[alan, 20, 3]) 9 (false,I[alanchan, 16, 1]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first31, second30, third25, oldFirst30, oldSecond25, oldThird20) 9 (true,I[alanchan, 20, 1]) 9 (false,I[alanchan, 15, 2]) 14 (false,I[alan, 30, 1]) 9 (true,I[alanchan, 16, 2]) 14 (true,I[alan, 31, 1]) 9 (true,I[alanchan, 15, 3]) 14 (false,I[alan, 25, 2]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first22, second20, third16, oldFirst20, oldSecond16, oldThird15) 14 (true,I[alan, 30, 2]) 9 (false,I[alanchan, 20, 1]) 14 (false,I[alan, 20, 3]) 9 (true,I[alanchan, 22, 1]) 14 (true,I[alan, 25, 3]) 9 (false,I[alanchan, 16, 2]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first32, second31, third30, oldFirst31, oldSecond30, oldThird25) 9 (true,I[alanchan, 20, 2]) 9 (false,I[alanchan, 15, 3]) 9 (true,I[alanchan, 16, 3]) 14 (false,I[alan, 31, 1]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first24, second22, third20, oldFirst22, oldSecond20, oldThird16) 14 (true,I[alan, 32, 1]) 9 (false,I[alanchan, 22, 1]) 14 (false,I[alan, 30, 2]) 9 (true,I[alanchan, 24, 1]) 9 (false,I[alanchan, 20, 2]) 14 (true,I[alan, 31, 2]) 9 (true,I[alanchan, 22, 2]) 14 (false,I[alan, 25, 3]) 9 (false,I[alanchan, 16, 3]) 14 (true,I[alan, 30, 3]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first35, second32, third31, oldFirst32, oldSecond31, oldThird30) 9 (true,I[alanchan, 20, 3]) 14 (false,I[alan, 32, 1]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first26, second24, third22, oldFirst24, oldSecond22, oldThird20) 14 (true,I[alan, 35, 1]) 9 (false,I[alanchan, 24, 1]) 14 (false,I[alan, 31, 2]) 9 (true,I[alanchan, 26, 1]) 14 (true,I[alan, 32, 2]) 9 (false,I[alanchan, 22, 2]) 14 (false,I[alan, 30, 3]) 9 (true,I[alanchan, 24, 2]) 14 (true,I[alan, 31, 3]) 9 (false,I[alanchan, 20, 3]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first36, second35, third32, oldFirst35, oldSecond32, oldThird31) 9 (true,I[alanchan, 22, 3]) 14 (false,I[alan, 35, 1]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first28, second26, third24, oldFirst26, oldSecond24, oldThird22) 14 (true,I[alan, 36, 1]) 9 (false,I[alanchan, 26, 1]) 14 (false,I[alan, 32, 2]) 9 (true,I[alanchan, 28, 1]) 14 (true,I[alan, 35, 2]) 9 (false,I[alanchan, 24, 2]) 9 (true,I[alanchan, 26, 2]) 14 (false,I[alan, 31, 3]) 9 (false,I[alanchan, 22, 3]) 14 (true,I[alan, 32, 3]) 9 (true,I[alanchan, 24, 3]) emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first35, second28, third26, oldFirst28, oldSecond26, oldThird24) 9 (false,I[alanchan, 28, 1]) 9 (true,I[alanchan, 35, 1]) 9 (false,I[alanchan, 26, 2]) 9 (true,I[alanchan, 28, 2]) 9 (false,I[alanchan, 24, 3]) 9 (true,I[alanchan, 26, 3]) 以上介绍了标量聚合函数和表值聚合函数的自定义实现分别以具体的示例进行展示。特别需要提醒的是表值聚合函数自定义实现时针对emitValue和emitUpdateWithRetract方法的不同版本实现要求该处在其官网上没有特别的说明会导致运行异常具体原因及解决办法在示例2emitUpdateWithRetract中有说明。
http://wiki.neutronadmin.com/news/40644/

相关文章:

  • ps如何做网站横幅网站制作教程书籍
  • C语言网站开发pdf如何优化网站关键词
  • 香飘飘网站平台建设网页设计与网站制作
  • 网站开发与经营做外贸没有网站需要什么条件
  • 短网址生成网站外贸网站外贸网站建设行吗
  • 庆阳市建设局网站vs2015网站开发基础样式
  • 南昌网站制作wordpress站群seo
  • 柳市网站优化2023年房地产最新消息
  • 关于网站策划的说法错误的是汕头制作企业网站
  • 重庆双八自助建设网站网站建设的软件叫啥
  • 网站子站怎么建设企业所得税优惠政策最新2023规定公告
  • 网站域名如何备案北京网站建设哪便宜
  • 泛微e8做网站门户微信推广方案范文
  • 网站加载效果怎么做的网页设计尺寸1920
  • 小网站源码移动端网站建设需要注意哪些问题
  • 网站上怎么做动画广告视频在线观看做同城相亲网站
  • 学生怎么制作网站中企动力公司网站价格
  • 鹤壁做网站价格你的网站尚未进行备案
  • 公司网站建设及推广wordpress会员登录界面美化
  • 厦门网站建设培训班深圳 建网站
  • 江苏网站建设开发网站建设需求分析怎么写
  • 网站开发与系统开发用python做音乐网站
  • 策划书模板范文抖音seo排名系统哪个好用
  • 做网站就业要会什么263企业邮箱怎么注册
  • 移动端 pc网站开发开发公司名字起名大全
  • 考试源码网站wordpress岳阳网站界面设计
  • 南通网站建设公司哪家好集团网站建设特色
  • 富阳网站建设报价河北省建设工程信息网招标公告
  • 加强网站制度建设电子商务网站规划与建设的论文
  • 济南网站建设询问臻动传媒百度小程序wordpress