书接上回,【Flink源码】再谈 Flink 程序提交流程(上) 一文中我们已经将程序从客户端提交给了 ResourceManager
接下来我们就去 ResourceManager 中一探究竟
创建 Dispatcher、ResourceManager
YarnJobClusterEntrypoint 类是 Yarn per-job 集群的入口,包含了我们想看的 main 方法
YarnJobClusterEntrypoint.java
public static void main(String[] args) { LOG.warn( "Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead."); // startup checks and logging EnvironmentInformation.logEnvironmentInfo( LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args); SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); Map<String, String> env = System.getenv(); final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key()); Preconditions.checkArgument( workingDirectory != null, "Working directory variable (%s) not set", ApplicationConstants.Environment.PWD.key()); try { YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG); } catch (IOException e) { LOG.warn("Could not log YARN environment information.", e); } final Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit( args, new DynamicParametersConfigurationParserFactory(), YarnJobClusterEntrypoint.class); final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env); YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration); ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);}
main 方法中通过 ClusterEntrypoint.runClusterEntrypoint 方法以 YarnJobClusterEntrypoint 对象为参数加载运行入口
ClusterEntrypoint.java
/**
 * Starts the given cluster entrypoint, waits for its termination future and exits the JVM.
 *
 * <p>If {@code startCluster()} throws a {@code ClusterEntrypointException}, the error is logged
 * and the process exits with {@code STARTUP_FAILURE_RETURN_CODE}. Otherwise the exit code is
 * taken from the termination future, or {@code RUNTIME_FAILURE_RETURN_CODE} if waiting fails.
 *
 * @param clusterEntrypoint the entrypoint to run
 */
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
    try {
        clusterEntrypoint.startCluster();
    } catch (ClusterEntrypointException e) {
        LOG.error(
                String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
                e);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }

    int returnCode;
    Throwable throwable = null;

    try {
        // Blocks until the cluster entrypoint has terminated.
        returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
    } catch (Throwable e) {
        throwable = ExceptionUtils.stripExecutionException(e);
        returnCode = RUNTIME_FAILURE_RETURN_CODE;
    }

    LOG.info(
            "Terminating cluster entrypoint process {} with exit code {}.",
            clusterEntrypointName,
            returnCode,
            throwable);
    System.exit(returnCode);
}

/**
 * Installs the security manager, plugin manager, file systems and security context, then runs
 * {@code runCluster} inside the security context.
 *
 * @throws ClusterEntrypointException if any step fails; before rethrowing, a graceful shutdown
 *     is attempted and waited for up to {@code INITIALIZATION_SHUTDOWN_TIMEOUT}
 */
public void startCluster() throws ClusterEntrypointException {
    LOG.info("Starting {}.", getClass().getSimpleName());

    try {
        FlinkSecurityManager.setFromConfiguration(configuration);
        PluginManager pluginManager =
                PluginUtils.createPluginManagerFromRootFolder(configuration);
        configureFileSystems(configuration, pluginManager);

        SecurityContext securityContext = installSecurityContext(configuration);

        ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
        securityContext.runSecured(
                (Callable<Void>)
                        () -> {
                            runCluster(configuration, pluginManager);

                            return null;
                        });
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

        try {
            // clean up any partial state
            shutDownAsync(
                            ApplicationStatus.FAILED,
                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                            ExceptionUtils.stringifyException(strippedThrowable),
                            false)
                    .get(
                            INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
                            TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            // A failed cleanup must not hide the original failure; attach it as suppressed.
            strippedThrowable.addSuppressed(e);
        }

        throw new ClusterEntrypointException(
                String.format(
                        "Failed to initialize the cluster entrypoint %s.",
                        getClass().getSimpleName()),
                strippedThrowable);
    }
}

/**
 * Initializes the services, creates the Dispatcher/ResourceManager component through its
 * factory and registers a shutdown callback on the component's shut-down future. The whole
 * body runs while holding {@code lock}.
 */
private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager);

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        // Build the factory that creates the Dispatcher and ResourceManager objects.
        // NOTE(review): per the article author, this step also rebuilds the JobGraph from
        // local files; that is not visible in this excerpt.
        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);

        // Create the Dispatcher and ResourceManager through the factory, handing over the
        // RPC service, HA services, blob server, heartbeat services, metric registry,
        // execution graph store, etc.
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        resourceId.unwrap(),
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        delegationTokenManager,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);

        clusterComponent
                .getShutDownFuture()
                .whenComplete(
                        (ApplicationStatus applicationStatus, Throwable throwable) -> {
                            if (throwable != null) {
                                shutDownAsync(
                                        ApplicationStatus.UNKNOWN,
                                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                        ExceptionUtils.stringifyException(throwable),
                                        false);
                            } else {
                                // This is the general shutdown path. If a separate more
                                // specific shutdown was
                                // already triggered, this will do nothing
                                shutDownAsync(
                                        applicationStatus,
                                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                        null,
                                        true);
                            }
                        });
    }
}
我们继续看创建过程
找到 DispatcherResourceManagerComponentFactory 接口的实现类 DefaultDispatcherResourceManagerComponentFactory
DefaultDispatcherResourceManagerComponentFactory.java
/**
 * Creates and starts the REST endpoint, the ResourceManager service and the Dispatcher runner,
 * then bundles them into a {@code DispatcherResourceManagerComponent}.
 *
 * <p>If anything fails, every component that was already created is stopped or closed and the
 * failure is rethrown wrapped in a {@code FlinkException}.
 *
 * @throws Exception if the component could not be created
 */
public DispatcherResourceManagerComponent create(
        Configuration configuration,
        ResourceID resourceId,
        Executor ioExecutor,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        DelegationTokenManager delegationTokenManager,
        MetricRegistry metricRegistry,
        ExecutionGraphInfoStore executionGraphInfoStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {

    // Declared outside the try block so the catch block can clean up whatever was created.
    LeaderRetrievalService dispatcherLeaderRetrievalService = null;
    LeaderRetrievalService resourceManagerRetrievalService = null;
    WebMonitorEndpoint<?> webMonitorEndpoint = null;
    ResourceManagerService resourceManagerService = null;
    DispatcherRunner dispatcherRunner = null;

    try {
        dispatcherLeaderRetrievalService =
                highAvailabilityServices.getDispatcherLeaderRetriever();

        resourceManagerRetrievalService =
                highAvailabilityServices.getResourceManagerLeaderRetriever();

        final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
                new RpcGatewayRetriever<>(
                        rpcService,
                        DispatcherGateway.class,
                        DispatcherId::fromUuid,
                        new ExponentialBackoffRetryStrategy(
                                12, Duration.ofMillis(10), Duration.ofMillis(50)));

        final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
                new RpcGatewayRetriever<>(
                        rpcService,
                        ResourceManagerGateway.class,
                        ResourceManagerId::fromUuid,
                        new ExponentialBackoffRetryStrategy(
                                12, Duration.ofMillis(10), Duration.ofMillis(50)));

        final ScheduledExecutorService executor =
                WebMonitorEndpoint.createExecutorService(
                        configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                        configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                        "DispatcherRestEndpoint");

        final long updateInterval =
                configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
        // An update interval of 0 selects the no-op metric fetcher.
        final MetricFetcher metricFetcher =
                updateInterval == 0
                        ? VoidMetricFetcher.INSTANCE
                        : MetricFetcherImpl.fromConfiguration(
                                configuration,
                                metricQueryServiceRetriever,
                                dispatcherGatewayRetriever,
                                executor);

        // Create the endpoint that receives REST requests from the front end.
        webMonitorEndpoint =
                restEndpointFactory.createRestEndpoint(
                        configuration,
                        dispatcherGatewayRetriever,
                        resourceManagerGatewayRetriever,
                        blobServer,
                        executor,
                        metricFetcher,
                        highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                        fatalErrorHandler);

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        final String hostname = RpcUtils.getHostname(rpcService);

        // Create the ResourceManager service.
        // NOTE(review): per the article author, on YARN this ends up in
        // "new YarnResourceManager" via AbstractDispatcherResourceManagerComponentFactory
        // -> ActiveResourceManagerFactory -> YarnResourceManagerFactory; that chain is not
        // visible in this excerpt.
        resourceManagerService =
                ResourceManagerServiceImpl.create(
                        resourceManagerFactory,
                        configuration,
                        resourceId,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        delegationTokenManager,
                        fatalErrorHandler,
                        new ClusterInformation(hostname, blobServer.getPort()),
                        webMonitorEndpoint.getRestBaseUrl(),
                        metricRegistry,
                        hostname,
                        ioExecutor);

        final HistoryServerArchivist historyServerArchivist =
                HistoryServerArchivist.createHistoryServerArchivist(
                        configuration, webMonitorEndpoint, ioExecutor);

        final DispatcherOperationCaches dispatcherOperationCaches =
                new DispatcherOperationCaches(
                        configuration.get(RestOptions.ASYNC_OPERATION_STORE_DURATION));

        final PartialDispatcherServices partialDispatcherServices =
                new PartialDispatcherServices(
                        configuration,
                        highAvailabilityServices,
                        resourceManagerGatewayRetriever,
                        blobServer,
                        heartbeatServices,
                        () ->
                                JobManagerMetricGroup.createJobManagerMetricGroup(
                                        metricRegistry, hostname),
                        executionGraphInfoStore,
                        fatalErrorHandler,
                        historyServerArchivist,
                        metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                        ioExecutor,
                        dispatcherOperationCaches);

        // Create the DispatcherRunner (the article notes that creating it also starts it).
        log.debug("Starting Dispatcher.");
        dispatcherRunner =
                dispatcherRunnerFactory.createDispatcherRunner(
                        highAvailabilityServices.getDispatcherLeaderElectionService(),
                        fatalErrorHandler,
                        new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                        ioExecutor,
                        rpcService,
                        partialDispatcherServices);

        // Start the ResourceManager.
        log.debug("Starting ResourceManagerService.");
        resourceManagerService.start();

        // Start leader retrieval only after both components have been created.
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return new DispatcherResourceManagerComponent(
                dispatcherRunner,
                resourceManagerService,
                dispatcherLeaderRetrievalService,
                resourceManagerRetrievalService,
                webMonitorEndpoint,
                fatalErrorHandler,
                dispatcherOperationCaches);

    } catch (Exception exception) {
        // clean up all started components
        if (dispatcherLeaderRetrievalService != null) {
            try {
                dispatcherLeaderRetrievalService.stop();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }

        if (resourceManagerRetrievalService != null) {
            try {
                resourceManagerRetrievalService.stop();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }

        // The remaining components close asynchronously; collect their futures and wait.
        final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

        if (webMonitorEndpoint != null) {
            terminationFutures.add(webMonitorEndpoint.closeAsync());
        }

        if (resourceManagerService != null) {
            terminationFutures.add(resourceManagerService.closeAsync());
        }

        if (dispatcherRunner != null) {
            terminationFutures.add(dispatcherRunner.closeAsync());
        }

        final FutureUtils.ConjunctFuture<Void> terminationFuture =
                FutureUtils.completeAll(terminationFutures);

        try {
            terminationFuture.get();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        throw new FlinkException(
                "Could not create the DispatcherResourceManagerComponent.", exception);
    }
}
至此,我们找到了 dispatcher、ResourceManager 的创建和启动方法,接下来我们有必要深入看看具体的过程
创建 YarnResourceManager
首先我们看 YarnResourceManager 创建过程
ResourceManagerFactory.java
/**
 * Creates a ResourceManager from the given process context.
 *
 * <p>First builds the runtime services (which include the SlotManager), then delegates to the
 * abstract {@code createResourceManager} overload implemented by concrete factories.
 *
 * @param context bundle of configuration and services for the ResourceManager process
 * @param leaderSessionId leader session id handed to the created ResourceManager
 * @return the newly created ResourceManager
 * @throws Exception if creation fails
 */
public ResourceManager<T> createResourceManager(
        ResourceManagerProcessContext context, UUID leaderSessionId) throws Exception {

    final ResourceManagerRuntimeServices runtimeServices =
            createResourceManagerRuntimeServices(
                    context.getRmRuntimeServicesConfig(),
                    context.getRpcService(),
                    context.getHighAvailabilityServices(),
                    SlotManagerMetricGroup.create(
                            context.getMetricRegistry(), context.getHostname()));

    return createResourceManager(
            context.getRmConfig(),
            context.getResourceId(),
            context.getRpcService(),
            leaderSessionId,
            context.getHeartbeatServices(),
            context.getDelegationTokenManager(),
            context.getFatalErrorHandler(),
            context.getClusterInformation(),
            context.getWebInterfaceUrl(),
            ResourceManagerMetricGroup.create(
                    context.getMetricRegistry(), context.getHostname()),
            runtimeServices,
            context.getIoExecutor());
}
这里的 createResourceManager 是一个抽象方法,我们找到 ResourceManagerFactory 的 Yarn 实现类 YarnResourceManagerFactory
YarnResourceManagerFactory.java
/**
 * Creates the YARN flavour of the ResourceManager.
 *
 * <p>The SlotManager and the JobLeaderIdService are taken from the supplied runtime services;
 * the process environment is passed in via {@code System.getenv()}.
 *
 * NOTE(review): this signature differs from the argument list used by the factory call shown
 * earlier in the article (no leader session id, delegation token manager or IO executor) -
 * presumably taken from a different Flink version; confirm.
 */
public ResourceManager<YarnWorkerNode> createResourceManager(
        Configuration configuration,
        ResourceID resourceId,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        FatalErrorHandler fatalErrorHandler,
        ClusterInformation clusterInformation,
        @Nullable String webInterfaceUrl,
        ResourceManagerMetricGroup resourceManagerMetricGroup,
        ResourceManagerRuntimeServices resourceManagerRuntimeServices) {

    return new YarnResourceManager(
            rpcService,
            resourceId,
            configuration,
            System.getenv(),
            highAvailabilityServices,
            heartbeatServices,
            resourceManagerRuntimeServices.getSlotManager(),
            ResourceManagerPartitionTrackerImpl::new,
            resourceManagerRuntimeServices.getJobLeaderIdService(),
            clusterInformation,
            fatalErrorHandler,
            webInterfaceUrl,
            resourceManagerMetricGroup);
}
创建 YarnResourceManager 时,创建了 SlotManager
我们再继续看一下 SlotManager 是如何创建的
ResourceManagerFactory.java
/**
 * Builds the ResourceManager runtime services by delegating to
 * {@code ResourceManagerRuntimeServices.fromConfiguration}, using the scheduled executor of the
 * given RPC service.
 */
private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
        ResourceManagerRuntimeServicesConfiguration rmRuntimeServicesConfig,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        SlotManagerMetricGroup slotManagerMetricGroup) {

    return ResourceManagerRuntimeServices.fromConfiguration(
            rmRuntimeServicesConfig,
            highAvailabilityServices,
            rpcService.getScheduledExecutor(),
            slotManagerMetricGroup);
}
ResourceManagerRuntimeServices.java
/**
 * Assembles the runtime services of a ResourceManager: a SlotManager (via
 * {@code createSlotManager}) and a {@code DefaultJobLeaderIdService}.
 *
 * @return the runtime services wrapping both objects
 */
public static ResourceManagerRuntimeServices fromConfiguration(
        ResourceManagerRuntimeServicesConfiguration configuration,
        HighAvailabilityServices highAvailabilityServices,
        ScheduledExecutor scheduledExecutor,
        SlotManagerMetricGroup slotManagerMetricGroup) {

    // The SlotManager is created first, exactly as in the original ordering.
    final SlotManager manager =
            createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup);

    final JobLeaderIdService leaderIdService =
            new DefaultJobLeaderIdService(
                    highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout());

    return new ResourceManagerRuntimeServices(manager, leaderIdService);
}
到这里,我们找到了创建 SlotManager 的方法 createSlotManager
创建并启动 Dispatcher
接下来我们看 Dispatcher 的创建和启动过程
找到接口 DispatcherRunnerFactory 的实现类 DefaultDispatcherRunnerFactory
DefaultDispatcherRunnerFactory.java
/**
 * Creates a {@code DispatcherRunner}: first obtains a leader-process factory from
 * {@code dispatcherLeaderProcessFactoryFactory}, then delegates to
 * {@code DefaultDispatcherRunner.create}.
 *
 * @throws Exception if the runner cannot be created
 */
public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobPersistenceComponentFactory jobPersistenceComponentFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices)
        throws Exception {

    final DispatcherLeaderProcessFactory leaderProcessFactory =
            dispatcherLeaderProcessFactoryFactory.createFactory(
                    jobPersistenceComponentFactory,
                    ioExecutor,
                    rpcService,
                    partialDispatcherServices,
                    fatalErrorHandler);

    return DefaultDispatcherRunner.create(
            leaderElectionService, fatalErrorHandler, leaderProcessFactory);
}
再看 DefaultDispatcherRunner 类
DefaultDispatcherRunner.java
/**
 * Instantiates a {@code DefaultDispatcherRunner} and wraps it in a
 * {@code DispatcherRunnerLeaderElectionLifecycleManager} bound to the given leader election
 * service.
 *
 * @throws Exception if the lifecycle manager cannot be created
 */
public static DispatcherRunner create(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
        throws Exception {

    final DefaultDispatcherRunner runner =
            new DefaultDispatcherRunner(
                    leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);

    return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
            runner, leaderElectionService);
}
DispatcherRunnerLeaderElectionLifecycleManager.java
/**
 * Wraps the given runner in a lifecycle manager. The runner must also be a
 * {@code LeaderContender} so it can take part in leader election.
 *
 * @throws Exception if starting the leader election service fails
 */
public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    return new DispatcherRunnerLeaderElectionLifecycleManager<>(
            dispatcherRunner, leaderElectionService);
}

/**
 * Stores both collaborators and immediately starts the leader election with the runner as
 * the contender.
 */
private DispatcherRunnerLeaderElectionLifecycleManager(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;

    // Start the leader election for the dispatcher.
    leaderElectionService.start(dispatcherRunner);
}
找到 LeaderElectionService 接口的实现类 StandaloneLeaderElectionService
StandaloneLeaderElectionService.java
public void start(LeaderContender newContender) throws Exception { if (contender != null) { // Service was already started throw new IllegalArgumentException( "Leader election service cannot be started multiple times."); } contender = Preconditions.checkNotNull(newContender); // directly grant leadership to the given contender contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);}
DefaultDispatcherRunner.java
/**
 * Leadership callback: if the runner is still running, logs the event and starts a fresh
 * dispatcher leader process for the given session id.
 */
public void grantLeadership(UUID leaderSessionID) {
    runActionIfRunning(
            () -> {
                LOG.info(
                        "{} was granted leadership with leader id {}. Creating new {}.",
                        getClass().getSimpleName(),
                        leaderSessionID,
                        DispatcherLeaderProcess.class.getSimpleName());
                startNewDispatcherLeaderProcess(leaderSessionID);
            });
}

/**
 * Stops the current leader process, creates a new one and starts it once the previous
 * process's termination future has completed.
 */
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    stopDispatcherLeaderProcess();

    // Keep a local reference so the callback starts exactly this process,
    // even if the field is reassigned afterwards.
    final DispatcherLeaderProcess freshProcess =
            createNewDispatcherLeaderProcess(leaderSessionID);
    dispatcherLeaderProcess = freshProcess;

    FutureUtils.assertNoException(
            previousDispatcherLeaderProcessTerminationFuture.thenRun(freshProcess::start));
}
AbstractDispatcherLeaderProcess.java
/** Runs {@code startInternal} only if the process is still in the CREATED state. */
public final void start() {
    runIfStateIs(State.CREATED, this::startInternal);
}

/** Marks the process as RUNNING and invokes the subclass hook {@code onStart()}. */
private void startInternal() {
    log.info("Start {}.", getClass().getSimpleName());
    state = State.RUNNING;
    onStart();
}
再往下找实现了 onStart 方法的实现类
JobDispatcherLeaderProcess.java
/**
 * Start hook of the per-job leader process: creates the dispatcher gateway service for the
 * single job graph (and the recovered dirty job result, if any) and completes the dispatcher
 * setup with it.
 */
protected void onStart() {
    final DispatcherId fencingToken = DispatcherId.fromUuid(getLeaderSessionId());

    final DispatcherGatewayService gatewayService =
            dispatcherGatewayServiceFactory.create(
                    fencingToken,
                    CollectionUtil.ofNullable(jobGraph),
                    CollectionUtil.ofNullable(recoveredDirtyJobResult),
                    ThrowingJobGraphWriter.INSTANCE,
                    jobResultStore);

    completeDispatcherSetup(gatewayService);
}
DefaultDispatcherGatewayServiceFactory.java
/**
 * Creates the Dispatcher RPC endpoint through {@code dispatcherFactory}, starts it and wraps
 * it in a {@code DefaultDispatcherGatewayService}.
 *
 * @throws FlinkRuntimeException if the Dispatcher could not be created
 */
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        Collection<JobResult> recoveredDirtyJobResults,
        JobGraphWriter jobGraphWriter,
        JobResultStore jobResultStore) {

    final Dispatcher dispatcher;
    try {
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        recoveredDirtyJobResults,
                        // The bootstrap created here is a no-op.
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new NoOpDispatcherBootstrap(),
                        PartialDispatcherServicesWithJobPersistenceComponents.from(
                                partialDispatcherServices, jobGraphWriter, jobResultStore));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }

    // Start the dispatcher.
    dispatcher.start();

    return DefaultDispatcherGatewayService.from(dispatcher);
}
至此,dispatcher 启动完毕
启动 ResourceManager
下面我们来看 ResourceManager 启动过程
ResourceManager.java
/**
 * Start hook of the ResourceManager: starts its services and completes {@code startedFuture}.
 *
 * @throws ResourceManagerException wrapping any failure, after it has also been reported via
 *     {@code onFatalError}
 */
public final void onStart() throws Exception {
    try {
        log.info("Starting the resource manager.");
        startResourceManagerServices();
        startedFuture.complete(null);
    } catch (Throwable t) {
        final ResourceManagerException exception =
                new ResourceManagerException(
                        String.format("Could not start the ResourceManager %s", getAddress()),
                        t);
        onFatalError(exception);
        throw exception;
    }
}

/**
 * Starts, in this order: the job leader id service, metrics, heartbeat services, the
 * SlotManager and the delegation token manager, then calls {@code initialize()}.
 * Any {@code Exception} is passed to {@code handleStartResourceManagerServicesException}.
 */
private void startResourceManagerServices() throws Exception {
    try {
        jobLeaderIdService.start(new JobLeaderIdActionsImpl());

        registerMetrics();

        startHeartbeatServices();

        slotManager.start(
                getFencingToken(),
                getMainThreadExecutor(),
                new ResourceActionsImpl(),
                blocklistHandler::isBlockedTaskManager);

        delegationTokenManager.start();

        initialize();
    } catch (Exception e) {
        handleStartResourceManagerServicesException(e);
    }
}
在 startResourceManagerServices 方法中,包含了初始化、心跳开启、slotManager 开启等操作
到这里,我们总算探究完成了 dispatcher 和 ResourceManager 的创建和启动过程
现在我们回到最开始,继续看 Flink 程序提交流程的下一个步骤
Dispatcher 启动 JobManager
在启动了 dispatcher 和 ResourceManager 后,Dispatcher 启动了 JobManager
要一探究竟首先我们先进入 dispatcher 的实现类 Dispatcher
Dispatcher.java
public void onStart() throws Exception { try { // 启动 Dispatcher startDispatcherServices(); } catch (Throwable t) { final DispatcherException exception = new DispatcherException( String.format("Could not start the Dispatcher %s", getAddress()), t); onFatalError(exception); throw exception; } startCleanupRetries(); // 启动 Job startRecoveredJobs(); this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create( getSelfGateway(DispatcherGateway.class), this.getRpcService().getScheduledExecutor(), this::onFatalError);}private void startRecoveredJobs() { for (JobGraph recoveredJob : recoveredJobs) { runRecoveredJob(recoveredJob); } recoveredJobs.clear();}private void runRecoveredJob(final JobGraph recoveredJob) { checkNotNull(recoveredJob); try { runJob(createJobMasterRunner(recoveredJob), ExecutionType.RECOVERY); } catch (Throwable throwable) { onFatalError( new DispatcherException( String.format( "Could not start recovered job %s.", recoveredJob.getJobID()), throwable)); }}private void runJob(JobGraph jobGraph, ExecutionType executionType) { ... ... CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp); ... ... }CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, longinitializationTimestamp) { final RpcService rpcService = getRpcService(); return CompletableFuture.supplyAsync( () -> { try { JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner( jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), fatalErrorHandler, initializationTimestamp); // 启动 JobManagerRunner runner.start(); return runner; } ... ...}
JobManagerRunnerImpl.java
/**
 * Starts the runner by registering it as contender with the leader election service.
 *
 * @throws Exception if the leader election service could not be started; the cause is logged
 *     and attached
 */
public void start() throws Exception {
    try {
        leaderElectionService.start(this);
    } catch (Exception e) {
        log.error("Could not start the JobManager because the leader election service did not start.", e);
        throw new Exception("Could not start the leader election service.", e);
    }
}
StandaloneLeaderElectionService.java
// Excerpt: the elided part is marked with "... ...".
// Grants leadership to the contender directly, using the default leader id.
public void start(LeaderContender newContender) throws Exception {
    ... ...
    contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
JobManagerRunnerImpl.java
// Excerpt from JobManagerRunnerImpl. Elided parts are marked with "... ...".

/**
 * Leadership callback. Under {@code lock}: does nothing if the runner is already shut down,
 * otherwise chains the job start onto {@code leadershipOperation}.
 */
public void grantLeadership(final UUID leaderSessionID) {
    synchronized (lock) {
        if (shutdown) {
            log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
            return;
        }

        leadershipOperation =
                leadershipOperation.thenCompose(
                        (ignored) -> {
                            synchronized (lock) {
                                // Check the job's scheduling status, then start the job manager.
                                return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                            }
                        });

        handleException(leadershipOperation, "Could not start the job manager.");
    }
}

/**
 * Fetches the job scheduling status; if it is DONE the job is treated as already finished,
 * otherwise the JobMaster is started.
 */
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
    final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

    return jobSchedulingStatusFuture.thenCompose(
            jobSchedulingStatus -> {
                if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                    return jobAlreadyDone();
                } else {
                    return startJobMaster(leaderSessionId);
                }
            });
}

// Starts the JobMaster service with a JobMasterId derived from the leader session id.
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
    ... ...
    startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
    ... ...
}
JobMaster.java
/**
 * Starts the RPC endpoint and then schedules {@code startJobExecution} without fencing.
 *
 * NOTE(review): the call below passes {@code newJobMasterId}, but the
 * {@code startJobExecution} declaration shown in this excerpt takes no parameters and returns
 * void. The two snippets appear to come from different Flink versions - confirm.
 */
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
    // make sure we receive RPC and async calls
    start();

    return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}

/**
 * Registers the job with the shuffle master, starts the JobMaster services and begins
 * scheduling. Must run in the main thread (checked by {@code validateRunsInMainThread}).
 */
private void startJobExecution() throws Exception {
    validateRunsInMainThread();

    JobShuffleContext context = new JobShuffleContextImpl(jobGraph.getJobID(), this);
    shuffleMaster.registerJob(context);

    // Start the JobMaster services.
    startJobMasterServices();

    log.info(
            "Starting execution of job '{}' ({}) under job master id {}.",
            jobGraph.getName(),
            jobGraph.getJobID(),
            getFencingToken());

    // Begin scheduling.
    startScheduling();
}
最终,由 Dispatcher 类经过层层调用找到 JobMaster 类调用了其启动方法。
ResourceManager 启动 SlotManager
在创建了 ResourceManager 和 Dispatcher 之后,Dispatcher 启动了 JobManager,而 ResourceManager 则启动了 SlotManager
下面我们就具体来看这一过程
故事还要从 ResourceManager 类的 onStart 方法说起
ResourceManager.java
/**
 * Start hook of the ResourceManager: starts its services and completes {@code startedFuture}.
 *
 * @throws ResourceManagerException wrapping any failure, after it has also been reported via
 *     {@code onFatalError}
 */
public final void onStart() throws Exception {
    try {
        log.info("Starting the resource manager.");
        startResourceManagerServices();
        startedFuture.complete(null);
    } catch (Throwable t) {
        final ResourceManagerException exception =
                new ResourceManagerException(
                        String.format("Could not start the ResourceManager %s", getAddress()),
                        t);
        onFatalError(exception);
        throw exception;
    }
}

// Starts the ResourceManager services.
private void startResourceManagerServices() throws Exception {
    try {
        jobLeaderIdService.start(new JobLeaderIdActionsImpl());

        registerMetrics();

        startHeartbeatServices();

        // Start the SlotManager.
        slotManager.start(
                getFencingToken(),
                getMainThreadExecutor(),
                new ResourceActionsImpl(),
                blocklistHandler::isBlockedTaskManager);

        delegationTokenManager.start();

        // Initialize the ResourceManager (implementation-specific hook).
        initialize();
    } catch (Exception e) {
        handleStartResourceManagerServicesException(e);
    }
}
由源码可知,该过程分为了两个重要步骤:开启 SlotManager 和初始化 ResourceManager,即创建 Yarn 的 ResourceManager 和 NodeManager 客户端
start 为 SlotManager 接口的方法,找到该接口的实现类 FineGrainedSlotManager,该类中的 start 方法根据给定的 leader id 和 ResourceManager 行为来实现开启 SlotManager
FineGrainedSlotManager.java
/**
 * Starts the slot manager with the given leader id, main-thread executor, resource actions
 * and blocked-TaskManager checker. All arguments must be non-null.
 */
public void start(
        ResourceManagerId newResourceManagerId,
        Executor newMainThreadExecutor,
        ResourceActions newResourceActions,
        BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
    LOG.info("Starting the slot manager.");

    resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    resourceActions = Preconditions.checkNotNull(newResourceActions);
    // Initialize slot status synchronization.
    slotStatusSyncer.initialize(
            taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor);
    blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);

    started = true;

    // Periodic TaskManager timeout check: scheduled with a fixed delay of
    // taskManagerTimeout, each run is handed over to the main-thread executor.
    taskManagerTimeoutsCheck =
            scheduledExecutor.scheduleWithFixedDelay(
                    () -> mainThreadExecutor.execute(this::checkTaskManagerTimeouts),
                    0L,
                    taskManagerTimeout.toMilliseconds(),
                    TimeUnit.MILLISECONDS);

    registerSlotManagerMetrics();
}
步骤已经很明显了
JobManager 申请 Slot
在创建了 JobManager 和 SlotManager 之后,下一步 JobManager 申请了 slot
启动 SlotPool
在 JobMaster 启动之时,同时启动了 SlotPool,向 ResourceManager 注册
JobMaster.java
/**
 * Starts the JobMaster-side services: both heartbeat managers, the slot pool service and the
 * ResourceManager leader retrieval. Any {@code Exception} is passed to
 * {@code handleStartJobMasterServicesError}.
 */
private void startJobMasterServices() throws Exception {
    try {
        // Create the heartbeat manager for TaskManagers.
        this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
        // Create the heartbeat manager for the ResourceManager.
        this.resourceManagerHeartbeatManager =
                createResourceManagerHeartbeatManager(heartbeatServices);

        // start the slot pool make sure the slot pool now accepts messages for this leader
        // Start the slot pool.
        slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());

        // job is ready to go, try to establish connection with resource manager
        //   - activate leader retrieval for the resource manager
        //   - on notification of the leader, the connection will be established and
        //     the slot pool will start requesting slots
        // Once started, the slot pool begins requesting slots from the slot manager.
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    } catch (Exception e) {
        handleStartJobMasterServicesError(e);
    }
}
向 ResourceManager 注册
经过下面层层调用:
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-> notifyOfNewResourceManagerLeader()
-> reconnectToResourceManagerLeader()
-> tryConnectToResourceManager()
-> connectToResourceManager()
// Excerpt from JobMaster. The elided part is marked with "... ...".
// Creates a new ResourceManagerConnection for this job towards the known ResourceManager
// address and starts it.
private void connectToResourceManager() {
    ... ...
    resourceManagerConnection =
            new ResourceManagerConnection(
                    log,
                    jobGraph.getJobID(),
                    resourceId,
                    getAddress(),
                    getFencingToken(),
                    resourceManagerAddress.getAddress(),
                    resourceManagerAddress.getResourceManagerId(),
                    scheduledExecutorService
            );
    resourceManagerConnection.start();
}
RegisteredRpcConnection.java
public void start() { checkState(!closed, "The RPC connection is already closed"); checkState( !isConnected() && pendingRegistration == null, "The RPC connection is already started"); final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration(); if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) { newRegistration.startRegistration(); } else { // concurrent start operation newRegistration.cancel(); }}private RetryingRegistration<F, G, S, R> createNewRegistration() { RetryingRegistration<F, G, S, R> newRegistration = checkNotNull(generateRegistration()); CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> future = newRegistration.getFuture(); future.whenCompleteAsync( (RetryingRegistration.RetryingRegistrationResult<G, S, R> result, Throwable failure) -> { if (failure != null) { if (failure instanceof CancellationException) { // we ignore cancellation exceptions because they originate from // cancelling // the RetryingRegistration log.debug( "Retrying registration towards {} was cancelled.", targetAddress); } else { // this future should only ever fail if there is a bug, not if the // registration is declined onRegistrationFailure(failure); } } else { if (result.isSuccess()) { targetGateway = result.getGateway(); onRegistrationSuccess(result.getSuccess()); } else if (result.isRejection()) { onRegistrationRejection(result.getRejection()); } else { throw new IllegalArgumentException( String.format( "Unknown retrying registration response: %s.", result)); } } }, executor); return newRegistration;}
TaskExecutorToResourceManagerConnection.java
/**
 * Creates the retrying registration used to register at the ResourceManager.
 *
 * NOTE(review): this snippet is from the TaskExecutor-side connection class, while the
 * surrounding article text describes the JobMaster registering at the ResourceManager -
 * confirm this is the intended example.
 */
protected RetryingRegistration<
                ResourceManagerId,
                ResourceManagerGateway,
                TaskExecutorRegistrationSuccess,
                TaskExecutorRegistrationRejection>
        generateRegistration() {
    return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
            log,
            rpcService,
            getTargetAddress(),
            getTargetLeaderId(),
            retryingRegistrationConfiguration,
            taskExecutorRegistration);
}

// Constructor of the nested ResourceManagerRegistration: forwards the connection parameters
// to the superclass with the target name "ResourceManager" and keeps the registration payload.
ResourceManagerRegistration(
        Logger log,
        RpcService rpcService,
        String targetAddress,
        ResourceManagerId resourceManagerId,
        RetryingRegistrationConfiguration retryingRegistrationConfiguration,
        TaskExecutorRegistration taskExecutorRegistration) {

    super(
            log,
            rpcService,
            "ResourceManager",
            ResourceManagerGateway.class,
            targetAddress,
            resourceManagerId,
            retryingRegistrationConfiguration);
    this.taskExecutorRegistration = taskExecutorRegistration;
}
SlotPool 申请 slot
注册成功调用 onRegistrationSuccess(),向 ResourceManager 进行 slot 的申请
JobMaster.java 的内部类 ResourceManagerConnection
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) { runAsync( () -> { // filter out outdated connections //noinspection ObjectEquality if (this == resourceManagerConnection) { establishResourceManagerConnection(success); } });}private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) { final ResourceManagerId resourceManagerId = success.getResourceManagerId(); // verify the response with current connection if (resourceManagerConnection != null && Objects.equals( resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) { log.info( "JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerId); final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId(); establishedResourceManagerConnection = new EstablishedResourceManagerConnection( resourceManagerGateway, resourceManagerResourceId); blocklistHandler.registerBlocklistListener(resourceManagerGateway); // 连接到 ResourceManager slotPoolService.connectToResourceManager(resourceManagerGateway); partitionTracker.connectToResourceManager(resourceManagerGateway); resourceManagerHeartbeatManager.monitorTarget( resourceManagerResourceId, new ResourceManagerHeartbeatReceiver(resourceManagerGateway)); } else { log.debug( "Ignoring resource manager connection to {} because it's duplicated or outdated.", resourceManagerId); }}
DeclarativeSlotPoolService.java
// Excerpt. Elided parts are marked with "... ...".

/**
 * Stores the ResourceManager gateway and sends off every request that was waiting for this
 * connection, then clears the waiting list.
 */
public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
    this.resourceManagerGateway = checkNotNull(resourceManagerGateway);

    // work on all slots waiting for this connection
    for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
        // Request a slot from the ResourceManager.
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }

    // all sent off
    waitingForResourceManager.clear();
}

// Sends one slot request for the pending request to the ResourceManager over RPC.
private void requestSlotFromResourceManager(
        final ResourceManagerGateway resourceManagerGateway,
        final PendingRequest pendingRequest) {
    ... ...
    CompletableFuture<Acknowledge> rmResponse =
            resourceManagerGateway.requestSlot(
                    jobMasterId,
                    new SlotRequest(
                            jobId,
                            allocationId,
                            pendingRequest.getResourceProfile(),
                            jobManagerAddress),
                    rpcTimeout);
    ... ...
}
ResourceManager.java:由 ResourceManager 里的 SlotManager 处理请求
// Excerpt from ResourceManager. Elided parts are marked with "... ...".
// Receives a slot request and forwards it to the SlotManager.
public CompletableFuture<Acknowledge> requestSlot(
        JobMasterId jobMasterId, SlotRequest slotRequest, final Time timeout) {
    ... ...
    try {
        // The SlotManager handles the slot request.
        slotManager.registerSlotRequest(slotRequest);
    } ... ...
}
// Excerpt from the SlotManager implementation. Elided parts are marked with "... ...".

/**
 * Registers a slot request: records it as pending under its allocation id and then tries to
 * fulfil it via {@code internalRequestSlot}.
 *
 * @throws ResourceManagerException if the request cannot be processed
 */
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
    checkInit();
    ... ...
    PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
    pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
    try {
        internalRequestSlot(pendingSlotRequest);
    } ... ...
}

/**
 * Tries to find a matching slot for the request's resource profile. If one exists it is
 * allocated; otherwise the request falls back to a pending TaskManager slot.
 * (The original excerpt had {@code throws} fused with the exception type; separated here.)
 */
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
        throws ResourceManagerException {
    final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

    OptionalConsumer.of(findMatchingSlot(resourceProfile))
            .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
            .ifNotPresent(
                    () -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}

// When no pending TaskManager slot is available, asks for a new resource to be allocated.
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(
        PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
    ... ...
    if (!pendingTaskManagerSlotOptional.isPresent()) {
        pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
    }
    ... ...
}