做网站竞价是什么意思,深圳外贸公司获客,学校网站建设 效果,wordpress+商场源码简介#xff1a; Flink 1.13.0 版本让流处理应用的使用像普通应用一样简单和自然#xff0c;并且让用户可以更好地理解流作业的性能。
翻译 | 高赟 Review | 朱翥、马国维
Flink 1.13 发布了#xff01;Flink 1.13 包括了超过 200 名贡献者所提交的 1000 多项修复和优化…简介 Flink 1.13.0 版本让流处理应用的使用像普通应用一样简单和自然并且让用户可以更好地理解流作业的性能。
翻译 | 高赟 Review | 朱翥、马国维
Flink 1.13 发布了Flink 1.13 包括了超过 200 名贡献者所提交的 1000 多项修复和优化。
这一版本中Flink 的一个主要目标取得了重要进展即让流处理应用的使用和普通应用一样简单和自然。Flink 1.13 新引入的被动扩缩容使得流作业的扩缩容和其它应用一样简单用户仅需要修改并发度即可。
这个版本还包括一系列重要改动使用户可以更好的理解流作业的性能。当流作业的性能不及预期的时候这些改动可以使用户可以更好的分析原因。这些改动包括用于识别瓶颈节点的负载和反压可视化、分析算子热点代码的 CPU 火焰图和分析 State Backend 状态的 State 访问性能指标。
除了这些特性外Flink 社区还添加了大量的其它优化我们会在本文后续讨论其中的一些。我们希望用户可以享受新的版本和特性带来的便利在本文最后我们还会介绍升级Flink版本需要注意的一些变化。
我们鼓励用户下载试用新版 Flink 并且通过邮件列表和 JIRA 来反馈遇到的问题。
重要特性
被动扩缩容
Flink 项目的一个初始目标就是希望流处理应用可以像普通应用一样简单和自然被动扩缩容是 Flink 针对这一目标上的最新进展。
当考虑资源管理和部分的时候Flink 有两种可能的模式。用户可以将 Flink 应用部署到 k8s、yarn 等资源管理系统之上并且由 Flink 主动的来管理资源并按需分配和释放资源。这一模式对于经常改变资源需求的作业和应用非常有用比如批作业和实时 SQL 查询。在这种模式下Flink 所启动的 Worker 数量是由应用设置的并发度决定的。在 Flink 中我们将这一模式叫做主动扩缩容。
对于长时间运行的流处理应用一种更适合的模型是用户只需要将作业像其它的长期运行的服务一样启动起来而不需要考虑是部署在 k8s、yarn 还是其它的资源管理平台上并且不需要考虑需要申请的资源的数量。相反它的规模是由所分配的 worker 数量来决定的。当 worker 数量发生变化时Flink 自动的改动应用的并发度。在 Flink 中我们将这一模式叫做被动扩缩容。
Flink 的 Application 部署模式开启了使 Flink 作业更接近普通应用即启动 Flink 作业不需要执行两个独立的步骤来启动集群和提交应用的努力而被动扩缩容完成了这一目标用户不再需要使用额外的工具如脚本、K8s 算子来让 worker 的数量与应用并发度设置保持一致。
用户现在可以将自动扩缩容的工具应用到 Flink 应用之上就像普通的应用程序一样只要用户了解扩缩容的代价有状态的流应用在扩缩容的时候需要将状态重新分发。
如果想要尝试被动扩缩容用户可以增加 scheduler-mode: reactive 这一配置项然后启动一个应用集群Standalone 或者 K8s。更多细节见被动扩缩容的文档。
分析应用的性能
对所有应用程序来说能够简单的分析和理解应用的性能是非常关键的功能。这一功能对 Flink 更加重要因为 Flink 应用一般是数据密集的即需要处理大量的数据并且需要在近实时的延迟内给出结果。
当 Flink 应用处理的速度跟不上数据输入的速度时或者当一个应用占用的资源超过预期下文介绍的这些工具可以帮你分析原因。
瓶颈检测与反压监控
Flink 性能分析首先要解决的问题经常是哪个算子是瓶颈
为了回答这一问题Flink 引入了描述作业繁忙即在处理数据与反压由于下游算子不能及时处理结果而无法继续输出程度的指标。应用中可能的瓶颈是那些繁忙并且上游被反压的算子。
Flink 1.13 优化了反压检测的逻辑使用基于任务 Mailbox 计时而不在再于堆栈采样并且重新实现了作业图的 UI 展示Flink 现在在 UI 上通过颜色和数值来展示繁忙和反压的程度。 Web UI 中的 CPU 火焰图
Flink 关于性能另一个经常需要回答的问题瓶颈算子中的哪部分计算逻辑消耗巨大
针对这一问题一个有效的可视化工具是火焰图。它可以帮助回答以下问题
哪个方法调现在在占用 CPU不同方法占用 CPU 的比例如何一个方法被调用的栈是什么样子的
火焰图是通过重复采样线程的堆栈来构建的。在火焰图中每个方法调用被表示为一个矩形矩形的长度与这个方法出现在采样中的次数成正比。火焰图在 UI 上的一个例子如下图所示。 火焰图的文档包括启用这一功能的更多细节和指令。
State 访问延迟指标
另一个可能的性能瓶颈是 state backend尤其是当作业的 state 超过内存容量而必须使用 RocksDB state backend 时。
这里并不是想说 RocksDB 性能不够好我们非常喜欢 RocksDB但是它需要满足一些条件才能达到最好的性能。例如用户可能很容易遇到非故意的在云上由于使用了错误的磁盘资源类型而不能满足 RockDB 的 IO 性能需求的问题。
基于 CPU 火焰图新的 State Backend 的延迟指标可以帮助用户更好的判断性能不符合预期是否是由 State Backend 导致的。例如如果用户发现 RocksDB 的单次访问需要几毫秒的时间那么就需要查看内存和 I/O 的配置。这些指标可以通过设置 state.backend.rocksdb.latency-track-enabled 这一选项来启用。这些指标是通过采样的方式来监控性能的所以它们对 RocksDB State Backend 的性能影响是微不足道的。
通过 Savepoint 来切换 State Backend
用户现在可以在从一个 Savepoint 重启时切换一个 Flink 应用的 State Backend。这使得 Flink 应用不再被限制只能使用应用首次运行时选择的 State Backend。
基于这一功能用户现在可以首先使用一个 HashMap State Backend纯内存的 State Backend如果后续状态变得过大的话就切换到 RocksDB State Backend 中。
在实现层Flink 现在统一了所有 State Backend 的 Savepoint 格式来实现这一功能。
K8s 部署时使用用户指定的 Pod 模式
原生 kubernetes 部署Flink 主动要求 K8s 来启动 Pod中现在可以使用自定义的 Pod 模板。
使用这些模板用户可以使用一种更符合 K8s 的方式来设置 JM 和 TM 的 Pod这种方式比 Flink K8s 集成内置的配置项更加灵活。
生产可用的 Unaligned Checkpoint
Unaligned Checkpoint 目前已达到了生产可用的状态我们鼓励用户在存在反压的情况下试用这一功能。
具体来说Flink 1.13 中引入的这些功能使 Unaligned Checkpoint 更容易使用
用户现在使用 Unaligned Checkpoint 时也可以扩缩容应用。如果用户需要因为性能原因不能使用 Savepoint而必须使用 Retained checkpoint 时这一功能会非常方便。对于没有反压的应用启用 Unaligned Checkpoint 现在代价更小。Unaligned Checkpoint 现在可以通过超时来自动触发即一个应用默认会使用 Aligned Checkpoint不存储传输中的数据而只在对齐超过一定时间范围时自动切换到 Unaligned Checkpoint存储传输中的数据。
关于如何启用 Unaligned Checkpoint 可以参考相关文档。
机器学习迁移到单独的仓库
为了加速 Flink 机器学习的进展流批统一的机器学习现在 Flink 机器学习开启了新的 flink-ml 仓库。我们采用类似于 Stateful Function 项目的管理方式通过使用一个单独的仓库从而简化代码合并的流程并且可以进行单独的版本发布从而提高开发的效率。
用户可以关注 Flink 在机器学习方面的进展比如与 AlinkFlink 常用机器学习算法套件的互操作以及 Flink 与 Tensorflow 的集成。
SQL / Table API 进展
与之前的版本类似SQL 和 Table API 仍然在所有开发中占用很大的比例。
通过 Table-valued 函数来定义时间窗口
在流式 SQL 查询中一个最经常使用的是定义时间窗口。Flink 1.13 中引入了一种新的定义窗口的方式通过 Table-valued 函数。这一方式不仅有更强的表达能力允许用户定义新的窗口类型并且与 SQL 标准更加一致。
Flink 1.13 在新的语法中支持 TUMBLE 和 HOP 窗口在后续版本中也会支持 SESSION 窗口。我们通过以下两个例子来展示这一方法的表达能力
例 1一个新引入的 CUMULATE 窗口函数它可以支持按特定步长扩展的窗口直到达到最大窗口大小
SELECT window_time, window_start, window_end, SUM(price) AS total_price FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL 2 MINUTES, INTERVAL 10 MINUTES))
GROUP BY window_start, window_end, window_time;
例 2用户在 table-valued 窗口函数中可以访问窗口的起始和终止时间从而使用户可以实现新的功能。例如除了常规的基于窗口的聚合和 Join 之外用户现在也可以实现基于窗口的 Top-K 聚合
SELECT window_time, ...FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC) as rank FROM t) WHERE rank 100;
提高 DataStream API 与 Table API / SQL 的互操作能力
这一版本极大的简化了 DataStream API 与 Table API 混合的程序。
Table API 是一种非常方便的应用开发接口因为这经支持表达式的程序编写并提供了大量的内置函数。但是有时候用户也需要切换回 DataStream例如当用户存在表达能力、灵活性或者 State 访问的需求时。
Flink 新引入的 StreamTableEnvironment.toDataStream()/.fromDataStream() 可以将一个 DataStream API 声明的 Source 或者 Sink 当作 Table 的 Source 或者 Sink 来使用。主要的优化包括
DataStream 与 Table API 类型系统的自动转换。Event Time 配置的无缝集成Watermark 行为的高度一致性。Row 类型即 Table API 中数据的表示有了极大的增强包括 toString() / hashCode() 和 equals() 方法的优化按名称访问字段值的支持与稀疏表示的支持。
Table table tableEnv.fromDataStream(dataStream,Schema.newBuilder().columnByMetadata(rowtime, TIMESTAMP(3)).watermark(rowtime, SOURCE_WATERMARK()).build());DataStreamRow dataStream tableEnv.toDataStream(table).keyBy(r - r.getField(user)).window(...);
SQL Client: 初始化脚本和语句集合 Statement Sets
SQL Client 是一种直接运行和部署 SQL 流或批作业的简便方式用户不需要编写代码就可以从命令行调用 SQL或者作为 CI / CD 流程的一部分。
这个版本极大的提高了 SQL Client 的功能。现在基于所有通过 Java 编程即通过编程的方式调用 TableEnvironment 来发起查询可以支持的语法现在 SQL Client 和 SQL 脚本都可以支持。这意味着 SQL 用户不再需要添加胶水代码来部署他们的SQL作业。
配置简化和代码共享
Flink 后续将不再支持通过 Yaml 的方式来配置 SQL Client注目前还在支持但是已经被标记为废弃。作为替代SQL Client 现在支持使用一个初始化脚本在主 SQL 脚本执行前来配置环境。
这些初始化脚本通常可以在不同团队/部署之间共享。它可以用来加载常用的 catalog应用通用的配置或者定义标准的视图。
./sql-client.sh -i init1.sql init2.sql -f sqljob.sql
更多的配置项
通过增加配置项优化 SET / RESET 命令用户可以更方便的在 SQL Client 和 SQL 脚本内部来控制执行的流程。
通过语句集合来支持多查询
多查询允许用户在一个 Flink 作业中执行多个 SQL 查询或者语句。这对于长期运行的流式 SQL 查询非常有用。
语句集可以用来将一组查询合并为一组同时执行。
以下是一个可以通过 SQL Client 来执行的 SQL 脚本的例子。它初始化和配置了执行多查询的环境。这一脚本包括了所有的查询和所有的环境初始化和配置的工作从而使它可以作为一个自包含的部署组件。
-- set up a catalog
CREATE CATALOG hive_catalog WITH (type hive);
USE CATALOG hive_catalog;-- or use temporary objects
CREATE TEMPORARY TABLE clicks (user_id BIGINT,page_id BIGINT,viewtime TIMESTAMP
) WITH (connector kafka,topic clicks,properties.bootstrap.servers ...,format avro
);-- set the execution mode for jobs
SET execution.runtime-modestreaming;-- set the sync/async mode for INSERT INTOs
SET table.dml-syncfalse;-- set the jobs parallelism
SET parallism.default10;-- set the job name
SET pipeline.name my_flink_job;-- restore state from the specific savepoint path
SET execution.savepoint.path/tmp/flink-savepoints/savepoint-bb0dab;BEGIN STATEMENT SET;INSERT INTO pageview_pv_sink
SELECT page_id, count(1) FROM clicks GROUP BY page_id;INSERT INTO pageview_uv_sink
SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;END;
Hive 查询语法兼容性
用户现在在 Flink 上也可以使用 Hive SQL 语法。除了 Hive DDL 方言之外Flink现在也支持常用的 Hive DML 和 DQL 方言。
为了使用 Hive SQL 方言需要设置 table.sql-dialect 为 hive 并且加载 HiveModule。后者非常重要因为必须要加载 Hive 的内置函数后才能正确实现对 Hive 语法和语义的兼容性。例子如下
CREATE CATALOG myhive WITH (type hive); -- setup HiveCatalog
USE CATALOG myhive;
LOAD MODULE hive; -- setup HiveModule
USE MODULES hive,core;
SET table.sql-dialect hive; -- enable Hive dialect
SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries
需要注意的是 Hive 方言中不再支持 Flink 语法的 DML 和 DQL 语句。如果要使用 Flink 语法需要切换回 default 的方言配置。
优化的 SQL 时间函数
在数据处理中时间处理是一个重要的任务。但是与此同时处理不同的时区、日期和时间是一个日益复杂的任务。
在 Flink 1.13 中我们投入了大量的精力来简化时间函数的使用。我们调整了时间相关函数的返回类型使其更加精确例如 PROCTIME()CURRENT_TIMESTAMP() 和 NOW()。
其次用户现在还可以基于一个 TIMESTAMP_LTZ 类型的列来定义 Event Time 属性从而可以优雅的在窗口处理中支持夏令时。
用户可以参考 Release Note 来查看该部分的完整变更。
PyFlink 核心优化
这个版本对 PyFlink 的改进主要是使基于 Python 的 DataStream API 与 Table API 与 Java/scala 版本的对应功能更加一致。
Python DataStream API 中的有状态算子
在 Flink 1.13 中Python 程序员可以享受到 Flink 状态处理 API 的所有能力。在 Flink 1.12 版本重构过的 Python DataStream API 现在已经拥有完整的状态访问能力从而使用户可以将数据的信息记录到 state 中并且在后续访问。
带状态的处理能力是许多依赖跨记录状态共享例如 Window Operator的复杂数据处理场景的基础。
以下例子展示了一个自定义的计算窗口的实现
class CountWindowAverage(FlatMapFunction):def __init__(self, window_size):self.window_size window_sizedef open(self, runtime_context: RuntimeContext):descriptor ValueStateDescriptor(average, Types.TUPLE([Types.LONG(), Types.LONG()]))self.sum runtime_context.get_state(descriptor)def flat_map(self, value):current_sum self.sum.value()if current_sum is None:current_sum (0, 0)# update the countcurrent_sum (current_sum[0] 1, current_sum[1] value[1])# if the count reaches window_size, emit the average and clear the stateif current_sum[0] self.window_size:self.sum.clear()yield value[0], current_sum[1] // current_sum[0]else:self.sum.update(current_sum)ds ... # type: DataStream
ds.key_by(lambda row: row[0]) \.flat_map(CountWindowAverage(5))
PyFlink DataStream API 中的用户自定义窗口
Flink 1.13 中 PyFlink DataStream 接口增加了对用户自定义窗口的支持现在用户可以使用标准窗口之外的窗口定义。
由于窗口是处理无限数据流的核心机制 通过将流切分为多个有限的『桶』这一功能极大的提高的 API 的表达能力。
PyFlink Table API 中基于行的操作
Python Table API 现在支持基于行的操作例如用户对行数据的自定义函数。这一功能使得用户可以使用非内置的数据处理函数。
一个使用 map() 操作的 Python Table API 示例如下
udf(result_typeDataTypes.ROW([DataTypes.FIELD(c1, DataTypes.BIGINT()),DataTypes.FIELD(c2, DataTypes.STRING())]))
def increment_column(r: Row) - Row:return Row(r[0] 1, r[1])table ... # type: Table
mapped_result table.map(increment_column)
除了 map()这一 API 还支持 flat_map()aggregate()flat_aggregate() 和其它基于行的操作。这使 Python Table API 的功能与 Java Table API 的功能更加接近。
PyFlink DataStream API 支持 Batch 执行模式
对于有限流PyFlink DataStream API 现在已经支持 Flink 1.12 DataStream API 中引入的 Batch 执行模式。
通过复用数据有限性来跳过 State backend 和 Checkpoint 的处理Batch 执行模式可以简化运维并且提高有限流处理的性能。
其它优化
基于 Hugo 的 Flink 文档
Flink 文档从 JekyII 迁移到了 Hugo。如果您发现有问题请务必通知我们我们非常期待用户对新的界面的感受。
Web UI 支持历史异常
Flink Web UI 现在可以展示导致作业失败的 n 次历史异常从而提升在一个异常导致多个后续异常的场景下的调试体验。用户可以在异常历史中找到根异常。
优化失败 Checkpoint 的异常和失败原因的汇报
Flink 现在提供了失败或被取消的 Checkpoint 的统计从而使用户可以更简单的判断 Checkpoint 失败的原因而不需要去查看日志。
Flink 之前的版本只有在 Checkpoint 成功的时候才会汇报指标例如持久化数据的大小、触发时间等。
提供『恰好一次』一致性的 JDBC Sink
从 1.13 开始通过使用事务提交数据JDBC Sink 可以对支持 XA 事务的数据库提供『恰好一次』的一致性支持。这一特性要求目标数据库必须有或链接到一个 XA 事务处理器。
这一 Sink 现在只能在 DataStream API 中使用。用户可以通过 JdbcSink.exactlyOnceSink(…) 来创建这一 Sink或者通过显式初始化一个 JdbcXaSinkFunction。
PyFlink Table API 在 Group 窗口上支持用户自定义的聚合函数
PyFlink Table API 现在对 Group 窗口同时支持基于 Python 的用户自定义聚合函数User-defined Aggregate Functions, UDAFs以及 Pandas UDAFs。这些函数对许多数据分析或机器学习训练的程序非常重要。
在 Flink 1.13 之前这些函数仅能在无限的 Group-by 聚合场景下使用。Flink 1.13 优化了这一限制。
Batch 执行模式下 Sort-merge Shuffle 优化
Flink 1.13 优化了针对批处理程序的 Sort-merge Blocking Shuffle 的性能和内存占用情况。这一 Shuffle 模式是在Flink 1.12 的 FLIP-148 中引入的。
这一优化避免了大规模作业下不断出现 OutOfMemoryError: Direct Memory 的问题并且通过 I/O 调度和 broadcast 优化提高了性能尤其是在机械硬盘上。
HBase 连接器支持异步维表查询和查询缓存
HBase Lookup Table Source 现在可以支持异步查询模式和查询缓存。这极大的提高了使用这一 Source 的 Table / SQL 维表 Join 的性能并且在一些典型情况下可以减少对 HBase 的 I/O 请求数量。
在之前的版本中HBase Lookup Source 仅支持同步通信从而导致作业吞吐以及资源利用率降低。
升级 Flink 1.13 需要注意的改动
FLINK-21709 – 老的 Table SQL API 计划器已经被标记为废弃并且将在 Flink 1.14 中被删除。Blink 计划器在若干版本之前已经被设置为默认计划器并且将成为未来版本中的唯一计划器。这意味着 BatchTableEnvironment 和 DataSet API 互操作后续也将不再支持。用户需要切换到统一的 TableEnvironment 来编写流或者批的作业。FLINK-22352 – Flink 社区决定废弃对 Apache mesos 的支持未来有可能会进一步删除这部分功能。用户最好能够切换到其它的资源管理系统上。FLINK-21935 – state.backend.async 这一配置已经被禁用了因为现在 Flink 总是会异步的来保存快照即之前的配置默认值并且现在没有实现可以支持同步的快照保存操作。FLINK-17012 – Task 的 RUNNING 状态被细分为两步INITIALIZING 和 RUNNING。Task 的 INITIALIZING 阶段包括加载 state 和在启用 unaligned checkpoint 时恢复 In-flight 数据的过程。通过显式区分这两种状态监控系统可以更好的区分任务是否已经在实际工作。FLINK-21698 – NUMERIC 和 TIMESTAMP 类型之间的直接转换存在问题现在已经被禁用例如 CAST(numeric AS TIMESTAMP(3))。用户应该使用 TO_TIMESTAMP(FROM_UNIXTIME(numeric)) 来代替。FLINK-22133 – 新的 Source 接口有一个小的不兼容的修改即 SplitEnumerator.snapshotState() 方法现在多接受一个 checkpoint id 参数来表示正在进行的 snapshot 操作所属的 checkpoint 的 id。FLINK-19463 – 由于老的 Statebackend 接口承载了过多的语义并且容易引起困惑这一接口被标记为废弃。这是一个纯 API 层的改动而并不会影响应用运行时。对于如何升级现有作业请参考作业迁移指引 。
其它资源
二进制和代码可以从 Flink 官网的下载页面获得最新的 PyFlink 发布可以从 PyPI 获得。
如果想要升级到 Flink 1.13请参考发布说明。这一版本与之前 1.x 的版本在标记为Public 的接口上是兼容的。
用户也可以查看新版本修改列表与更新后的文档来获得修改和新功能的详细列表。
原文链接
本文为阿里云原创内容未经允许不得转载。