当前位置: 首页 > news >正文

东莞专业网站设计专业服务正规少儿编程排名

东莞专业网站设计专业服务,正规少儿编程排名,中国最新军事新闻头条,网站建设客户去哪找引言 典型的Spark作业读取位于OSS的Parquet外表时#xff0c;源端的并发度#xff08;task/partition#xff09;如何确定#xff1f;特别是在做TPCH测试时有一些疑问#xff0c;如源端扫描文件的并发度是如何确定的#xff1f;是否一个parquet文件对应一个partition源端的并发度task/partition如何确定特别是在做TPCH测试时有一些疑问如源端扫描文件的并发度是如何确定的是否一个parquet文件对应一个partition多个parquet文件对应一个partition还是一个parquet文件对应多个partition本文将从源码角度进行分析进而解答这些疑问。 分析 数据源读取对应的物理执行节点为FileSourceScanExec读取数据代码块如下 lazy val inputRDD: RDD[InternalRow] {val readFile: (PartitionedFile) Iterator[InternalRow] relation.fileFormat.buildReaderWithPartitionValues(sparkSession relation.sparkSession,dataSchema relation.dataSchema,partitionSchema relation.partitionSchema,requiredSchema requiredSchema,filters pushedDownFilters,options relation.options,hadoopConf relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))val readRDD if (bucketedScan) {createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,relation)} else {createReadRDD(readFile, dynamicallySelectedPartitions, relation)}sendDriverMetrics()readRDD} 主要关注非bucket的处理对于非bucket的扫描调用createReadRDD方法定义如下 /*** Create an RDD for non-bucketed reads.* The bucketed variant of this function is [[createBucketedReadRDD]].** param readFile a function to read each (part of a) file.* param selectedPartitions Hive-style partition that are part of the read.* param fsRelation [[HadoopFsRelation]] associated with the read.*/private def createReadRDD(readFile: (PartitionedFile) Iterator[InternalRow],selectedPartitions: Array[PartitionDirectory],fsRelation: HadoopFsRelation): RDD[InternalRow] {// 文件打开开销每次打开文件最少需要读取的字节 val openCostInBytes fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes// 最大切分分片大小val maxSplitBytes FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)logInfo(sPlanning scan with bin packing, max size: $maxSplitBytes bytes, sopen cost is considered as scanning $openCostInBytes bytes.)// Filter files with bucket pruning if possibleval bucketingEnabled fsRelation.sparkSession.sessionState.conf.bucketingEnabledval shouldProcess: Path Boolean optionalBucketSet match {case Some(bucketSet) if bucketingEnabled // Do not prune the file if bucket file name is invalidfilePath BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)case _ _ true}// 对分区下文件进行切分并按照从大到小进行排序val splitFiles selectedPartitions.flatMap { partition partition.files.flatMap { file // getPath() is very expensive so we only want to call it once in this block:val filePath file.getPathif (shouldProcess(filePath)) {// 文件是否可splitparquet/orc/avro均可被splitval isSplitable relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)// 切分文件PartitionedFileUtil.splitFiles(sparkSession relation.sparkSession,file file,filePath filePath,isSplitable isSplitable,maxSplitBytes maxSplitBytes,partitionValues partition.values)} else {Seq.empty}}}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)val partitions FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)new FileScanRDD(fsRelation.sparkSession, readFile, partitions)} 可以看到确定最大切分分片大小maxSplitBytes对于后续切分为多少个文件非常重要其核心逻辑如下 def maxSplitBytes(sparkSession: SparkSession,selectedPartitions: Seq[PartitionDirectory]): Long {// 读取文件时打包成最大的partition大小默认为128MB对应一个block大小val defaultMaxSplitBytes sparkSession.sessionState.conf.filesMaxPartitionBytes// 打开每个文件的开销默认为4MBval openCostInBytes sparkSession.sessionState.conf.filesOpenCostInBytes// 建议的不保证最小分割文件分区数默认未设置从leafNodeDefaultParallelism获取// 代码逻辑调用链 SparkSession#leafNodeDefaultParallelism - SparkContext#defaultParallelism// - TaskSchedulerImpl#defaultParallelism - CoarseGrainedSchedulerBackend#defaultParallelism// - 总共多少核max(executor core总和, 2)最少为2val minPartitionNum sparkSession.sessionState.conf.filesMinPartitionNum.getOrElse(sparkSession.leafNodeDefaultParallelism)// 总共读取的大小val totalBytes selectedPartitions.flatMap(_.files.map(_.getLen openCostInBytes)).sum// 单core读取的大小val bytesPerCore totalBytes / minPartitionNum// 计算大小不会超过设置的128MBMath.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))} 对于PartitionedFileUtil#splitFiles其核心逻辑如下较为简单直接按照最大切分大小切分大文件来进行分片 def splitFiles(sparkSession: SparkSession,file: FileStatus,filePath: Path,isSplitable: Boolean,maxSplitBytes: Long,partitionValues: InternalRow): Seq[PartitionedFile] {if (isSplitable) {// 切分为多个分片(0L until file.getLen by maxSplitBytes).map { offset val remaining file.getLen - offsetval size if (remaining maxSplitBytes) maxSplitBytes else remainingval hosts getBlockHosts(getBlockLocations(file), offset, size)PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)}} else {Seq(getPartitionedFile(file, filePath, partitionValues))}} 在获取到Seq[PartitionedFile]列表后还并没有完成对文件的切分还需要调用FilePartition#getFilePartitions做最后的处理方法核心逻辑如下 def getFilePartitions(sparkSession: SparkSession,partitionedFiles: Seq[PartitionedFile],maxSplitBytes: Long): Seq[FilePartition] {val partitions new ArrayBuffer[FilePartition]val currentFiles new ArrayBuffer[PartitionedFile]var currentSize 0L/** Close the current partition and move to the next. */def closePartition(): Unit {if (currentFiles.nonEmpty) {// Copy to a new Array.// 重新生成一个新的PartitionFileval newPartition FilePartition(partitions.size, currentFiles.toArray)partitions newPartition}currentFiles.clear()currentSize 0}// 打开文件开销默认为4MBval openCostInBytes sparkSession.sessionState.conf.filesOpenCostInBytes// Assign files to partitions using Next Fit DecreasingpartitionedFiles.foreach { file if (currentSize file.length maxSplitBytes) {// 如果累加的文件大小大于的最大切分大小则关闭该分区表示完成一个Task读取的数据切分closePartition()}// Add the given file to the current partition.currentSize file.length openCostInBytescurrentFiles file}// 最后关闭一次分区文件可能较小closePartition()partitions.toSeq} 可以看到经过这一步后会把一些小文件做合并生成maxSplitBytes大小的PartitionFile这样可以避免拉起太多task读取太多小的文件。 生成的FileScanRDD(new FileScanRDD(fsRelation.sparkSession, readFile, partitions))的并发度为partitions的长度也即最后Spark生成的Task个数 override protected def getPartitions: Array[RDDPartition] filePartitions.toArray 整体流程图如下图所示 拆分、合并过程如下图所示 实战 对于TPCH 10G生成的customer parquet表 https://oss.console.aliyun.com/bucket/oss-cn-hangzhou/fengzetest/object?pathrt_spark_test%2Fcustomer-parquet%2F 共8个Parquet文件总文件大小为113.918MB Spark作业配置如下executor只有1core conf spark.driver.resourceSpecsmall; conf spark.executor.instances1; conf spark.executor.resourceSpecsmall; conf spark.app.nameSpark SQL Test; conf spark.adb.connectorsoss; use tpcd; select * from customer order by C_CUSTKEY desc limit 100; 根据前面的公式计算 defaultMaxSplitBytes 128MB openCostInBytes 4MB minPartitionNum max(1, 2) 2 totalBytes 113.918 8 * 4MB 145.918MB bytesPerCore 145.918MB / 2 72.959MB maxSplitBytes 72.959MB Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) 得到maxSplitBytes为72.959MB从日志中也可看到对应大小 经过排序后的文件顺序为(00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007)再次经过合并后得到3个FilePartitioned分别对应 FilePartitioned 1: 00000, 00001, 00002FilePartitioned 2: 00003, 00004, 00006FilePartitioned 3: 00005, 00007 即总共会生成3个Task 从Spark UI查看确实生成3个Task 从日志查看也是生成3个Task 变更Spark作业配置5个executor共10core conf spark.driver.resourceSpecsmall; conf spark.executor.instances5; conf spark.executor.resourceSpecmedium; conf spark.app.nameSpark SQL Test; conf spark.adb.connectorsoss; use tpcd; select * from customer order by C_CUSTKEY desc limit 100; 根据前面的公式计算 defaultMaxSplitBytes 128MB openCostInBytes 4MB minPartitionNum max(10, 2) 10 totalBytes 113.918 8 * 4MB 145.918MB bytesPerCore 145.918MB / 10 14.5918MB maxSplitBytes 14.5918MB Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) 查看日志 此时可以看到14.5918MB会对源文件进行切分会对00001, 00002,00003,00004,00005,00006进行切分切分成两份00007由于小于14.5918MB因此不会进行切分经过PartitionedFileUtil#splitFiles后总共存在7 * 2 1 15个PartitionedFile 00000(0 - 14.5918MB), 00000(14.5918MB - 15.698MB)00001(0 - 14.5918MB), 00001(14.5918MB - 15.632MB)00002(0 - 14.5918MB), 00002(14.5918MB - 15.629MB)00003(0 - 14.5918MB), 00003(14.5918MB - 15.624MB)00004(0 - 14.5918MB), 00004(14.5918MB - 15.617MB)00005(0 - 14.5918MB), 00005(14.5918MB - 15.536MB)00006(0 - 14.5918MB), 00006(14.5918MB - 15.539MB)00007(0 - 4.634MB) 经过排序后得到如下以及合并后得到10个FilePartitioned分别对应 FilePartitioned 1: 00000(0 - 14.5918MB)FilePartitioned 2: 00001(0 - 14.5918MB)FilePartitioned 3: 00002(0 - 14.5918MB)FilePartitioned 4: 00003(0 - 14.5918MB)FilePartitioned 5: 00004(0 - 14.5918MB)FilePartitioned 6: 00005(0 - 14.5918MB)FilePartitioned 7: 00006(0 - 14.5918MB)FilePartitioned 8: 00007(0 - 4.634MB),00000(14.5918MB - 15.698MB)FilePartitioned 9: 00001(14.5918MB - 15.632MB),00002(14.5918MB - 15.629MB),00003(14.5918MB - 15.624MB)FilePartitioned 10: 00004(14.5918MB - 15.617MB),00005(14.5918MB - 15.536MB),00006(14.5918MB - 15.539MB) 即总共会生成10个Task 通过Spark UI也可查看到生成了10个Task 查看日志000004(14.5918MB - 15.617MB),00005(14.5918MB - 15.536MB),00006(14.5918MB - 15.539MB)在同一个Task中 00007(0 - 4.634MB),00000(14.5918MB - 15.698MB) 00001(14.5918MB - 15.632MB),00002(14.5918MB - 15.629MB),00003(14.5918MB - 15.624MB)在同一个Task中 总结 通过源码可知Spark对于源端Partition切分会考虑到分区下所有文件大小以及打开每个文件的开销同时会涉及对大文件的切分以及小文件的合并最后得到一个相对合理的Partition。 原文链接 本文为阿里云原创内容未经允许不得转载。
http://wiki.neutronadmin.com/news/386089/

相关文章:

  • 徐汇网站建设公司深圳社保
  • 网站地图建设有什么用网站建设公司案例
  • 望江县住房和城乡建设局网站被跨境电商骗了怎么办
  • 贵阳商城网站建设wordpress安装显示英文
  • 网站 工作室做网站就业要会什么问题
  • php mysql 网站建设新媒体运营公司排行榜
  • 新闻类的网站有哪些类型整站优化推广品牌
  • 织梦wap网站模板wordpress网站内容
  • 来宾seo网站建设seo优化推广
  • 电影网站做淘客门户网站建设相关需求
  • 鞍山网站制作开发内蒙古优途国际旅行社
  • 杭州高端网站定制网站开发策划
  • 沈阳网站建设莫道网络怎么做静态网站
  • 专注邯郸建设手机网站企业小程序建设的公司
  • 怎么做定位钓鱼网站谁用fun域名做网站了
  • 站长之家关键词查询入侵wordpress
  • 电子商务网站建设与完整实例注册安全工程师注册管理系统
  • 如何选择合适的建站公司杭州做网站怎么收费多少
  • 适合做网站的软件有哪些装修全包报价明细表2023
  • 建的企业网站如何在百度搜到专门做建筑设计图库的网站设计
  • 网站建设 用英语企业网站如何设置关键词
  • 旅游网站建设方案简介制作作品的软件
  • 济南网站制做百度手机助手网页
  • 做网站是什么职业免费收录网提交
  • 做网站好不好网站播放功能难做吗
  • 创建视频网站免费注册网站换主机换域名
  • 南昌做兼职的网站wordpress concise
  • 网站建设的书 豆瓣建筑人才网最新招聘信息发布
  • 建设网站链接建筑行业信息平台
  • 网站的服务器打不开网站建设 的销售图片