专业做能源招聘的网站,如何做行业平台网站,天津建设工程信息网欧美小镇农贸市场,it培训机构Flink中Graph转换流程如下#xff1a; Flink Job提交时各种类型Graph转换流程中#xff0c;JobGraph是Client端形成StreamGraph后经过Operator Chain优化后形成的#xff0c;然后提交给JobManager的Restserver#xff0c;最终转发给JobManager的Dispatcher处理。
Completa…Flink中Graph转换流程如下 Flink Job提交时各种类型Graph转换流程中JobGraph是Client端形成StreamGraph后经过Operator Chain优化后形成的然后提交给JobManager的Restserver最终转发给JobManager的Dispatcher处理。
CompletableFutureAcknowledge submitJob(JobGraph jobGraph, RpcTimeout Time timeout);本文主要解析从JobGraph转换为ExecutionGraph过程执行栈如下
Dispacher::submitJob
Dispacher::internalSubmitJob
Dispacher::persistAndRunJob
Dispacher::runJob
Dispacher::createJobManagerRunner
JobMasterServiceLeadershipRunnerFactory::createJobManagerRunner
JobMasterServiceLeadershipRunner:start
JobMasterServiceLeadershipRunner::grantLeadership
JobMasterServiceLeadershipRunner::startJobMasterServiceProcessAsync
JobMasterServiceLeadershipRunner::verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
JobMasterServiceLeadershipRunner::createNewJobMasterServiceProcess
DefaultJobMasterServiceProcessFactory::create
DefaultJobMasterServiceProcess::new
DefaultJobMasterServiceFactory::createJobMasterService
DefaultJobMasterServiceFactory::internalCreateJobMasterService //创建JobMaster并调用其start
JobMaster::new //调用DefaultSlotPoolServiceSchedulerFactory::createScheduler
DefaultSlotPoolServiceSchedulerFactory::createScheduler //根据调度模式选择调度器
DefaultSchedulerFactory::createInstance //创建SchedulerNG
DefaultScheduler::new //
SchedulerBase::newSchedulerBase::createAndRestoreExecutionGraph DefaultExecutionGraphFactory::createAndRestoreExecutionGraphDefaultExecutionGraphBuilder.buildGraph//在此会将JobGraph转换为ExecutionGraphDefaultExecutionGraph::newDefaultExecutionGraph::attachJobGraph //创建ExecutionJobVertexDefaultExecutionTopology.fromExecutionGraph //创建ExecutionTopologyDefaultExecutionGraph::enableCheckpointing //创建CheckpointCoordinatorCheckpointCoordinator::new
PipelinedRegionSchedulingStrategy.Factory.createInstance //创建PipelinedRegionSchedulingStrategyJobMaster::start
JobMaster::onStart
JobMaster::startJobExecution
JobMaster::startJobMasterServices //获取RM地址后与RM建立连接
JobMaster::startScheduling
SchedulerBase::startScheduling
DefaultScheduler::startSchedulingInternal
PipelinedRegionSchedulingStrategy::startScheduling
PipelinedRegionSchedulingStrategy::maybeScheduleRegions
DefaultScheduler::allocateSlotsAndDeploy
DefaultScheduler::allocateSlotsSlotSharingExecutionSlotAllocator::allocateSlotsFor //分配Slot
DefaultScheduler::waitForAllSlotsAndDeployDefaultScheduler::assignAllResourcesAndRegisterProducedPartitionsDefaultScheduler::assignResource //为每个Execution分配SlotDefaultScheduler::registerProducedPartitionsDefaultScheduler::deployAllDefaultScheduler::deployOrHandleErrorDefaultScheduler::deployTaskSafeDefaultExecutionVertexOperations::deployExecutionVertex::deployExecution::deploy //提交任务向TM提交DeploymenTaskManagerGateway.submitTask在整个提交过程中首先获取JobMasterService的Leader权限然后对一个JobGraph生成一个JobMasterJobMaster先将JobGraph转换为ExecutionGraph转换核心逻辑在DefaultExecutionGraph::attachJobGraph方法中最后为每个Execution申请Slot资源对每个Execution向TM提交TaskDeploymentDescriptor调度执行。 JobMaster管理整个Job的生命周期主要有以下功能
将JobGraph转换为ExecutionGraph创建调度器调度执行通过心跳保持与ResourceManager的连接为当前Job向RM申请Slot资源接受TaskManager的OfferSlot, 向TM提交task, 主动发送心跳请求保持与执行当前Job的TM的连接创建CheckpointCoordinator触发Checkpoint
Flink中可通过jobmanager.scheduler配置调度类型默认为NG
NG:new generation scheduler
Adaptive: adaptive scheduler; supports reactive mode