成都网站建设排名,下载中国移动商旅100最新版本,建一个平台网站需要多少钱,外贸推广是做什么的目录 1、Job Stage划分
2、Task最佳位置
3、总结
3.1 Stage划分总结#xff1a;
3.2 Task最佳位置总结#xff1a; 1、Job Stage划分
Spark Application中因为不同的Action触发众多的Job#xff0c;也就是说一个Application中可以有很多的Job#xff0c;每个Job是由是…目录 1、Job Stage划分
2、Task最佳位置
3、总结
3.1 Stage划分总结
3.2 Task最佳位置总结 1、Job Stage划分
Spark Application中因为不同的Action触发众多的Job也就是说一个Application中可以有很多的Job每个Job是由是由一个或者多个Stage构成的后面的Stage依赖于前面的Stage也就是说只有前面依赖的Stage计算完毕后后面的Stage才会运行。而Stage划分的依据就是宽依赖。下面以RDD的collect方法为例
1他是一个action会触发一个具体的作业runJob 2runJob有很多重载方法不断地往里调用最后交给dagScheduler的runJob在dagScheduler的runJob交给了submitJob后面还有一个等待作业结果看成功还是失败会有相应的动作。 3在submitJob中首先看一下分区长度是因为要进行计算这个肯定是RDD导致的action他要校验一下是不是在运行的时候相应的Partition存在。
eventProcessLoop调用post的时候有个Jobsubmitted的参数他是一个case class因为一个application中可能有很多的Job不同的job的Jobsubmitted实例不一样所以不能用case object。他里面封装了job的id最后一个RDD具体对RDD操作的函数有哪些Partition要被计算监听作业状态等。
他的核心就是将Jobsubmitted交给eventProcessLoop。他是通过post方法post给eventProcessLoop这个post其实就是发往EventLoop里面的eventQueue 4发现在EventLoop里面开辟了一个线程他是setDaemon方式作为后台线程因为要在后台做不断的循环如果是前台线程的话对垃圾回收是有影响的在run方法里面会不断的循环我们的消息队列从eventQueue是一个LinkedBlockingDeque我们可以往他里面信息中获得消息调用了onReceive发现在里面没有具体的实现所以在DAGSchedulerEventProcessLoop中对onReceive进行了实现这里就收到了DAGSchedulerEvent这里面再调用doOnReceive。doOnReceive收到信息就开始处理 5接下来就是HandleJobSubmited。这个时候Stage就开始了。我们知道最后一个Stage一定是ResultStage前面所有的Stage都是ShuffleMapStage。 6发现有个getOrCreateParentStages的方法开始创建ResultStage的父stage里面有多个嵌套获取shuffle依赖和循环创建shuffleMapStage若没有shuffle操作则返回空list 进入到创建父Stage的方法getOrCreateParentStages这里仅仅是抽取当前RDD的shuffle依赖shuffleMapStage如果不是shuffleDependency就继续抽取父RDD迭代遍历一直到抽取出为止或者没有 进入getOrCreateShuffleMapStage方法中进行匹配能不能取到ParentStage的值当没有parentStage的时候会返回空能取到就返回stageShuffleMapStage是根据遍历出的ShuffleDependencies一次次创建出来的 进入createShuffleMapStage方法 此方法是递归循环创建shuffleMapStage的过程 这个时候ShuffleMapStage已经创建完成了并不是一次就创建完成而是遇见shuffle的时候会由下往上递归创建ShuffleMapStage
7构建完所有的ShuffleMapStage后将其作为参数创建ResultStage 8最后将Stage和id关联更新job所有的Stage并将Stage返回出去。 9回到handleJobsubmited方法中finalStage构建完之后新建一个ActiveJob保存了当前job的一些信息打印一堆日志之类。getMissingParentStages(finalStage)根据finalStage刚才找父Stage的时候如果有的话直接返回如果没有的话就会创建所以如果曾经有就不需要再去做。listenerBus.post监听事件最后submitStage(finalStage)。 首先获得id如果jobId是defined的话再次getMissingParentStages(stage)获得missing的stage之后判断一下是否为空如果为空的话就submitMissingTasks(stage, jobId.get)个就是没有前置性的Tasks也就是没有父Stage。在这个底层其实是DAGScheduler把这个处理的过程交给具体的TaskScheduler去处理
2、Task最佳位置
1在handleJobsubmited方法中最后是最后调用submitStage在他里面会调用submitMissingTasks 2这里面有很多代码我们要关心Stage本身的算法以及Task任务本地性把当前的Stage加进去然后对Stage进行判断一种是ShuffleMapStage一种是ResultStage。继续往下走会看到taskIdToLocations这是关键的代码taskIdToLocations是一个Map partitionsToCompute这里面获得是具体的要计算的PartitionID我们我们这边看到的map里面的id是Partition的id。这里面匿名函数产生的是tuple根据Partition的id。后面toMap就是Partition的id和TaskLocation的位置。
3进入到getPreferredLocs(stage.rdd, id)进来的是RDDPartitionID返回的是一个集合。 再进入getPreferredLocsInternal visited: HashSet[(RDD[_], Int)]这个HashSet开始是空所以直接传进来一个new HashSet然后判断visited如果已经有的话那么添加就不成功那么就是已经计算了数据本地性了就返回Nil。
下面的cached就是已经在DAGScheduler的内存数据结构中了。进入getCacheLocs这边返回的是序列cacheLocs是一个HashMap这包含了每个RDD的Partition的id以及id对应的taskLocation这个包含了Stage本身也包含了Stage内部任务的本地性 4回到getPreferredLocsInternal中上面是看一下DAGScheduler中有没有缓存根据Partition而保存的数据本地性的内容如果不为空的话就把内容返回。然后调用下面的getpreferdLocations如果自定义一个RDD的话是一定要写这个方法的 5最后判断一下如果是窄依赖的话就自己调用自己 3、总结
3.1 Stage划分总结
1Action触发Job开始逆向分析job执行过程Action中利用SparkContext runJob路由到dagScheduler.runJob(rdd,func,分区数其他)提交Job作业
2DAGScheduler的runJob中调用submitJob并返回监听waiter生命周期内监听Job状态
3在submitJob内部将该获取到的Job(已有JobId插入到名为eventProcessLoop的LinkedBlockingDeque结构的事件处理队列中
4eventProcessLoop放入新事件后调起底层的DAGSchedulerEventProcessLoop的onReceive方法
5执行doOnReceive根据DAGSchedulerEvent的具体类型如JobSubmitted事件或者MapStageSubmitted事件调取具体的Submitted handle函数提交具体的Job
6以JobSubmitted为例在handleJobSubmitted内部返回从ResultStage 建立stage 建立finalStage createResultStage(finalRDD, func, partitions, jobId, callSite)finalStage激活Job val job new ActiveJob(jobId, finalStage, callSite, listener, properties)同时开始逆向构建缺失的stage
7DAG构建完毕提交stagesubmitStage(finalStage)submitStage中stage提交为taskssubmitMissingTaskssubmitMissingTasks根据ShuffleMapStage还是ResultStage创建 ShuffleMapTask 或 ResultTask。
7taskScheduler.submitTasks开始调起具体的task
3.2 Task最佳位置总结
1在划分Stage的时候submitMissingTasks方法中会有一个taskIdToLocations的属性他的结构为 Map[Int, Seq[TaskLocation]]他保存的就是PartitionID及其对应的最佳位置
2在对taskIdToLocations赋值的时候会调用getPreferredLocs方法再路由到getPreferredLocsInternal返回最佳位置Seq[TaskLocation]
3在getPreferredLocsInternal方法中
①判断rdd的partition是否被访问过如果被访问过则什么都不做
②然后判断DAGScheduler的内存中是否cache了在当前Paritition的信息如果有的话直接返回
③如果没有cache则调用rdd.getPreferredLocations方法获取RDD partition的最佳位置
④遍历RDD的依赖如果有窄依赖遍历父依赖的partition对遍历到的每个partition递归调用getPreferredLocsInternal方法
即从第一个窄依赖的第一个partition开始然后将每个partition的最佳位置添加到序列中最后返回所有partition的最佳位置序列
注意DAGScheduler计算数据本地性的时候借助了RDD自身的getPreferredLocations中的数据因为getPreferredLocations中表明了每个Partition的数据本地性虽然当前Partition可能被persist或者checkpoint但是persist或者checkpoint默认情况下肯定是和getPreferredLocations中的Partition的数据本地性是一致的所以这就极大的简化Task数据本地性算法的实现和效率的优化。