当前位置: 首页 > news >正文

公司网站如何建立南京注册公司有什么要求

公司网站如何建立,南京注册公司有什么要求,莆田建设信息网站,新媒体运营培训班Flink系列之#xff1a;自定义函数 一、自定义函数二、概述三、开发指南四、函数类五、求值方法六、类型推导七、自动类型推导八、定制类型推导九、确定性十、内置函数的确定性十一、运行时集成十二、标量函数十三、表值函数十四、聚合函数十五、表值聚合函数 一、自定义函数 … Flink系列之自定义函数 一、自定义函数二、概述三、开发指南四、函数类五、求值方法六、类型推导七、自动类型推导八、定制类型推导九、确定性十、内置函数的确定性十一、运行时集成十二、标量函数十三、表值函数十四、聚合函数十五、表值聚合函数 一、自定义函数 自定义函数UDF是一种扩展开发机制可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。 自定义函数可以用 JVM 语言例如 Java 或 Scala或 Python 实现实现者可以在 UDF 中使用任意第三方库本文聚焦于使用 JVM 语言开发自定义函数。 二、概述 当前 Flink 有如下几种函数 标量函数 将标量值转换成一个新标量值表值函数 将标量值转换成新的行数据聚合函数 将多行数据里的标量值转换成一个新标量值表值聚合函数 将多行数据里的标量值转换成新的行数据异步表值函数 是异步查询外部数据系统的特殊函数。 注意标量和表值函数已经使用了新的基于数据类型的类型系统聚合函数仍然使用基于 TypeInformation 的旧类型系统。 以下示例展示了如何创建一个基本的标量函数以及如何在 Table API 和 SQL 里调用这个函数。 函数用于 SQL 查询前要先经过注册而在用于 Table API 时函数可以先注册后调用也可以 内联 后直接使用。 Java版本 import org.apache.flink.table.api.*; import org.apache.flink.table.functions.ScalarFunction; import static org.apache.flink.table.api.Expressions.*;// 定义函数逻辑 public static class SubstringFunction extends ScalarFunction {public String eval(String s, Integer begin, Integer end) {return s.substring(begin, end);} }TableEnvironment env TableEnvironment.create(...);// 在 Table API 里不经注册直接“内联”调用函数 env.from(MyTable).select(call(SubstringFunction.class, $(myField), 5, 12));// 注册函数 env.createTemporarySystemFunction(SubstringFunction, SubstringFunction.class);// 在 Table API 里调用注册好的函数 env.from(MyTable).select(call(SubstringFunction, $(myField), 5, 12));// 在 SQL 里调用注册好的函数 env.sqlQuery(SELECT SubstringFunction(myField, 5, 12) FROM MyTable);Scala版本 import org.apache.flink.table.api._ import org.apache.flink.table.functions.ScalarFunction// define function logic class SubstringFunction extends ScalarFunction {def eval(s: String, begin: Integer, end: Integer): String {s.substring(begin, end)} }val env TableEnvironment.create(...)// 在 Table API 里不经注册直接“内联”调用函数 env.from(MyTable).select(call(classOf[SubstringFunction], $myField, 5, 12))// 注册函数 env.createTemporarySystemFunction(SubstringFunction, classOf[SubstringFunction])// 在 Table API 里调用注册好的函数 env.from(MyTable).select(call(SubstringFunction, $myField, 5, 12))// 在 SQL 里调用注册好的函数 env.sqlQuery(SELECT SubstringFunction(myField, 5, 12) FROM MyTable)对于交互式会话还可以在使用或注册函数之前对其进行参数化这样可以把函数 实例 而不是函数 类 用作临时函数。 为确保函数实例可应用于集群环境参数必须是可序列化的。 Java版本 import org.apache.flink.table.api.*; import org.apache.flink.table.functions.ScalarFunction; import static org.apache.flink.table.api.Expressions.*;// 定义可参数化的函数逻辑 public static class SubstringFunction extends ScalarFunction {private boolean endInclusive;public SubstringFunction(boolean endInclusive) {this.endInclusive endInclusive;}public String eval(String s, Integer begin, Integer end) {return s.substring(begin, endInclusive ? end 1 : end);} }TableEnvironment env TableEnvironment.create(...);// 在 Table API 里不经注册直接“内联”调用函数 env.from(MyTable).select(call(new SubstringFunction(true), $(myField), 5, 12));// 注册函数 env.createTemporarySystemFunction(SubstringFunction, new SubstringFunction(true));Scala版本 import org.apache.flink.table.api._ import org.apache.flink.table.functions.ScalarFunction// 定义可参数化的函数逻辑 class SubstringFunction(val endInclusive) extends ScalarFunction {def eval(s: String, begin: Integer, end: Integer): String {s.substring(endInclusive ? end 1 : end)} }val env TableEnvironment.create(...)// 在 Table API 里不经注册直接“内联”调用函数 env.from(MyTable).select(call(new SubstringFunction(true), $myField, 5, 12))// 注册函数 env.createTemporarySystemFunction(SubstringFunction, new SubstringFunction(true))你可以在 Table API 中使用 * 表达式作为函数的一个参数它将被扩展为该表所有的列作为函数对应位置的参数。 Java版本 import org.apache.flink.table.api.*; import org.apache.flink.table.functions.ScalarFunction; import static org.apache.flink.table.api.Expressions.*;public static class MyConcatFunction extends ScalarFunction {public String eval(DataTypeHint(inputGroup InputGroup.ANY) Object... fields) {return Arrays.stream(fields).map(Object::toString).collect(Collectors.joining(,));} }TableEnvironment env TableEnvironment.create(...);// 使用 $(*) 作为函数的参数如果 MyTable 有 3 列 (a, b, c) // 它们都将会被传给 MyConcatFunction。 env.from(MyTable).select(call(MyConcatFunction.class, $(*)));// 它等价于显式地将所有列传给 MyConcatFunction。 env.from(MyTable).select(call(MyConcatFunction.class, $(a), $(b), $(c)));Scala版本 import org.apache.flink.table.api._ import org.apache.flink.table.functions.ScalarFunctionimport scala.annotation.varargsclass MyConcatFunction extends ScalarFunction {varargsdef eval(DataTypeHint(inputGroup InputGroup.ANY) row: AnyRef*): String {row.map(f f.toString).mkString(,)} }val env TableEnvironment.create(...)// 使用 $* 作为函数的参数如果 MyTable 有 3 个列 (a, b, c) // 它们都将会被传给 MyConcatFunction。 env.from(MyTable).select(call(classOf[MyConcatFunction], $*));// 它等价于显式地将所有列传给 MyConcatFunction。 env.from(MyTable).select(call(classOf[MyConcatFunction], $a, $b, $c));三、开发指南 注意在聚合函数使用新的类型系统前本节仅适用于标量和表值函数。 所有的自定义函数都遵循一些基本的实现原则。 四、函数类 实现类必须继承自合适的基类之一例如 org.apache.flink.table.functions.ScalarFunction 。 该类必须声明为 public 而不是 abstract 并且可以被全局访问。不允许使用非静态内部类或匿名类。 为了将自定义函数存储在持久化的 catalog 中该类必须具有默认构造器且在运行时可实例化。 Anonymous functions in Table API can only be persisted if the function is not stateful (i.e. containing only transient and static fields). 五、求值方法 基类提供了一组可以被重写的方法例如 open()、 close() 或 isDeterministic() 。 但是除了上述方法之外作用于每条传入记录的主要逻辑还必须通过专门的 求值方法 来实现。 根据函数的种类后台生成的运算符会在运行时调用诸如 eval()、accumulate() 或 retract() 之类的求值方法。 这些方法必须声明为 public 并带有一组定义明确的参数。 常规的 JVM 方法调用语义是适用的。因此可以 实现重载的方法例如 eval(Integer) 和 eval(LocalDateTime)使用变长参数例如 eval(Integer…);使用对象继承例如 eval(Object) 可接受 LocalDateTime 和 Integer 作为参数也可组合使用例如 eval(Object…) 可接受所有类型的参数。 以下代码片段展示了一个重载函数的示例 import org.apache.flink.table.functions.ScalarFunction;// 有多个重载求值方法的函数 public static class SumFunction extends ScalarFunction {public Integer eval(Integer a, Integer b) {return a b;}public Integer eval(String a, String b) {return Integer.valueOf(a) Integer.valueOf(b);}public Integer eval(Double... d) {double result 0;for (double value : d)result value;return (int) result;} }Scala代码 import org.apache.flink.table.functions.ScalarFunction import scala.annotation.varargs// 有多个重载求值方法的函数 class SumFunction extends ScalarFunction {def eval(a: Integer, b: Integer): Integer {a b}def eval(a: String, b: String): Integer {Integer.valueOf(a) Integer.valueOf(b)}varargs // generate var-args like Javadef eval(d: Double*): Integer {d.sum.toInt} }六、类型推导 Table类似于 SQL 标准是一种强类型的 API。因此函数的参数和返回类型都必须映射到数据类型。 从逻辑角度看Planner 需要知道数据类型、精度和小数位数从 JVM 角度来看Planner 在调用自定义函数时需要知道如何将内部数据结构表示为 JVM 对象。 术语 类型推导 概括了意在验证输入值、派生出参数/返回值数据类型的逻辑。 Flink 自定义函数实现了自动的类型推导提取通过反射从函数的类及其求值方法中派生数据类型。如果这种隐式的反射提取方法不成功则可以通过使用 DataTypeHint 和 FunctionHint 注解相关参数、类或方法来支持提取过程下面展示了有关如何注解函数的例子。 如果需要更高级的类型推导逻辑实现者可以在每个自定义函数中显式重写 getTypeInference() 方法。但是建议使用注解方式因为它可使自定义类型推导逻辑保持在受影响位置附近而在其他位置则保持默认状态。 七、自动类型推导 自动类型推导会检查函数的类和求值方法派生出函数参数和结果的数据类型 DataTypeHint 和 FunctionHint 注解支持自动类型推导。 DataTypeHint 在许多情况下需要支持以 内联 方式自动提取出函数参数、返回值的类型。 以下例子展示了如何使用 DataTypeHint。 Java代码 import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.InputGroup; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.types.Row;// 有多个重载求值方法的函数 public static class OverloadedFunction extends ScalarFunction {// no hint requiredpublic Long eval(long a, long b) {return a b;}// 定义 decimal 的精度和小数位public DataTypeHint(DECIMAL(12, 3)) BigDecimal eval(double a, double b) {return BigDecimal.valueOf(a b);}// 定义嵌套数据类型DataTypeHint(ROWs STRING, t TIMESTAMP_LTZ(3))public Row eval(int i) {return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));}// 允许任意类型的符入并输出序列化定制后的值DataTypeHint(value RAW, bridgedTo ByteBuffer.class)public ByteBuffer eval(DataTypeHint(inputGroup InputGroup.ANY) Object o) {return MyUtils.serializeToByteBuffer(o);} }Scala代码 import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.annotation.InputGroup import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.types.Row import scala.annotation.varargs// function with overloaded evaluation methods class OverloadedFunction extends ScalarFunction {// no hint requireddef eval(a: Long, b: Long): Long {a b}// 定义 decimal 的精度和小数位DataTypeHint(DECIMAL(12, 3))def eval(double a, double b): BigDecimal {java.lang.BigDecimal.valueOf(a b)}// 定义嵌套数据类型DataTypeHint(ROWs STRING, t TIMESTAMP_LTZ(3))def eval(Int i): Row {Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i))}// 允许任意类型的符入并输出定制序列化后的值DataTypeHint(value RAW, bridgedTo classOf[java.nio.ByteBuffer])def eval(DataTypeHint(inputGroup InputGroup.ANY) Object o): java.nio.ByteBuffer {MyUtils.serializeToByteBuffer(o)} }FunctionHint 有时我们希望一种求值方法可以同时处理多种数据类型有时又要求对重载的多个求值方法仅声明一次通用的结果类型。 FunctionHint 注解可以提供从入参数据类型到结果数据类型的映射它可以在整个函数类或求值方法上注解输入、累加器和结果的数据类型。可以在类顶部声明一个或多个注解也可以为类的所有求值方法分别声明一个或多个注解。所有的 hint 参数都是可选的如果未定义参数则使用默认的基于反射的类型提取。在函数类顶部定义的 hint 参数被所有求值方法继承。 以下例子展示了如何使用 FunctionHint。 Java代码 import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row;// 为函数类的所有求值方法指定同一个输出类型 FunctionHint(output DataTypeHint(ROWs STRING, i INT)) public static class OverloadedFunction extends TableFunctionRow {public void eval(int a, int b) {collect(Row.of(Sum, a b));}// overloading of arguments is still possiblepublic void eval() {collect(Row.of(Empty args, -1));} }// 解耦类型推导与求值方法类型推导完全取决于 FunctionHint FunctionHint(input {DataTypeHint(INT), DataTypeHint(INT)},output DataTypeHint(INT) ) FunctionHint(input {DataTypeHint(BIGINT), DataTypeHint(BIGINT)},output DataTypeHint(BIGINT) ) FunctionHint(input {},output DataTypeHint(BOOLEAN) ) public static class OverloadedFunction extends TableFunctionObject {// an implementer just needs to make sure that a method exists// that can be called by the JVMpublic void eval(Object... o) {if (o.length 0) {collect(false);}collect(o[0]);} }Scala代码 import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.annotation.FunctionHint import org.apache.flink.table.functions.TableFunction import org.apache.flink.types.Row// 为函数类的所有求值方法指定同一个输出类型 FunctionHint(output new DataTypeHint(ROWs STRING, i INT)) class OverloadedFunction extends TableFunction[Row] {def eval(a: Int, b: Int): Unit {collect(Row.of(Sum, Int.box(a b)))}// overloading of arguments is still possibledef eval(): Unit {collect(Row.of(Empty args, Int.box(-1)))} }// 解耦类型推导与求值方法类型推导完全取决于 FunctionHint FunctionHint(input Array(new DataTypeHint(INT), new DataTypeHint(INT)),output new DataTypeHint(INT) ) FunctionHint(input Array(new DataTypeHint(BIGINT), new DataTypeHint(BIGINT)),output new DataTypeHint(BIGINT) ) FunctionHint(input Array(),output new DataTypeHint(BOOLEAN) ) class OverloadedFunction extends TableFunction[AnyRef] {// an implementer just needs to make sure that a method exists// that can be called by the JVMvarargsdef eval(o: AnyRef*) {if (o.length 0) {collect(Boolean.box(false))}collect(o(0))} }八、定制类型推导 在大多数情况下DataTypeHint 和 FunctionHint 足以构建自定义函数然而通过重写 getTypeInference() 定制自动类型推导逻辑实现者可以创建任意像系统内置函数那样有用的函数。 以下用 Java 实现的例子展示了定制类型推导的潜力它根据字符串参数来确定函数的结果类型。该函数带有两个字符串参数第一个参数表示要分析的字符串第二个参数表示目标类型。 Java代码 import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.types.Row;public static class LiteralFunction extends ScalarFunction {public Object eval(String s, String type) {switch (type) {case INT:return Integer.valueOf(s);case DOUBLE:return Double.valueOf(s);case STRING:default:return s;}}// 禁用自动的反射式类型推导使用如下逻辑进行类型推导Overridepublic TypeInference getTypeInference(DataTypeFactory typeFactory) {return TypeInference.newBuilder()// 指定输入参数的类型必要时参数会被隐式转换.typedArguments(DataTypes.STRING(), DataTypes.STRING())// specify a strategy for the result data type of the function.outputTypeStrategy(callContext - {if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {throw callContext.newValidationError(Literal expected for second argument.);}// 基于字符串值返回数据类型final String literal callContext.getArgumentValue(1, String.class).orElse(STRING);switch (literal) {case INT:return Optional.of(DataTypes.INT().notNull());case DOUBLE:return Optional.of(DataTypes.DOUBLE().notNull());case STRING:default:return Optional.of(DataTypes.STRING());}}).build();} }九、确定性 每个用户自定义函数类都可以通过重写 isDeterministic() 方法来声明它是否产生确定性的结果。如果该函数不是纯粹函数式的如random(), date(), 或now()该方法必须返回 false。默认情况下isDeterministic() 返回 true。 此外重写 isDeterministic() 方法也可能影响运行时行为。运行时实现可能会在两个不同的阶段被调用 在生成执行计划期间如果一个函数是通过常量表达式调用的或者常量表达式可以从给定的语句中推导出来那么一个函数就会被预计算以减少常量表达式并且可能不再在集群上执行。 除非 isDeterministic() 被重写为 false 用来在这种情况下禁用常量表达式简化。比如说以下对 ABS 的调用在生成执行计划期间被执行SELECT ABS(-1) FROM t 和 SELECT ABS(field) FROM t WHERE field -1而 SELECT ABS(field) FROM t 则不执行。在运行时即在集群执行如果一个函数被调用时带有非常量表达式或 isDeterministic() 返回 false。 十、内置函数的确定性 系统内置函数的确定性是不可改变的。存在两种不具有确定性的函数动态函数和非确定性函数根据 Apache Calcite SqlOperator 的定义 /*** Returns whether a call to this operator is guaranteed to always return* the same result given the same operands; true is assumed by default.*/public boolean isDeterministic() {return true;}/*** Returns whether it is unsafe to cache query plans referencing this* operator; false is assumed by default.*/public boolean isDynamicFunction() {return false;}isDeterministic 表示函数的确定性声明返回 false 时将在运行时对每个记录进行计算。 isDynamicFunction 声明返回 true 时意味着该函数只能在查询开始时被计算对于批处理模式它只在生成执行计划期间被执行 而对于流模式它等效于一个非确定性的函数这是因为查询在逻辑上是连续执行的流模式对动态表的连续查询抽象所以动态函数在每次查询执行时也会被重新计算当前实现下等效于每条记录计算。 以下内置函数总是非确定性的批和流模式下都在运行时对每条记录进行计算 UUIDRANDRAND_INTEGERCURRENT_DATABASEUNIX_TIMESTAMPCURRENT_ROW_TIMESTAMP 以下内置时间函数是动态的批处理模式下将在生成执行计划期间被执行查询开始对于流模式将在运行时对每条记录进行计算 CURRENT_DATECURRENT_TIMECURRENT_TIMESTAMPNOWLOCALTIMELOCALTIMESTAMP 注意isDynamicFunction 仅适用于内置函数 十一、运行时集成 有时候自定义函数需要获取一些全局信息或者在真正被调用之前做一些配置setup/清理clean-up的工作。自定义函数也提供了 open() 和 close() 方法你可以重写这两个方法做到类似于 DataStream API 中 RichFunction 的功能。 open() 方法在求值方法被调用之前先调用。close() 方法在求值方法调用完之后被调用。 open() 方法提供了一个 FunctionContext它包含了一些自定义函数被执行时的上下文信息比如 metric group、分布式文件缓存或者是全局的作业参数等。 下面的信息可以通过调用 FunctionContext 的对应的方法来获得 方法描述getMetricGroup()执行该函数的 subtask 的 Metric Group。getCachedFile(name)分布式文件缓存的本地临时文件副本。getJobParameter(name, defaultValue)跟对应的 key 关联的全局参数值。 下面的例子展示了如何在一个标量函数中通过 FunctionContext 来获取一个全局的任务参数 Java代码 import org.apache.flink.table.api.*; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.ScalarFunction;public static class HashCodeFunction extends ScalarFunction {private int factor 0;Overridepublic void open(FunctionContext context) throws Exception {// 获取参数 hashcode_factor// 如果不存在则使用默认值 12factor Integer.parseInt(context.getJobParameter(hashcode_factor, 12));}public int eval(String s) {return s.hashCode() * factor;} }TableEnvironment env TableEnvironment.create(...);// 设置任务参数 env.getConfig().addJobParameter(hashcode_factor, 31);// 注册函数 env.createTemporarySystemFunction(hashCode, HashCodeFunction.class);// 调用函数 env.sqlQuery(SELECT myField, hashCode(myField) FROM MyTable);Scala代码 import org.apache.flink.table.api._ import org.apache.flink.table.functions.FunctionContext import org.apache.flink.table.functions.ScalarFunctionclass HashCodeFunction extends ScalarFunction {private var factor: Int 0override def open(context: FunctionContext): Unit {// 获取参数 hashcode_factor// 如果不存在则使用默认值 12factor context.getJobParameter(hashcode_factor, 12).toInt}def eval(s: String): Int {s.hashCode * factor} }val env TableEnvironment.create(...)// 设置任务参数 env.getConfig.addJobParameter(hashcode_factor, 31)// 注册函数 env.createTemporarySystemFunction(hashCode, classOf[HashCodeFunction])// 调用函数 env.sqlQuery(SELECT myField, hashCode(myField) FROM MyTable)十二、标量函数 自定义标量函数可以把 0 到多个标量值映射成 1 个标量值数据类型里列出的任何数据类型都可作为求值方法的参数和返回值类型。 想要实现自定义标量函数你需要扩展 org.apache.flink.table.functions 里面的 ScalarFunction 并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法必须是 public 的而且名字必须是 eval。 下面的例子展示了如何实现一个求哈希值的函数并在查询里调用它 Java代码 import org.apache.flink.table.annotation.InputGroup; import org.apache.flink.table.api.*; import org.apache.flink.table.functions.ScalarFunction; import static org.apache.flink.table.api.Expressions.*;public static class HashFunction extends ScalarFunction {// 接受任意类型输入返回 INT 型输出public int eval(DataTypeHint(inputGroup InputGroup.ANY) Object o) {return o.hashCode();} }TableEnvironment env TableEnvironment.create(...);// 在 Table API 里不经注册直接“内联”调用函数 env.from(MyTable).select(call(HashFunction.class, $(myField)));// 注册函数 env.createTemporarySystemFunction(HashFunction, HashFunction.class);// 在 Table API 里调用注册好的函数 env.from(MyTable).select(call(HashFunction, $(myField)));// 在 SQL 里调用注册好的函数 env.sqlQuery(SELECT HashFunction(myField) FROM MyTable);Scala代码 import org.apache.flink.table.annotation.InputGroup import org.apache.flink.table.api._ import org.apache.flink.table.functions.ScalarFunctionclass HashFunction extends ScalarFunction {// 接受任意类型输入返回 INT 型输出def eval(DataTypeHint(inputGroup InputGroup.ANY) o: AnyRef): Int {return o.hashCode();} }val env TableEnvironment.create(...)// 在 Table API 里不经注册直接“内联”调用函数 env.from(MyTable).select(call(classOf[HashFunction], $myField))// 注册函数 env.createTemporarySystemFunction(HashFunction, classOf[HashFunction])// 在 Table API 里调用注册好的函数 env.from(MyTable).select(call(HashFunction, $myField))// 在 SQL 里调用注册好的函数 env.sqlQuery(SELECT HashFunction(myField) FROM MyTable)十三、表值函数 跟自定义标量函数一样自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是它可以返回任意多行。返回的每一行可以包含 1 到多列如果输出行只包含 1 列会省略结构化信息并生成标量值这个标量值在运行阶段会隐式地包装进行里。 要定义一个表值函数你需要扩展 org.apache.flink.table.functions 下的 TableFunction可以通过实现多个名为 eval 的方法对求值方法进行重载。像其他函数一样输入和输出类型也可以通过反射自动提取出来。表值函数返回的表的类型取决于 TableFunction 类的泛型参数 T不同于标量函数表值函数的求值方法本身不包含返回类型而是通过 collect(T) 方法来发送要输出的行。 在 Table API 中表值函数是通过 .joinLateral(…) 或者 .leftOuterJoinLateral(…) 来使用的。joinLateral 算子会把外表算子左侧的表的每一行跟跟表值函数返回的所有行位于算子右侧进行 crossjoin。leftOuterJoinLateral 算子也是把外表算子左侧的表的每一行跟表值函数返回的所有行位于算子右侧进行crossjoin并且如果表值函数返回 0 行也会保留外表的这一行。 在 SQL 里面用 JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 LATERAL TABLE() 的使用。 下面的例子展示了如何实现一个分隔函数并在查询里调用它 Java代码 import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.api.*; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; import static org.apache.flink.table.api.Expressions.*;FunctionHint(output DataTypeHint(ROWword STRING, length INT)) public static class SplitFunction extends TableFunctionRow {public void eval(String str) {for (String s : str.split( )) {// use collect(...) to emit a rowcollect(Row.of(s, s.length()));}} }TableEnvironment env TableEnvironment.create(...);// 在 Table API 里不经注册直接“内联”调用函数 env.from(MyTable).joinLateral(call(SplitFunction.class, $(myField))).select($(myField), $(word), $(length)); env.from(MyTable).leftOuterJoinLateral(call(SplitFunction.class, $(myField))).select($(myField), $(word), $(length));// 在 Table API 里重命名函数字段 env.from(MyTable).leftOuterJoinLateral(call(SplitFunction.class, $(myField)).as(newWord, newLength)).select($(myField), $(newWord), $(newLength));// 注册函数 env.createTemporarySystemFunction(SplitFunction, SplitFunction.class);// 在 Table API 里调用注册好的函数 env.from(MyTable).joinLateral(call(SplitFunction, $(myField))).select($(myField), $(word), $(length)); env.from(MyTable).leftOuterJoinLateral(call(SplitFunction, $(myField))).select($(myField), $(word), $(length));// 在 SQL 里调用注册好的函数 env.sqlQuery(SELECT myField, word, length FROM MyTable, LATERAL TABLE(SplitFunction(myField))); env.sqlQuery(SELECT myField, word, length FROM MyTable LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE);// 在 SQL 里重命名函数字段 env.sqlQuery(SELECT myField, newWord, newLength FROM MyTable LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE);Scala代码 import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.annotation.FunctionHint import org.apache.flink.table.api._ import org.apache.flink.table.functions.TableFunction import org.apache.flink.types.RowFunctionHint(output new DataTypeHint(ROWword STRING, length INT)) class SplitFunction extends TableFunction[Row] {def eval(str: String): Unit {// use collect(...) to emit a rowstr.split( ).foreach(s collect(Row.of(s, Int.box(s.length))))} }val env TableEnvironment.create(...)// 在 Table API 里不经注册直接“内联”调用函数 env.from(MyTable).joinLateral(call(classOf[SplitFunction], $myField).select($myField, $word, $length) env.from(MyTable).leftOuterJoinLateral(call(classOf[SplitFunction], $myField)).select($myField, $word, $length)// 在 Table API 里重命名函数字段 env.from(MyTable).leftOuterJoinLateral(call(classOf[SplitFunction], $myField).as(newWord, newLength)).select($myField, $newWord, $newLength)// 注册函数 env.createTemporarySystemFunction(SplitFunction, classOf[SplitFunction])// 在 Table API 里调用注册好的函数 env.from(MyTable).joinLateral(call(SplitFunction, $myField)).select($myField, $word, $length) env.from(MyTable).leftOuterJoinLateral(call(SplitFunction, $myField)).select($myField, $word, $length)// 在 SQL 里调用注册好的函数 env.sqlQuery(SELECT myField, word, length FROM MyTable, LATERAL TABLE(SplitFunction(myField))); env.sqlQuery(SELECT myField, word, length FROM MyTable LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE)// 在 SQL 里重命名函数字段 env.sqlQuery(SELECT myField, newWord, newLength FROM MyTable LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE)如果你打算使用 Scala不要把表值函数声明为 Scala objectScala object 是单例对象将导致并发问题。 十四、聚合函数 自定义聚合函数UDAGG是把一个表一行或者多行每行可以有一列或者多列聚合成一个标量值。 上面的图片展示了一个聚合的例子。假设你有一个关于饮料的表。表里面有三个字段分别是 id、name、price表里有 5 行数据。假设你需要找到所有饮料里最贵的饮料的价格即执行一个 max() 聚合。你需要遍历所有 5 行数据而结果就只有一个数值。 自定义聚合函数是通过扩展 AggregateFunction 来实现的。AggregateFunction 的工作过程如下。首先它需要一个 accumulator它是一个数据结构存储了聚合的中间结果。通过调用 AggregateFunction 的 createAccumulator() 方法创建一个空的 accumulator。接下来对于每一行数据会调用 accumulate() 方法来更新 accumulator。当所有的数据都处理完了之后通过调用 getValue 方法来计算和返回最终的结果。 下面几个方法是每个 AggregateFunction 必须要实现的 createAccumulator()accumulate()getValue() Flink 的类型推导在遇到复杂类型的时候可能会推导出错误的结果比如那些非基本类型和普通的 POJO 类型的复杂类型。所以跟 ScalarFunction 和 TableFunction 一样AggregateFunction 也提供了 AggregateFunction#getResultType() 和 AggregateFunction#getAccumulatorType() 来分别指定返回值类型和 accumulator 的类型两个函数的返回值类型也都是 TypeInformation。 除了上面的方法还有几个方法可以选择实现。这些方法有些可以让查询更加高效而有些是在某些特定场景下必须要实现的。例如如果聚合函数用在会话窗口当两个会话窗口合并的时候需要 merge 他们的 accumulator的话merge() 方法就是必须要实现的。 AggregateFunction 的以下方法在某些场景下是必须实现的 retract() 在 bounded OVER 窗口中是必须实现的。merge() 在许多批式聚合和会话以及滚动窗口聚合中是必须实现的。除此之外这个方法对于优化也很多帮助。例如两阶段聚合优化就需要所有的 AggregateFunction 都实现 merge 方法。resetAccumulator() 在许多批式聚合中是必须实现的。 AggregateFunction 的所有方法都必须是 public 的不能是 static 的而且名字必须跟上面写的一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个函数是在抽象类 AggregateFunction 中定义的而其他函数都是约定的方法。如果要定义一个聚合函数你需要扩展 org.apache.flink.table.functions.AggregateFunction并且实现一个或者多个accumulate 方法。accumulate 方法可以重载每个方法的参数类型不同并且支持变长参数。 AggregateFunction 的所有方法的详细文档如下。 Java代码 /*** Base class for user-defined aggregates and table aggregates.** param T the type of the aggregation result.* param ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute an aggregation result.*/ public abstract class UserDefinedAggregateFunctionT, ACC extends UserDefinedFunction {/*** Creates and init the Accumulator for this (table)aggregate function.** return the accumulator with the initial value*/public ACC createAccumulator(); // MANDATORY/*** Returns the TypeInformation of the (table)aggregate functions result.** return The TypeInformation of the (table)aggregate functions result or null if the result* type should be automatically inferred.*/public TypeInformationT getResultType null; // PRE-DEFINED/*** Returns the TypeInformation of the (table)aggregate functions accumulator.** return The TypeInformation of the (table)aggregate functions accumulator or null if the* accumulator type should be automatically inferred.*/public TypeInformationACC getAccumulatorType null; // PRE-DEFINED }/*** Base class for aggregation functions.** param T the type of the aggregation result* param ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute an aggregation result.* AggregateFunction represents its state using accumulator, thereby the state of the* AggregateFunction must be put into the accumulator.*/ public abstract class AggregateFunctionT, ACC extends UserDefinedAggregateFunctionT, ACC {/** Processes the input values and update the provided accumulator instance. The method* accumulate can be overloaded with different custom types and arguments. An AggregateFunction* requires at least one accumulate() method.** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY/*** Retracts the input values from the accumulator instance. The current design assumes the* inputs are the values that have been previously accumulated. The method retract can be* overloaded with different custom types and arguments. This function must be implemented for* datastream bounded over aggregate.** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL/*** Merges a group of accumulator instances into one accumulator instance. This function must be* implemented for datastream session window grouping aggregate and bounded grouping aggregate.** param accumulator the accumulator which will keep the merged aggregate results. It should* be noted that the accumulator may contain the previous aggregated* results. Therefore user should not replace or clean this instance in the* custom merge method.* param its an {link java.lang.Iterable} pointed to a group of accumulators that will be* merged.*/public void merge(ACC accumulator, java.lang.IterableACC its); // OPTIONAL/*** Called every time when an aggregation result should be materialized.* The returned value could be either an early and incomplete result* (periodically emitted as data arrive) or the final result of the* aggregation.** param accumulator the accumulator which contains the current* aggregated results* return the aggregation result*/public T getValue(ACC accumulator); // MANDATORY/*** Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for* bounded grouping aggregate.** param accumulator the accumulator which needs to be reset*/public void resetAccumulator(ACC accumulator); // OPTIONAL/*** Returns true if this AggregateFunction can only be applied in an OVER window.** return true if the AggregateFunction requires an OVER window, false otherwise.*/public Boolean requiresOver false; // PRE-DEFINED }Scala代码 /*** Base class for user-defined aggregates and table aggregates.** tparam T the type of the aggregation result.* tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute an aggregation result.*/ abstract class UserDefinedAggregateFunction[T, ACC] extends UserDefinedFunction {/*** Creates and init the Accumulator for this (table)aggregate function.** return the accumulator with the initial value*/def createAccumulator(): ACC // MANDATORY/*** Returns the TypeInformation of the (table)aggregate functions result.** return The TypeInformation of the (table)aggregate functions result or null if the result* type should be automatically inferred.*/def getResultType: TypeInformation[T] null // PRE-DEFINED/*** Returns the TypeInformation of the (table)aggregate functions accumulator.** return The TypeInformation of the (table)aggregate functions accumulator or null if the* accumulator type should be automatically inferred.*/def getAccumulatorType: TypeInformation[ACC] null // PRE-DEFINED }/*** Base class for aggregation functions.* * tparam T the type of the aggregation result* tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute an aggregation result.* AggregateFunction represents its state using accumulator, thereby the state of the* AggregateFunction must be put into the accumulator.*/ abstract class AggregateFunction[T, ACC] extends UserDefinedAggregateFunction[T, ACC] {/*** Processes the input values and update the provided accumulator instance. The method* accumulate can be overloaded with different custom types and arguments. An AggregateFunction* requires at least one accumulate() method.** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY/*** Retracts the input values from the accumulator instance. The current design assumes the* inputs are the values that have been previously accumulated. The method retract can be* overloaded with different custom types and arguments. This function must be implemented for* datastream bounded over aggregate.** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL/*** Merges a group of accumulator instances into one accumulator instance. This function must be* implemented for datastream session window grouping aggregate and bounded grouping aggregate.** param accumulator the accumulator which will keep the merged aggregate results. It should* be noted that the accumulator may contain the previous aggregated* results. Therefore user should not replace or clean this instance in the* custom merge method.* param its an [[java.lang.Iterable]] pointed to a group of accumulators that will be* merged.*/def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL/*** Called every time when an aggregation result should be materialized.* The returned value could be either an early and incomplete result* (periodically emitted as data arrive) or the final result of the* aggregation.** param accumulator the accumulator which contains the current* aggregated results* return the aggregation result*/def getValue(accumulator: ACC): T // MANDATORY/*** Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for* bounded grouping aggregate.** param accumulator the accumulator which needs to be reset*/def resetAccumulator(accumulator: ACC): Unit // OPTIONAL/*** Returns true if this AggregateFunction can only be applied in an OVER window.** return true if the AggregateFunction requires an OVER window, false otherwise.*/def requiresOver: Boolean false // PRE-DEFINED }下面的例子展示了如何 定义一个聚合函数来计算某一列的加权平均在 TableEnvironment 中注册函数在查询中使用函数。 为了计算加权平均值accumulator 需要存储加权总和以及数据的条数。在我们的例子里我们定义了一个类 WeightedAvgAccum 来作为 accumulator。Flink 的 checkpoint 机制会自动保存 accumulator在失败时进行恢复以此来保证精确一次的语义。 我们的 WeightedAvg聚合函数的 accumulate 方法有三个输入参数。第一个是 WeightedAvgAccum accumulator另外两个是用户自定义的输入输入的值 ivalue 和 输入的权重 iweight。尽管 retract()、merge()、resetAccumulator() 这几个方法在大多数聚合类型中都不是必须实现的我们也在样例中提供了他们的实现。请注意我们在 Scala 样例中也是用的是 Java 的基础类型并且定义了 getResultType() 和 getAccumulatorType()因为 Flink 的类型推导对于 Scala 的类型推导做的不是很好。 Java代码 /*** Accumulator for WeightedAvg.*/ public static class WeightedAvgAccum {public long sum 0;public int count 0; }/*** Weighted Average user-defined aggregate function.*/ public static class WeightedAvg extends AggregateFunctionLong, WeightedAvgAccum {Overridepublic WeightedAvgAccum createAccumulator() {return new WeightedAvgAccum();}Overridepublic Long getValue(WeightedAvgAccum acc) {if (acc.count 0) {return null;} else {return acc.sum / acc.count;}}public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum iValue * iWeight;acc.count iWeight;}public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum - iValue * iWeight;acc.count - iWeight;}public void merge(WeightedAvgAccum acc, IterableWeightedAvgAccum it) {IteratorWeightedAvgAccum iter it.iterator();while (iter.hasNext()) {WeightedAvgAccum a iter.next();acc.count a.count;acc.sum a.sum;}}public void resetAccumulator(WeightedAvgAccum acc) {acc.count 0;acc.sum 0L;} }// 注册函数 StreamTableEnvironment tEnv ... tEnv.registerFunction(wAvg, new WeightedAvg());// 使用函数 tEnv.sqlQuery(SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user);Scala代码 import java.lang.{Long JLong, Integer JInteger} import org.apache.flink.api.java.tuple.{Tuple1 JTuple1} import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.table.api.Types import org.apache.flink.table.functions.AggregateFunction/*** Accumulator for WeightedAvg.*/ class WeightedAvgAccum extends JTuple1[JLong, JInteger] {sum 0Lcount 0 }/*** Weighted Average user-defined aggregate function.*/ class WeightedAvg extends AggregateFunction[JLong, CountAccumulator] {override def createAccumulator(): WeightedAvgAccum {new WeightedAvgAccum}override def getValue(acc: WeightedAvgAccum): JLong {if (acc.count 0) {null} else {acc.sum / acc.count}}def accumulate(acc: WeightedAvgAccum, iValue: JLong, iWeight: JInteger): Unit {acc.sum iValue * iWeightacc.count iWeight}def retract(acc: WeightedAvgAccum, iValue: JLong, iWeight: JInteger): Unit {acc.sum - iValue * iWeightacc.count - iWeight}def merge(acc: WeightedAvgAccum, it: java.lang.Iterable[WeightedAvgAccum]): Unit {val iter it.iterator()while (iter.hasNext) {val a iter.next()acc.count a.countacc.sum a.sum}}def resetAccumulator(acc: WeightedAvgAccum): Unit {acc.count 0acc.sum 0L}override def getAccumulatorType: TypeInformation[WeightedAvgAccum] {new TupleTypeInfo(classOf[WeightedAvgAccum], Types.LONG, Types.INT)}override def getResultType: TypeInformation[JLong] Types.LONG }// 注册函数 val tEnv: StreamTableEnvironment ??? tEnv.registerFunction(wAvg, new WeightedAvg())// 使用函数 tEnv.sqlQuery(SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user)python代码 Java code:/*** Accumulator for WeightedAvg.*/ public static class WeightedAvgAccum {public long sum 0;public int count 0; }// The java class must have a public no-argument constructor and can be founded in current java classloader. // Java 类必须有一个 public 的无参构造函数并且可以在当前类加载器中加载到。/*** Weighted Average user-defined aggregate function.*/ public static class WeightedAvg extends AggregateFunctionLong, WeightedAvgAccum {Overridepublic WeightedAvgAccum createAccumulator() {return new WeightedAvgAccum();}Overridepublic Long getValue(WeightedAvgAccum acc) {if (acc.count 0) {return null;} else {return acc.sum / acc.count;}}public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum iValue * iWeight;acc.count iWeight;}public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum - iValue * iWeight;acc.count - iWeight;}public void merge(WeightedAvgAccum acc, IterableWeightedAvgAccum it) {IteratorWeightedAvgAccum iter it.iterator();while (iter.hasNext()) {WeightedAvgAccum a iter.next();acc.count a.count;acc.sum a.sum;}}public void resetAccumulator(WeightedAvgAccum acc) {acc.count 0;acc.sum 0L;} } # 注册函数 t_env ... # type: StreamTableEnvironment t_env.register_java_function(wAvg, my.java.function.WeightedAvg)# 使用函数 t_env.sql_query(SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user)十五、表值聚合函数 自定义表值聚合函数UDTAGG可以把一个表一行或者多行每行有一列或者多列聚合成另一张表结果中可以有多行多列 上图展示了一个表值聚合函数的例子。假设你有一个饮料的表这个表有 3 列分别是 id、name 和 price一共有 5 行。假设你需要找到价格最高的两个饮料类似于 top2() 表值聚合函数。你需要遍历所有 5 行数据结果是有 2 行数据的一个表。 用户自定义表值聚合函数是通过扩展 TableAggregateFunction 类来实现的。一个 TableAggregateFunction 的工作过程如下。首先它需要一个 accumulator这个 accumulator 负责存储聚合的中间结果。 通过调用 TableAggregateFunction 的 createAccumulator 方法来构造一个空的 accumulator。接下来对于每一行数据会调用 accumulate 方法来更新 accumulator。当所有数据都处理完之后调用 emitValue 方法来计算和返回最终的结果。 下面几个 TableAggregateFunction 的方法是必须要实现的 createAccumulator()accumulate() Flink 的类型推导在遇到复杂类型的时候可能会推导出错误的结果比如那些非基本类型和普通的 POJO 类型的复杂类型。所以类似于 ScalarFunction 和 TableFunctionTableAggregateFunction 也提供了 TableAggregateFunction#getResultType() 和 TableAggregateFunction#getAccumulatorType() 方法来指定返回值类型和 accumulator 的类型这两个方法都需要返回 TypeInformation。 除了上面的方法还有几个其他的方法可以选择性的实现。有些方法可以让查询更加高效而有些方法对于某些特定场景是必须要实现的。比如在会话窗口当两个会话窗口合并时会合并两个 accumulator中使用聚合函数时必须要实现merge() 方法。 下面几个 TableAggregateFunction 的方法在某些特定场景下是必须要实现的 retract() 在 bounded OVER 窗口中的聚合函数必须要实现。merge() 在许多批式聚合和以及流式会话和滑动窗口聚合中是必须要实现的。resetAccumulator() 在许多批式聚合中是必须要实现的。emitValue() 在批式聚合以及窗口聚合中是必须要实现的。 下面的 TableAggregateFunction 的方法可以提升流式任务的效率 emitUpdateWithRetract() 在 retract 模式下该方法负责发送被更新的值。 emitValue 方法会发送所有 accumulator 给出的结果。拿 TopN 来说emitValue 每次都会发送所有的最大的 n 个值。这在流式任务中可能会有一些性能问题。为了提升性能用户可以实现 emitUpdateWithRetract 方法。这个方法在 retract 模式下会增量的输出结果比如有数据更新了我们必须要撤回老的数据然后再发送新的数据。如果定义了 emitUpdateWithRetract 方法那它会优先于 emitValue 方法被使用因为一般认为 emitUpdateWithRetract 会更加高效因为它的输出是增量的。 TableAggregateFunction 的所有方法都必须是 public 的、非 static 的而且名字必须跟上面提到的一样。createAccumulator、getResultType 和 getAccumulatorType 这三个方法是在抽象父类 TableAggregateFunction 中定义的而其他的方法都是约定的方法。要实现一个表值聚合函数你必须扩展 org.apache.flink.table.functions.TableAggregateFunction并且实现一个或者多个accumulate 方法。accumulate 方法可以有多个重载的方法也可以支持变长参数。 TableAggregateFunction 的所有方法的详细文档如下 Java代码 /*** Base class for user-defined aggregates and table aggregates.** param T the type of the aggregation result.* param ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute an aggregation result.*/ public abstract class UserDefinedAggregateFunctionT, ACC extends UserDefinedFunction {/*** Creates and init the Accumulator for this (table)aggregate function.** return the accumulator with the initial value*/public ACC createAccumulator(); // MANDATORY/*** Returns the TypeInformation of the (table)aggregate functions result.** return The TypeInformation of the (table)aggregate functions result or null if the result* type should be automatically inferred.*/public TypeInformationT getResultType null; // PRE-DEFINED/*** Returns the TypeInformation of the (table)aggregate functions accumulator.** return The TypeInformation of the (table)aggregate functions accumulator or null if the* accumulator type should be automatically inferred.*/public TypeInformationACC getAccumulatorType null; // PRE-DEFINED }/*** Base class for table aggregation functions.** param T the type of the aggregation result* param ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute a table aggregation result.* TableAggregateFunction represents its state using accumulator, thereby the state of* the TableAggregateFunction must be put into the accumulator.*/ public abstract class TableAggregateFunctionT, ACC extends UserDefinedAggregateFunctionT, ACC {/** Processes the input values and update the provided accumulator instance. The method* accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction* requires at least one accumulate() method.** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY/*** Retracts the input values from the accumulator instance. The current design assumes the* inputs are the values that have been previously accumulated. The method retract can be* overloaded with different custom types and arguments. This function must be implemented for* datastream bounded over aggregate.** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL/*** Merges a group of accumulator instances into one accumulator instance. This function must be* implemented for datastream session window grouping aggregate and bounded grouping aggregate.** param accumulator the accumulator which will keep the merged aggregate results. It should* be noted that the accumulator may contain the previous aggregated* results. Therefore user should not replace or clean this instance in the* custom merge method.* param its an {link java.lang.Iterable} pointed to a group of accumulators that will be* merged.*/public void merge(ACC accumulator, java.lang.IterableACC its); // OPTIONAL/*** Called every time when an aggregation result should be materialized. The returned value* could be either an early and incomplete result (periodically emitted as data arrive) or* the final result of the aggregation.** param accumulator the accumulator which contains the current* aggregated results* param out the collector used to output data*/public void emitValue(ACC accumulator, CollectorT out); // OPTIONAL/*** Called every time when an aggregation result should be materialized. The returned value* could be either an early and incomplete result (periodically emitted as data arrive) or* the final result of the aggregation.** Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.* This method outputs data incrementally in retract mode, i.e., once there is an update, we* have to retract old records before sending new updated ones. The emitUpdateWithRetract* method will be used in preference to the emitValue method if both methods are defined in the* table aggregate function, because the method is treated to be more efficient than emitValue* as it can outputvalues incrementally.** param accumulator the accumulator which contains the current* aggregated results* param out the retractable collector used to output data. Use collect method* to output(add) records and use retract method to retract(delete)* records.*/public void emitUpdateWithRetract(ACC accumulator, RetractableCollectorT out); // OPTIONAL/*** Collects a record and forwards it. The collector can output retract messages with the retract* method. Note: only use it in {code emitRetractValueIncrementally}.*/public interface RetractableCollectorT extends CollectorT {/*** Retract a record.** param record The record to retract.*/void retract(T record);} }Scala代码 /*** Base class for user-defined aggregates and table aggregates.** tparam T the type of the aggregation result.* tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute an aggregation result.*/ abstract class UserDefinedAggregateFunction[T, ACC] extends UserDefinedFunction {/*** Creates and init the Accumulator for this (table)aggregate function.** return the accumulator with the initial value*/def createAccumulator(): ACC // MANDATORY/*** Returns the TypeInformation of the (table)aggregate functions result.** return The TypeInformation of the (table)aggregate functions result or null if the result* type should be automatically inferred.*/def getResultType: TypeInformation[T] null // PRE-DEFINED/*** Returns the TypeInformation of the (table)aggregate functions accumulator.** return The TypeInformation of the (table)aggregate functions accumulator or null if the* accumulator type should be automatically inferred.*/def getAccumulatorType: TypeInformation[ACC] null // PRE-DEFINED }/*** Base class for table aggregation functions.** tparam T the type of the aggregation result* tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the* aggregated values which are needed to compute an aggregation result.* TableAggregateFunction represents its state using accumulator, thereby the state of* the TableAggregateFunction must be put into the accumulator.*/ abstract class TableAggregateFunction[T, ACC] extends UserDefinedAggregateFunction[T, ACC] {/*** Processes the input values and update the provided accumulator instance. The method* accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction* requires at least one accumulate() method.** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY/*** Retracts the input values from the accumulator instance. The current design assumes the* inputs are the values that have been previously accumulated. The method retract can be* overloaded with different custom types and arguments. This function must be implemented for* datastream bounded over aggregate.** param accumulator the accumulator which contains the current aggregated results* param [user defined inputs] the input value (usually obtained from a new arrived data).*/def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL/*** Merges a group of accumulator instances into one accumulator instance. This function must be* implemented for datastream session window grouping aggregate and bounded grouping aggregate.** param accumulator the accumulator which will keep the merged aggregate results. It should* be noted that the accumulator may contain the previous aggregated* results. Therefore user should not replace or clean this instance in the* custom merge method.* param its an [[java.lang.Iterable]] pointed to a group of accumulators that will be* merged.*/def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL/*** Called every time when an aggregation result should be materialized. The returned value* could be either an early and incomplete result (periodically emitted as data arrive) or* the final result of the aggregation.** param accumulator the accumulator which contains the current* aggregated results* param out the collector used to output data*/def emitValue(accumulator: ACC, out: Collector[T]): Unit // OPTIONAL/*** Called every time when an aggregation result should be materialized. The returned value* could be either an early and incomplete result (periodically emitted as data arrive) or* the final result of the aggregation.** Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.* This method outputs data incrementally in retract mode, i.e., once there is an update, we* have to retract old records before sending new updated ones. The emitUpdateWithRetract* method will be used in preference to the emitValue method if both methods are defined in the* table aggregate function, because the method is treated to be more efficient than emitValue* as it can outputvalues incrementally.** param accumulator the accumulator which contains the current* aggregated results* param out the retractable collector used to output data. Use collect method* to output(add) records and use retract method to retract(delete)* records.*/def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit // OPTIONAL/*** Collects a record and forwards it. The collector can output retract messages with the retract* method. Note: only use it in emitRetractValueIncrementally.*/trait RetractableCollector[T] extends Collector[T] {/*** Retract a record.** param record The record to retract.*/def retract(record: T): Unit} }下面的例子展示了如何 定义一个 TableAggregateFunction 来计算给定列的最大的 2 个值在 TableEnvironment 中注册函数在 Table API 查询中使用函数当前只在 Table API 中支持 TableAggregateFunction。 为了计算最大的 2 个值accumulator 需要保存当前看到的最大的 2 个值。在我们的例子中我们定义了类 Top2Accum 来作为 accumulator。Flink 的 checkpoint 机制会自动保存 accumulator并且在失败时进行恢复来保证精确一次的语义。 我们的 Top2 表值聚合函数TableAggregateFunction的 accumulate() 方法有两个输入第一个是 Top2Accum accumulator另一个是用户定义的输入输入的值 v。尽管 merge() 方法在大多数聚合类型中不是必须的我们也在样例中提供了它的实现。请注意我们在 Scala 样例中也使用的是 Java 的基础类型并且定义了 getResultType() 和 getAccumulatorType() 方法因为 Flink 的类型推导对于 Scala 的类型推导支持的不是很好。 Java代码 /*** Accumulator for Top2.*/ public class Top2Accum {public Integer first;public Integer second; }/*** The top2 user-defined table aggregate function.*/ public static class Top2 extends TableAggregateFunctionTuple2Integer, Integer, Top2Accum {Overridepublic Top2Accum createAccumulator() {Top2Accum acc new Top2Accum();acc.first Integer.MIN_VALUE;acc.second Integer.MIN_VALUE;return acc;}public void accumulate(Top2Accum acc, Integer v) {if (v acc.first) {acc.second acc.first;acc.first v;} else if (v acc.second) {acc.second v;}}public void merge(Top2Accum acc, java.lang.IterableTop2Accum iterable) {for (Top2Accum otherAcc : iterable) {accumulate(acc, otherAcc.first);accumulate(acc, otherAcc.second);}}public void emitValue(Top2Accum acc, CollectorTuple2Integer, Integer out) {// emit the value and rankif (acc.first ! Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.first, 1));}if (acc.second ! Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.second, 2));}} }// 注册函数 StreamTableEnvironment tEnv ... tEnv.registerFunction(top2, new Top2());// 初始化表 Table tab ...;// 使用函数 tab.groupBy(key).flatAggregate(top2(a) as (v, rank)).select(key, v, rank);Scala代码 import java.lang.{Integer JInteger} import org.apache.flink.table.api.Types import org.apache.flink.table.functions.TableAggregateFunction/*** Accumulator for top2.*/ class Top2Accum {var first: JInteger _var second: JInteger _ }/*** The top2 user-defined table aggregate function.*/ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {override def createAccumulator(): Top2Accum {val acc new Top2Accumacc.first Int.MinValueacc.second Int.MinValueacc}def accumulate(acc: Top2Accum, v: Int) {if (v acc.first) {acc.second acc.firstacc.first v} else if (v acc.second) {acc.second v}}def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit {val iter its.iterator()while (iter.hasNext) {val top2 iter.next()accumulate(acc, top2.first)accumulate(acc, top2.second)}}def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit {// emit the value and rankif (acc.first ! Int.MinValue) {out.collect(JTuple2.of(acc.first, 1))}if (acc.second ! Int.MinValue) {out.collect(JTuple2.of(acc.second, 2))}} }// 初始化表 val tab ...// 使用函数 tab.groupBy(key).flatAggregate(top2(a) as (v, rank)).select(key, v, rank)下面的例子展示了如何使用 emitUpdateWithRetract 方法来只发送更新的数据。为了只发送更新的结果accumulator 保存了上一次的最大的2个值也保存了当前最大的2个值。注意如果 TopN 中的 n 非常大这种既保存上次的结果也保存当前的结果的方式不太高效。一种解决这种问题的方式是把输入数据直接存储到 accumulator 中然后在调用 emitUpdateWithRetract 方法时再进行计算。 Java代码 /*** Accumulator for Top2.*/ public class Top2Accum {public Integer first;public Integer second;public Integer oldFirst;public Integer oldSecond; }/*** The top2 user-defined table aggregate function.*/ public static class Top2 extends TableAggregateFunctionTuple2Integer, Integer, Top2Accum {Overridepublic Top2Accum createAccumulator() {Top2Accum acc new Top2Accum();acc.first Integer.MIN_VALUE;acc.second Integer.MIN_VALUE;acc.oldFirst Integer.MIN_VALUE;acc.oldSecond Integer.MIN_VALUE;return acc;}public void accumulate(Top2Accum acc, Integer v) {if (v acc.first) {acc.second acc.first;acc.first v;} else if (v acc.second) {acc.second v;}}public void emitUpdateWithRetract(Top2Accum acc, RetractableCollectorTuple2Integer, Integer out) {if (!acc.first.equals(acc.oldFirst)) {// if there is an update, retract old value then emit new value.if (acc.oldFirst ! Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldFirst, 1));}out.collect(Tuple2.of(acc.first, 1));acc.oldFirst acc.first;}if (!acc.second.equals(acc.oldSecond)) {// if there is an update, retract old value then emit new value.if (acc.oldSecond ! Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldSecond, 2));}out.collect(Tuple2.of(acc.second, 2));acc.oldSecond acc.second;}} }// 注册函数 StreamTableEnvironment tEnv ... tEnv.registerFunction(top2, new Top2());// 初始化表 Table tab ...;// 使用函数 tab.groupBy(key).flatAggregate(top2(a) as (v, rank)).select(key, v, rank);Scala代码 import java.lang.{Integer JInteger} import org.apache.flink.table.api.Types import org.apache.flink.table.functions.TableAggregateFunction/*** Accumulator for top2.*/ class Top2Accum {var first: JInteger _var second: JInteger _var oldFirst: JInteger _var oldSecond: JInteger _ }/*** The top2 user-defined table aggregate function.*/ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {override def createAccumulator(): Top2Accum {val acc new Top2Accumacc.first Int.MinValueacc.second Int.MinValueacc.oldFirst Int.MinValueacc.oldSecond Int.MinValueacc}def accumulate(acc: Top2Accum, v: Int) {if (v acc.first) {acc.second acc.firstacc.first v} else if (v acc.second) {acc.second v}}def emitUpdateWithRetract(acc: Top2Accum,out: RetractableCollector[JTuple2[JInteger, JInteger]]): Unit {if (acc.first ! acc.oldFirst) {// if there is an update, retract old value then emit new value.if (acc.oldFirst ! Int.MinValue) {out.retract(JTuple2.of(acc.oldFirst, 1))}out.collect(JTuple2.of(acc.first, 1))acc.oldFirst acc.first}if (acc.second ! acc.oldSecond) {// if there is an update, retract old value then emit new value.if (acc.oldSecond ! Int.MinValue) {out.retract(JTuple2.of(acc.oldSecond, 2))}out.collect(JTuple2.of(acc.second, 2))acc.oldSecond acc.second}} }// 初始化表 val tab ...// 使用函数 tab.groupBy(key).flatAggregate(top2(a) as (v, rank)).select(key, v, rank)
http://wiki.neutronadmin.com/news/220540/

相关文章:

  • 镇赉县做网站的企业团队建设案例公司
  • 中国建设银行网站的社保板块在哪里查询优惠券的网站如何做
  • 个人网站建立展示型网站 asp.net
  • 柴沟堡网站建设python基础教程雪峰
  • 苏州建站免费模板苏州建设人才网官网
  • 凌云县 城市建设 网站怎么看网站用哪个系统做的
  • 做网站上传视频关于购物网站建设的论文
  • 宜昌网站建设选择宜昌慧享互动衡阳专业的关键词优化终报价
  • 青岛建设厅网站新网站前期seo怎么做
  • 怎样做营销型网站seo关键字优化
  • 一个公司可以做两个网站不做ps的素材哪个网站
  • 网站移动端建设热门关键词
  • 无锡网站建设 君通科技苏州网站建设书生
  • 网站建设与管理的条件企业网站seo外包
  • 网站色彩代码云建设平台
  • 金华市建设银行网站php做网站架构图
  • 做电力招聘的有哪些网站seo网站推广有哪些
  • 东莞网站营销推广公司和城乡建设部网站
  • 网站开发外包公司网站建设yuanmus
  • 网站建设人工费电脑首页wordpress
  • 网站建设流行技术wordpress 写入权限设置
  • 做网站还是微信小程序西宁网站建设君博解决
  • 阜宁做网站哪家好大型银行网站建设
  • 建设旅游网站的总结书店网站建设个人总结
  • 网站 架构 设计友情链接权重高的网站
  • 租房网站模板平潭做网站
  • 亚马逊网站开发设计免费软件怎么盈利
  • 制作企业网站是怎么收费的网站建设文化如何
  • 微信小程序制作商seo关键词推广多少钱
  • 石家庄电商网站开发网站制作软件图标