ICP网站忘记密码,鞍山信息港号吧,国内品牌营销成功案例,上海3d网站建设简介#xff1a;阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来#xff0c;帮助了诸多客户解决Spark作业的性能、稳定性问题#xff0c;并使得存算分离架构得以实施#xff0c;与此同时RSS也在跟合作方小米的共建下不断演进。本文将介绍RSS的最新架构#xff0c;在…简介阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来帮助了诸多客户解决Spark作业的性能、稳定性问题并使得存算分离架构得以实施与此同时RSS也在跟合作方小米的共建下不断演进。本文将介绍RSS的最新架构在小米的实践以及开源。 作者 | 一锤、明济、紫槿 来源 | 阿里技术公众号
阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来帮助了诸多客户解决Spark作业的性能、稳定性问题并使得存算分离架构得以实施与此同时RSS也在跟合作方小米的共建下不断演进。本文将介绍RSS的最新架构在小米的实践以及开源。
一 问题回顾
Shuffle是大数据计算中最为重要的算子。首先覆盖率高超过50%的作业都包含至少一个Shuffle[2]。其次资源消耗大阿里内部平台Shuffle的CPU占比超过20%LinkedIn内部Shuffle Read导致的资源浪费高达15%[1]单Shuffle数据量超100T[2]。第三不稳定硬件资源的稳定性CPU内存磁盘≈网络而Shuffle的资源消耗是倒序。OutOfMemory和Fetch Failure可能是Spark作业最常见的两种错误前者可以通过调参解决而后者需要系统性重构Shuffle。
传统Shuffle如下图所示Mapper把Shuffle数据按PartitionId排序写盘后交给External Shuffle Service(ESS)管理Reducer从每个Mapper Output中读取属于自己的Block。 传统Shuffle存在以下问题。
本地盘依赖限制了存算分离。存算分离是近年来兴起的新型架构它解耦了计算和存储可以更灵活地做机型设计计算节点强CPU弱磁盘存储节点强磁盘强网络弱CPU。计算节点无状态可根据负载弹性伸缩。存储端随着对象存储(OSS, S3)数据湖格式(Delta, Iceberg, Hudi)本地/近地缓存等方案的成熟可当作容量无限的存储服务。用户通过计算弹性存储按量付费获得成本节约。然而Shuffle对本地盘的依赖限制了存算分离。写放大。当Mapper Output数据量超过内存时触发外排从而引入额外磁盘IO。大量随机读。Mapper Output属于某个Reducer的数据量很小如Output 128MReducer并发2000则每个Reducer只读64K从而导致大量小粒度随机读。对于HDD随机读性能极差对于SSD会快速消耗SSD寿命。高网络连接数导致线程池消耗过多CPU带来性能和稳定性问题。Shuffle数据单副本大规模集群场景坏盘/坏节点很普遍Shuffle数据丢失引发的Stage重算带来性能和稳定性问题。
二 RSS发展历程
针对Shuffle的问题工业界尝试了各种方法近两年逐渐收敛到Push Shuffle的方案。
1 Sailfish
Sailfish3最早提出Push Shuffle Partition数据聚合的方法对大作业有20%-5倍的性能提升。Sailfish魔改了分布式文件系统KFS[4]不支持多副本。
2 Dataflow
Goolge BigQuery和Cloud Dataflow5实现了Shuffle跟计算的解耦采用多层存储(内存磁盘)除此之外没有披露更多技术细节。
3 Riffle
Facebook Riffle2采用了在Mapper端Merge的方法物理节点上部署的Riffle服务负责把此节点上的Shuffle数据按照PartitionId做Merge从而一定程度把小粒度的随机读合并成较大粒度。
4 Cosco
Facebook Cosco[6]7采用了Sailfish的方法并做了重设计保留了Push Shuffle Parititon数据聚合的核心方法但使用了独立服务。服务端采用Master-Worker架构使用内存两副本用DFS做持久化。Cosco基本上定义了RSS的标准架构但受到DFS的拖累性能上并没有显著提升。
5 Zeus
Uber Zeus[8]9同样采用了去中心化的服务架构但没有类似etcd的角色维护Worker状态因此难以做状态管理。Zeus通过Client双推的方式做多副本采用本地存储。
6 RPMP
Intel RPMP10依靠RDMA和PMEM的新硬件来加速Shuffle并没有做数据聚合。
7 Magnet
LinkedIn Magnet1融合了本地ShufflePush Shuffle其设计哲学是尽力而为Mapper的Output写完本地后Push线程会把数据推给远端的ESS做聚合且不保证所有数据都会聚合。受益于本地ShuffleMagnet在容错和AE的支持上的表现更好(直接Fallback到传统Shuffle)。Magnet的局限包括依赖本地盘不支持存算分离数据合并依赖ESS对NodeManager造成额外压力Shuffle Write同时写本地和远端性能达不到最优。Magnet方案已经被Apache Spark接纳成为默认的开源方案。
8 FireStorm
FireStorm11混合了Cosco和Zeus的设计服务端采用Master-Worker架构通过Client多写实现多副本。FireStorm使用了本地盘对象存储的多层存储采用较大的PushBlock(默认3M)。FireStorm在存储端保留了PushBlock的元信息并记录在索引文件中。FireStorm的Client缓存数据的内存由Spark MemoryManager进行管理并通过细颗粒度的内存分配(默认3K)来尽量避免内存浪费。
从上述描述可知当前的方案基本收敛到Push Shuffle但在一些关键设计上的选择各家不尽相同主要体现在:
集成到Spark内部还是独立服务。RSS服务侧架构选项包括Master-Worker含轻量级状态管理的去中心化完全去中心化。Shuffle数据的存储选项包括内存本地盘DFS对象存储。多副本的实现选项包括Client多推服务端做Replication。
阿里云RSS12由2020年推出核心设计参考了Sailfish和Cosco并且在架构和实现层面做了改良下文将详细介绍。
三 阿里云RSS核心架构
针对上一节的关键设计阿里云RSS的选择如下
独立服务。考虑到将RSS集成到Spark内部无法满足存算分离架构阿里云RSS将作为独立服务提供Shuffle服务。Master-Worker架构。通过Master节点做服务状态管理非常必要基于etcd的状态状态管理能力受限。多种存储方式。目前支持本地盘/DFS等存储方式主打本地盘将来会往分层存储方向发展。服务端做Replication。Client多推会额外消耗计算节点的网络和计算资源在独立部署或者服务化的场景下对计算集群不友好。
下图展示了阿里云RSS的关键架构包含Client(RSS Client, Meta Service)Master(Resource Manager)和Worker三个角色。Shuffle的过程如下:
Mapper在首次PushData时请求Master分配Worker资源Worker记录自己所需要服务的Partition列表。Mapper把Shuffle数据缓存到内存超过阈值时触发Push。隶属同个Partition的数据被Push到同一个Worker做合并主Worker内存接收到数据后立即向从Worker发起Replication数据达成内存两副本后即向Client发送ACKFlusher后台线程负责刷盘。Mapper Stage运行结束MetaService向Worker发起CommitFiles命令把残留在内存的数据全部刷盘并返回文件列表。Reducer从对应的文件列表中读取Shuffle数据。阿里云RSS的核心架构和容错方面的介绍详见[13]本文接下来介绍阿里云RSS近一年的架构演进以及不同于其他系统的特色。
1 状态下沉
RSS采用Master-Worker架构最初的设计中Master统一负责集群状态管理和Shuffle生命周期管理。集群状态包括Worker的健康度和负载生命周期包括每个Shuffle由哪些Worker服务每个Worker所服务的Partition列表Shuffle所处的状态(Shuffle WriteCommitFileShuffle Read)是否有数据丢失等。维护Shuffle生命周期需要较大数据量和复杂数据结构给Master HA的实现造成阻力。同时大量生命周期管理的服务调用使Master易成为性能瓶颈限制RSS的扩展性。
为了缓解Master压力我们把生命周期状态管理下沉到Driver由Application管理自己的ShuffleMaster只需维护RSS集群本身的状态。这个优化大大降低Master的负载并使得Master HA得以顺利实现。 2 Adaptive Pusher
在最初的设计中阿里云RSS跟其他系统一样采用Hash-Based Pusher即Client会为每个Partition维护一个(或多个[11])内存Buffer当Buffer超过阈值时触发推送。这种设计在并发度适中的情况下没有问题而在超大并发度的情况下会导致OOM。例如Reducer的并发5W在小Buffer[13]的系统中(64K)极端内存消耗为64K5W3G在大Buffer[11]的系统中(3M)极端内存消耗为3M5W146G这是不可接受的。针对这个问题我们开发了Sort-Based Pusher缓存数据时不区分Partition当总的数据超过阈值(i.e. 64M)时对当前数据按照PartitionId排序然后把数据Batch后推送从而解决内存消耗过大的问题。
Sort-Based Pusher会额外引入一次排序性能上比Hash-Based Pusher略差。我们在ShuffleWriter初始化阶段根据Reducer的并发度自动选择合适的Pusher。
3 磁盘容错
出于性能的考虑阿里云RSS推荐本地盘存储因此处理坏/慢盘是保证服务可靠性的前提。Worker节点的DeviceMonitor线程定时对磁盘进行检查检查项包括IOHang使用量读写异常等。此外Worker在所有磁盘操作处(创建文件刷盘)都会捕捉异常并上报。IOHang、读写异常被认为是Critical Error磁盘将被隔离并终止该磁盘上的存储服务。慢盘、使用量超警戒线等异常仅将磁盘隔离不再接受新的Partition存储请求但已有的Partition服务保持正常。在磁盘被隔离后Worker的容量和负载将发生变化这些信息将通过心跳发送给Master。
4 滚动升级
RSS作为常驻服务有永不停服的要求而系统本身总在向前演进因此滚动升级是必选的功能。尽管通过Sub-Cluster部署方式可以绕过即部署多个子集群对子集群做灰度灰度的集群暂停服务但这种方式依赖调度系统感知正在灰度的集群并动态修改作业配置。我们认为RSS应该把滚动升级闭环掉核心设计如下图所示。Client向Master节点的Leader角色(Master实现了HA见上文)发起滚动升级请求并把更新包上传给LeaderLeader通过Raft协议修改状态为滚动升级并启动第一阶段的升级升级Master节点。Leader首先升级所有的Follower然后替换本地包并重启。在Leader节点改变的情况下升级过程不会中断或异常。Master节点升级结束后进入第二阶段Worker节点升级。RSS采用滑动窗口做升级窗口内的Worker尽量优雅下线即拒绝新的Partition请求并等待本地Shuffle结束。为了避免等待时间过长会设置超时时间。此外窗口内的Worker选择会尽量避免同时包含主从两副本以降低数据丢失的概率。 5 混乱测试框架
对于服务来说仅依靠UT、集成测试、e2e测试等无法保证服务可靠性因为这些测试无法覆盖线上复杂环境如坏盘、CPU过载、网络过载、机器挂掉等。RSS要求在出现这些复杂情况时保持服务稳定为了模拟线上环境我们开发了仿真(混乱)测试框架在测试环境中模拟线上可能出现的异常同时保证满足RSS运行的最小运行环境即至少3个Master节点和2个Worker节点可用并且每个Worker节点至少有一块盘。我们持续对RSS做此类压力测试。
仿真测试框架架构如下图所示首先定义测试Plan来描述事件类型、事件触发的顺序及持续时间事件类型包括节点异常磁盘异常IO异常CPU过载等。客户端将Plan提交给SchedulerScheduler根据Plan的描述给每个节点的Runner发送具体的OperationRunner负责具体执行并汇报当前节点的状态。在触发Operation之前Scheduler会推演该事件发生产生的后果若导致无法满足RSS的最小可运行环境将拒绝此事件。
我们认为仿真测试框架的思路是通用设计可以推广到更多的服务测试中。 6 多引擎支持
Shuffle是通用操作不跟引擎绑定因此我们尝试了多引擎支持。当前我们支持了HiveRSS同时也在探索跟流计算引擎(Flink)MPP引擎(Presto)结合的可能性。尽管Hive和Spark都是批计算引擎但Shuffle的行为并不一致最大的差异是Hive在Mapper端做排序Reducer只做Merge而Spark在Reducer端做排序。由于RSS暂未支持计算因此需要改造Tez支持Reducer排序。此外Spark有干净的Shuffle插件接口RSS只需在外围扩展而Tez没有类似抽象在这方面也有一定侵入性。
当前大多数引擎都没有Shuffle插件化的抽象需要一定程度的引擎修改。此外流计算和MPP都是上游即时Push给下游的模式而RSS是上游Push下游Pull的模式这两者如何结合也是需要探索的。
7 测试
我们对比了阿里云RSS、Magent及开源系统X。由于大家的系统还在向前演进因此测试结果仅代表当前。
测试环境
Header 1: ecs.g6e.4xlarge, 16 2.5GHz/3.2GHz, 64GiB, 10Gbps Worker 3: ecs.g6e.8xlarge, 32 2.5GHz/3.2GHz, 128GiB, 10Gbps
阿里云RSS vs. Magnet
5T Terasort的性能测试如下图所示如上文描述Magent的Shuffle Write有额外开销差于RSS和传统做法。Magent的Shuffle Read有提升但差于RSS。在这个Benchmark下RSS明显优于另外两个Magent的e2e时间略好于传统Shuffle。 阿里云RSS vs. 开源系统X
RSS跟开源系统X在TPCDS-3T的性能对比如下总时间RSS快了20%。 稳定性
在稳定性方面我们测试了Reducer大规模并发的场景Magnet可以跑通但时间比RSS慢了数倍System X在Shuffle Write阶段报错。
四 阿里云RSS在小米的实践
1 现状及痛点
小米的离线集群以YarnHDFS为主NodeManager和DataNode混合部署。Spark是主要的离线引擎支撑着核心计算任务。Spark作业当前最大的痛点集中在Shuffle导致的稳定性差性能差和对存算分离架构的限制。在进行资源保证和作业调优后作业失败原因主要归结为Fetch Failure如下图所示。由于大部分集群使用的是HDD传统Shuffle的高随机读和高网络连接导致性能很差低稳定性带来的Stage重算会进一步加剧性能回退。此外小米一直在尝试利用存算分离架构的计算弹性降低成本但Shuffle对本地盘的依赖造成了阻碍。 2 RSS在小米的落地
小米一直在关注Shuffle优化相关技术21年1月份跟阿里云EMR团队就RSS项目建立了共创关系3月份第一个生产集群上线开始接入作业6月份第一个HA集群上线规模达100节点9月份第一个300节点上线集群默认开启RSS后续规划会进一步扩展RSS的灰度规模。
在落地的过程小米主导了磁盘容错的开发大大提高了RSS的服务稳定性技术细节如上文所述。此外在前期RSS还未完全稳定阶段小米在多个环节对RSS的作业进行了容错。在调度端若开启RSS的Spark作业因Shuffle报错则Yarn的下次重试会回退到ESS。在ShuffleWriter初始化阶段小米主导了自适应Fallback机制根据当前RSS集群的负载和作业的特征(如Reducer并发是否过大)自动选择RSS或ESS从而提升稳定性。
3 效果
接入RSS后Spark作业的稳定性、性能都取得了显著提升。之前因Fetch Failure失败的作业几乎不再失败性能平均有20%的提升。下图展示了接入RSS前后作业稳定性的对比。
ESS RSS: 下图展示了接入RSS前后作业运行时间的对比。
ESS: RSS: 在存算分离方面小米海外某集群接入RSS后成功上线了1600 Core的弹性集群且作业运行稳定。
在阿里云EMR团队及小米Spark团队的共同努力下RSS带来的稳定性和性能提升得到了充分的验证。后续小米将会持续扩大RSS集群规模以及作业规模并且在弹性资源伸缩场景下发挥更大的作用。
五 开源
重要的事说三遍“阿里云RSS开源啦!” X 3
git地址: https://github.com/alibaba/RemoteShuffleService
开源代码包含核心功能及容错满足生产要求。
计划中的重要Feature:
AESpark多版本支持Better 流控Better 监控Better HA多引擎支持
欢迎各路开发者共建
六 Reference
[1]Min Shen, Ye Zhou, Chandni Singh. Magnet: Push-based Shuffle Service for Large-scale Data Processing. VLDB 2020. [2]Haoyu Zhang, Brian Cho, Ergin Seyfe, Avery Ching, Michael J. Freedman. Riffle: Optimized Shuffle Service for Large-Scale Data Analytics. EuroSys 2018. [3]Sriram Rao, Raghu Ramakrishnan, Adam Silberstein. Sailfish: A Framework For Large Scale Data Processing. SoCC 2012. [4]KFS. http://code.google.com/p/kosmosfs/ [5]Google Dataflow Shuffle. https://cloud.google.com/blog/products/data-analytics/how-distributed-shuffle-improves-scalability-and-performance-cloud-dataflow-pipelines [6]Cosco: An Efficient Facebook-Scale Shuffle Service. Cosco: An Efficient Facebook-Scale Shuffle Service - Databricks [7]Flash for Apache Spark Shuffle with Cosco. Flash for Apache Spark Shuffle with Cosco - Databricks [8]Uber Zeus. Zeus: Ubers Highly Scalable and Distributed Shuffle as a Service - Databricks [9]Uber Zeus. https://github.com/uber/RemoteShuffleService [10]Intel RPMP. Accelerating Apache Spark Shuffle for Data Analytics on the Cloud with Remote Persistent Memory Pools - Databricks [11]Tencent FireStorm. https://github.com/Tencent/Firestorm [12]Aliyun RSS在趣头条的实践. 降本增效利器趣头条Spark Remote Shuffle Service最佳实践-阿里云开发者社区 [13]Aliyun RSS架构. Serverless Spark的弹性利器 - EMR Shuffle Service-阿里云开发者社区
原文链接
本文为阿里云原创内容未经允许不得转载。