免费的ppt网站推荐,如何建设网站公司,网站建设怎么销售,做网站如何不被忽悠众所周知 Flink 是当前广泛使用的计算引擎#xff0c;Flink 使用 checkpoint 机制进行容错处理[1]#xff0c;Flink 的 checkpoint 会将状态快照备份到分布式存储系统#xff0c;供后续恢复使用。在 Alibaba 内部我们使用的存储主要是 HDFS#xff0c;当同一个集群的 Job 到…众所周知 Flink 是当前广泛使用的计算引擎Flink 使用 checkpoint 机制进行容错处理[1]Flink 的 checkpoint 会将状态快照备份到分布式存储系统供后续恢复使用。在 Alibaba 内部我们使用的存储主要是 HDFS当同一个集群的 Job 到达一定数量后会对 HDFS 造成非常大的压力本文将介绍一种大幅度降低 HDFS 压力的方法 -- 小文件合并。
背景
不管使用 FsStateBackend、RocksDBStateBackend 还是 NiagaraStateBackendFlink 在进行 checkpoint 的时候TM 会将状态快照写到分布式文件系统中然后将文件句柄发给 JMJM 完成全局 checkpoint 快照的存储如下图所示。 对于全量 checkpoint 来说TM 将每个 checkpoint 内部的数据都写到同一个文件而对于 RocksDBStateBackend/NiagaraStateBackend 的增量 checkpoint [2]来说则会将每个 sst 文件写到一个分布式系统的文件内。当作业量很大且作业的并发很大时则会对底层 HDFS 形成非常大的压力1大量的 RPC 请求会影响 RPC 的响应时间如下图所示2大量文件对 NameNode 内存造成很大压力。 在 Flink 中曾经尝试使用 ByteStreamStateHandle 来解决小文件多的问题[3]将小于一定阈值的 state 直接发送到 JM由 JM 统一写到分布式文件中从而避免在 TM 端生成小文件。但是这个方案有一定的局限性阈值设置太小还会有很多小文件生成阈值设置太大则会导致 JM 内存消耗太多有 OOM 的风险。
1 小文件合并方案
针对上面的问题我们提出一种解决方案 -- 小文件合并。 在原来的实现中每个 sst 文件会打开一个 CheckpointOutputStream每个 CheckpointOutputStream 对应一个 FSDataOutputStream将本地文件写往一个分布式文件然后关闭 FSDataOutputStream生成一个 StateHandle。如下图所示 小文件合并则会重用打开的 FSDataOutputStream直至文件大小达到预设的阈值为止换句话说多个 sst 文件会重用同一个 DFS 上的文件每个 sst 文件占用 DFS 文件中的一部分最终多个 StateHandle 共用一个物理文件如下图所示。 在接下来的章节中我们会描述实现的细节其中需要重点考虑的地方包括
并发 checkpoint 的支持 Flink 天生支持并发 checkpoint小文件合并方案则会将多个文件写往同一个分布式存储文件中如果考虑不当数据会写串或者损坏因此我们需要有一种机制保证该方案的正确性详细描述参考 2.1 节防止误删文件 我们使用引用计数来记录文件的使用情况仅通过文件引用计数是否降为 0 进行判断删除则可能误删文件如何保证文件不会被错误删除我们将会在 2.2 节进行阐述降低空间放大 使用小文件合并之后只要文件中还有一个 statehandle 被使用整个分布式文件就不能被删除因此会占用更多的空间我们在 2.3 节描述了解决该问题的详细方案异常处理 我们将在 2.4 节阐述如何处理异常情况包括 JM 异常和 TM 异常的情况2.5 节中会详细描述在 Checkpoint 被取消或者失败后如何取消 TM 端的 Snapshot如果不取消 TM 端的 Snapshot则会导致 TM 端实际运行的 Snapshot 比正常的多
在第 3 节中阐述了小文件合并方案与现有方案的兼容性第 4 节则会描述小文件合并方案的优势和不足最后在第 5 节我们展示在生产环境下取得的效果。
2 设计实现
本节中我们会详细描述整个小文件合并的细节以及其中的设计要点。 这里我们大致回忆一下 TM 端 Snapshot 的过程
TM 端 barrier 对齐TM Snapshot 同步操作TM Snapshot 异步操作
其中上传 sst 文件到分布式存储系统在上面的第三步同一个 checkpoint 内的文件顺序上传多个 checkpoint 的文件上传可能同时进行。
2.1 并发 checkpoint 支持
Flink 天生支持并发 checkpoint因此小文件合并方案也需要能够支持并发 checkpoint如果不同 checkpoint 的 sst 文件同时写往一个分布式文件则会导致文件内容损坏后续无法从该文件进行 restore。
在 FLINK-11937[4] 的提案中我们会将每个 checkpoint 的 state 文件写到同一个 HDFS 文件不同 checkpoint 的 state 写到不同的 HDFS 文件 -- 换句话说HDFS 文件不跨 Checkpoint 共用从而避免了多个客户端同时写入同一个文件的情况。
后续我们会继续推进跨 Checkpoint 共用文件的方案当然在跨 Checkpoint 共用文件的方案中并行的 Checkpoint 也会写往不同的 HDFS 文件。
2.2 防止误删文件
复用底层文件之后我们使用引用计数追踪文件的使用情况在文件引用数降为 0 的情况下删除文件。但是在某些情况下文件引用数为 0 的时候并不代表文件不会被继续使用可能导致文件误删。下面我们会详细描述开启并发 checkpoint 后可能导致文件误删的情况以及解决方案。
我们以下图为例maxConcurrentlyCheckpoint 2 上图中共有 3 个 checkpoint其中 chk-1 已经完成chk-2 和 chk-3 都基于 chk-1 进行chk-2 在 chk-3 前完成chk-3 在注册 4.sst 的时候发现发现 4.sst 在 chk-2 中已经注册过会重用 chk-2 中 4.sst 对应的 stateHandle然后取消 chk-3 中的 4.sst 的注册并且删除 stateHandle在处理完 chk-3 中 4.sst 之后该 stateHandle 对应的分布式文件的引用计数为 0如果我们这个时候删除分布式文件则会同时删除 5.sst 对应的内容导致后续无法从 chk-3 恢复。
这里的问题是如何在 stateHandle 对应的分布式文件引用计数降为 0 的时候正确判断是否还会继续引用该文件因此在整个 checkpoint 完成处理之后再判断某个分布式文件能否删除如果真个 checkpoint 完成发现文件没有被引用则可以安全删除否则不进行删除。
2.3 降低空间放大
使用小文件合并方案后每个 sst 文件对应分布式文件中的一个 segment如下图所示 文件仅能在所有 segment 都不再使用时进行删除上图中有 4 个 segment仅 segment-4 被使用但是整个文件都不能删除其中 segment[1-3] 的空间被浪费掉了从实际生产环境中的数据可知整体的空间放大率实际占用的空间 / 真实有用的空间在 1.3 - 1.6 之间。
为了解决空间放大的问题在 TM 端起异步线程对放大率超过阈值的文件进行压缩。而且仅对已经关闭的文件进行压缩。
整个压缩的流程如下所示
计算每个文件的放大率如果放大率较小则直接跳到步骤 7如果文件 A 的放大率超过阈值则生成一个对应的新文件 A‘如果这个过程中创建文件失败则由 TM 负责清理工作记录 A 与 A’ 的映射关系在下一次 checkpoint X 往 JM 发送落在文件 A 中的 StateHandle 时则使用 A 中的信息生成一个新的 StateHandle 发送给 JMcheckpoint X 完成后我们增加 A‘ 的引用计数减少 A 的引用计数在引用计数降为 0 后将文件 A 删除如果 JM 增加了 A’ 的引用然后出现异常则会从上次成功的 checkpoint 重新构建整个引用计数器文件压缩完成
2.4 异常情况处理
在 checkpoint 的过程中主要有两种异常JM 异常和 TM 异常我们将分情况阐述。
2.4.1 JM 异常
JM 端主要记录 StateHandle 以及文件的引用计数引用计数相关数据不需要持久化到外存中因此不需要特殊的处理也不需要考虑 transaction 等相关操作如果 JM 发送 failover则可以直接从最近一次 complete checkpoint 恢复并重建引用计数即可。
2.4.2 TM 异常
TM 异常可以分为两种1该文件在之前 checkpoint 中已经汇报过给 JM2文件尚未汇报过给 JM我们会分情况阐述。
文件已经汇报过给 JM 文件汇报过给 JM因此在 JM 端有文件的引用计数文件的删除由 JM 控制当文件的引用计数变为 0 之后JM 将删除该文件。文件尚未汇报给 JM 该文件暂时尚未汇报过给 JM该文件不再被使用也不会被 JM 感知成为孤儿文件。这种情况暂时有外围工具统一进行清理。
2.5 取消 TM 端 snapshot
像前面章节所说我们需要在 checkpoint 超时/失败时取消 TM 端的 snapshot而 Flink 则没有相应的通知机制现在 FLINK-8871[5] 在追踪相应的优化我们在内部增加了相关实现当 checkpoint 失败时会发送 RPC 数据给 TMTM 端接受到相应的 RPC 消息后会取消相应的 snapshot。
3 兼容性
小文件合并功能支持从之前的版本无缝迁移过来。从之前的 checkpoint restore 的的步骤如下
每个 TM 分到自己需要 restore 的 state handleTM 从远程下载 state handle 对应的数据从本地进行恢复
小文件合并主要影响的是第 2 步从远程下载对应数据的时候对不同的 StateHandle 进行适配因此不影响整体的兼容性。
4 优势和不足
优势大幅度降低 HDFS 的压力包括 RPC 压力以及 NameNode 内存的压力不足不支持 State 多线程上传的功能State 上传暂时不是 checkpoint 的瓶颈
5 线上环境的结果
在该方案上线后对 Namenode 的压力大幅降低下面的截图来自线上生产集群从数据来看文件创建和关闭的 RPC 有明显下降RPC 的响应时间也有大幅度降低确保顺利度过双十一。 原文链接 本文为阿里云原创内容未经允许不得转载。