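Preface
After publishing "[Flink Source Code] Revisiting the Flink Program Submission Process (Part 2)", I found that it contained quite a few mistakes. In addition, as Flink has evolved, the implementation of several methods has changed considerably. After some deliberation, I decided to write a separate article on the JobManager source code, based on the latest Flink version (1.17).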
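What is the JobManager?
Flink's master node, the JobManager, is a logical concept: the class that implements it differs depending on the deployment mode.
The JobManager has three core components: ResourceManager, Dispatcher and WebMonitorEndpoint.
When a client submits a job to the cluster (the client first turns the job into a JobGraph), the master node receives the submission as a REST request. The WebMonitorEndpoint uses its Router to find the matching Handler, which processes the request and then hands the job over to the Dispatcher. The Dispatcher starts a JobMaster that is responsible for deploying and executing the tasks inside this job, and the JobMaster requests the resources those tasks need from the ResourceManager.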
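To make this chain of responsibilities concrete, here is a deliberately tiny sketch. Every class in it (JobGraphStub, RestEndpointStub and so on) is a hypothetical stand-in invented for illustration, not Flink's API; it only shows who hands the job to whom: the REST handler passes the JobGraph to the Dispatcher, the Dispatcher starts one JobMaster per job, and the JobMaster asks the ResourceManager for slots.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the real components; these are not Flink classes.
class JobGraphStub {
    final String jobName;
    final int parallelism; // number of parallel tasks, one slot each

    JobGraphStub(String jobName, int parallelism) {
        this.jobName = jobName;
        this.parallelism = parallelism;
    }
}

class ResourceManagerStub {
    private int freeSlots;

    ResourceManagerStub(int freeSlots) {
        this.freeSlots = freeSlots;
    }

    // Grants at most the number of slots that are still free.
    int requestSlots(int wanted) {
        int granted = Math.min(wanted, freeSlots);
        freeSlots -= granted;
        return granted;
    }
}

class JobMasterStub {
    private final JobGraphStub jobGraph;
    private final ResourceManagerStub resourceManager;

    JobMasterStub(JobGraphStub jobGraph, ResourceManagerStub resourceManager) {
        this.jobGraph = jobGraph;
        this.resourceManager = resourceManager;
    }

    // Requests one slot per task and "deploys" one task per granted slot.
    List<String> deployTasks() {
        int slots = resourceManager.requestSlots(jobGraph.parallelism);
        List<String> deployed = new ArrayList<>();
        for (int i = 0; i < slots; i++) {
            deployed.add(jobGraph.jobName + "-task-" + i);
        }
        return deployed;
    }
}

class DispatcherStub {
    private final ResourceManagerStub resourceManager;

    DispatcherStub(ResourceManagerStub resourceManager) {
        this.resourceManager = resourceManager;
    }

    // Starts a dedicated JobMaster for each submitted job.
    JobMasterStub submitJob(JobGraphStub jobGraph) {
        return new JobMasterStub(jobGraph, resourceManager);
    }
}

class RestEndpointStub {
    private final DispatcherStub dispatcher;

    RestEndpointStub(DispatcherStub dispatcher) {
        this.dispatcher = dispatcher;
    }

    // Plays the role of the Handler behind the job submission request.
    List<String> handleJobSubmission(JobGraphStub jobGraph) {
        return dispatcher.submitJob(jobGraph).deployTasks();
    }
}
```

With three free slots, a first job with parallelism 2 gets both of its tasks deployed, and a second job with parallelism 2 only gets one slot.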
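JobManager startup source code
JobManager startup process
The JobManager startup process consists of three parts:
1. Initialize 8 basic services
2. Create the factory instances
3. Use those factory instances to create the three core components: ResourceManager, Dispatcher and WebMonitorEndpoint
Master node preparation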
We use Standalone mode as the example here and throughout the rest of the article.
First, find the master node's entry class, StandaloneSessionClusterEntrypoint:
StandaloneSessionClusterEntrypoint.java
public static void main(String[] args) {
    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(
            LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    // Parse the command-line arguments passed to the entrypoint (e.g. the configuration directory)
    final EntrypointClusterConfiguration entrypointClusterConfiguration =
            ClusterEntrypointUtils.parseParametersOrExit(
                    args,
                    new EntrypointClusterConfigurationParserFactory(),
                    StandaloneSessionClusterEntrypoint.class);

    // Load the flink-conf.yaml configuration file
    Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

    // Create the master node entrypoint
    StandaloneSessionClusterEntrypoint entrypoint =
            new StandaloneSessionClusterEntrypoint(configuration);

    // Start the master node
    ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
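The entry class does four things:
1. Parse the command-line arguments of the startup command
2. Load the flink-conf.yaml configuration file
3. Create the master node (entrypoint) object
4. Start the master node
Let's first look at how flink-conf.yaml is loaded. ClusterEntrypoint.loadConfiguration() eventually delegates to GlobalConfiguration.loadConfiguration():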
public static Configuration loadConfiguration(
        final String configDir, @Nullable final Configuration dynamicProperties) {

    if (configDir == null) {
        throw new IllegalArgumentException(
                "Given configuration directory is null, cannot load configuration");
    }

    final File confDirFile = new File(configDir);
    if (!(confDirFile.exists())) {
        throw new IllegalConfigurationException(
                "The given configuration directory name '"
                        + configDir
                        + "' ("
                        + confDirFile.getAbsolutePath()
                        + ") does not describe an existing directory.");
    }

    // get Flink yaml configuration file
    // Locate flink-conf.yaml in the configuration directory
    final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);

    // Fail if the file does not exist
    if (!yamlConfigFile.exists()) {
        throw new IllegalConfigurationException(
                "The Flink config file '"
                        + yamlConfigFile
                        + "' ("
                        + yamlConfigFile.getAbsolutePath()
                        + ") does not exist.");
    }

    // Parse flink-conf.yaml
    Configuration configuration = loadYAMLResource(yamlConfigFile);

    // Dynamic properties (passed with -D) override the values from the file
    if (dynamicProperties != null) {
        configuration.addAll(dynamicProperties);
    }

    return configuration;
}
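The method first locates flink-conf.yaml in the conf directory, then parses its entries with loadYAMLResource(), lets any dynamic properties override them, and returns the resulting Configuration.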
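As far as I know, loadYAMLResource() is not a full YAML parser but a simple line-based one. The sketch below imitates that idea under this assumption: it drops everything after `#`, splits each remaining line on the first `": "`, skips malformed lines, and finally lets dynamic properties win over file values, mirroring `configuration.addAll(dynamicProperties)` above. The class SimpleFlinkConfParser is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified imitation of a line-based flink-conf.yaml parser.
class SimpleFlinkConfParser {

    static Map<String, String> parse(String content) {
        Map<String, String> config = new LinkedHashMap<>();
        for (String line : content.split("\\R")) {
            // Everything after the first '#' is treated as a comment
            String conf = line.split("#", 2)[0].trim();
            if (conf.isEmpty()) {
                continue;
            }
            // Split "key: value" on the first ": "
            String[] kv = conf.split(": ", 2);
            if (kv.length == 1) {
                continue; // malformed line without a "key: value" separator
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (key.isEmpty() || value.isEmpty()) {
                continue;
            }
            config.put(key, value);
        }
        return config;
    }

    // Dynamic properties override the values loaded from the file.
    static Map<String, String> withDynamicProperties(
            Map<String, String> fileConfig, Map<String, String> dynamicProperties) {
        Map<String, String> merged = new LinkedHashMap<>(fileConfig);
        merged.putAll(dynamicProperties);
        return merged;
    }
}
```

A consequence of this style of parsing is that nested YAML structures are not understood; only flat `key: value` lines are.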
Master node startup
ClusterEntrypoint.java
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
    try {
        // Start the cluster entrypoint
        clusterEntrypoint.startCluster();
    } catch (ClusterEntrypointException e) {
        LOG.error(
                String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
                e);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }

    int returnCode;
    Throwable throwable = null;

    try {
        // Block until the cluster terminates
        returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
    } catch (Throwable e) {
        throwable = ExceptionUtils.stripExecutionException(e);
        returnCode = RUNTIME_FAILURE_RETURN_CODE;
    }

    LOG.info(
            "Terminating cluster entrypoint process {} with exit code {}.",
            clusterEntrypointName,
            returnCode,
            throwable);
    System.exit(returnCode);
}

public void startCluster() throws ClusterEntrypointException {
    LOG.info("Starting {}.", getClass().getSimpleName());

    try {
        FlinkSecurityManager.setFromConfiguration(configuration);
        PluginManager pluginManager =
                PluginUtils.createPluginManagerFromRootFolder(configuration);
        configureFileSystems(configuration, pluginManager);

        SecurityContext securityContext = installSecurityContext(configuration);

        ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
        securityContext.runSecured(
                (Callable<Void>)
                        () -> {
                            runCluster(configuration, pluginManager);

                            return null;
                        });
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

        try {
            // clean up any partial state
            shutDownAsync(
                            ApplicationStatus.FAILED,
                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                            ExceptionUtils.stringifyException(strippedThrowable),
                            false)
                    .get(
                            INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
                            TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            strippedThrowable.addSuppressed(e);
        }

        throw new ClusterEntrypointException(
                String.format(
                        "Failed to initialize the cluster entrypoint %s.",
                        getClass().getSimpleName()),
                strippedThrowable);
    }
}
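runClusterEntrypoint distinguishes two failure phases: a failure during startup exits with STARTUP_FAILURE_RETURN_CODE, while after a successful start the main thread blocks on the termination future and maps its outcome to the process exit code. A minimal sketch of that second half, assuming a `CompletableFuture<Integer>` that carries the exit code; the class name and the value 2 for the runtime-failure code are assumptions, and the method returns the code instead of calling System.exit so it can be tested.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch of "block on the termination future, then pick an exit code".
class EntrypointExitCodeSketch {

    static final int RUNTIME_FAILURE_RETURN_CODE = 2; // assumed value for illustration

    // Waits for the cluster to terminate and maps the outcome to an exit code.
    static int awaitExitCode(CompletableFuture<Integer> terminationFuture) {
        try {
            return terminationFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return RUNTIME_FAILURE_RETURN_CODE;
        } catch (ExecutionException e) {
            // The cluster terminated exceptionally
            return RUNTIME_FAILURE_RETURN_CODE;
        }
    }
}
```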
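Next, let's step into the runCluster method, which startCluster invokes inside the security context.
It is the core method of master node startup and mainly does three things:
1. initializeServices(): initialize the basic services
2. createDispatcherResourceManagerComponentFactory(): create the factory for the core components
3. Call create() on that factory to build the ResourceManager, DispatcherRunner and WebMonitorEndpoint
Let's start with the initialization of the eight basic services: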
protected void initializeServices(Configuration configuration, PluginManager pluginManager)
        throws Exception {

    LOG.info("Initializing cluster services.");

    synchronized (lock) {
        resourceId =
                configuration
                        .getOptional(JobManagerOptions.JOB_MANAGER_RESOURCE_ID)
                        .map(value -> DeterminismEnvelope.deterministicValue(new ResourceID(value)))
                        .orElseGet(
                                () -> DeterminismEnvelope.nondeterministicValue(ResourceID.generate()));

        LOG.debug(
                "Initialize cluster entrypoint {} with resource id {}.",
                getClass().getSimpleName(),
                resourceId);

        workingDirectory =
                ClusterEntrypointUtils.createJobManagerWorkingDirectory(configuration, resourceId);

        LOG.info("Using working directory: {}.", workingDirectory);

        rpcSystem = RpcSystem.load(configuration);

        // Create and start the common RPC service: an Akka-based RpcService that wraps an
        // ActorSystem. It is essentially a TCP-based RPC service (default port 6123)
        commonRpcService =
                RpcUtils.createRemoteRpcService(
                        rpcSystem,
                        configuration,
                        configuration.getString(JobManagerOptions.ADDRESS),
                        getRPCPortRange(configuration),
                        configuration.getString(JobManagerOptions.BIND_HOST),
                        configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

        // Start a JMX server for remote JVM monitoring of the JobManager
        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

        // update the configuration used to create the high availability services
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        // IO thread pool; by default its size is 4 * the number of CPU cores
        ioExecutor =
                Executors.newFixedThreadPool(
                        ClusterEntrypointUtils.getPoolSize(configuration),
                        new ExecutorThreadFactory("cluster-io"));

        // HA services: ZooKeeper-based when high-availability is set to zookeeper,
        // a standalone (non-HA) implementation otherwise
        haServices = createHaServices(configuration, ioExecutor, rpcSystem);

        // BlobServer for large files, e.g. the jars uploaded together with a Flink job
        // or the log files uploaded by TaskManagers
        blobServer =
                BlobUtils.createBlobServer(
                        configuration,
                        Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
                        haServices.createBlobStore());
        blobServer.start();
        configuration.setString(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));

        // Heartbeat services
        heartbeatServices = createHeartbeatServices(configuration);

        delegationTokenManager =
                KerberosDelegationTokenManagerFactory.create(
                        getClass().getClassLoader(),
                        configuration,
                        commonRpcService.getScheduledExecutor(),
                        ioExecutor);

        // Metric (performance monitoring) services; internally this also starts an ActorSystem
        metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
        final RpcService metricQueryServiceRpcService =
                MetricUtils.startRemoteMetricsRpcService(
                        configuration,
                        commonRpcService.getAddress(),
                        configuration.getString(JobManagerOptions.BIND_HOST),
                        rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, null);

        final String hostname = RpcUtils.getHostname(commonRpcService);

        processMetricGroup =
                MetricUtils.instantiateProcessMetricGroup(
                        metricRegistry,
                        hostname,
                        ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

        // Store for the serializable form of ExecutionGraphs.
        // Note: this is not the JobGraphStore, which is started together with the Dispatcher
        executionGraphInfoStore =
                createSerializableExecutionGraphStore(
                        configuration, commonRpcService.getScheduledExecutor());
    }
}
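The first statement of initializeServices decides the JobManager's ResourceID: a value configured via JobManagerOptions.JOB_MANAGER_RESOURCE_ID is wrapped as deterministic, otherwise a random ID is generated and marked non-deterministic. The same Optional-based pattern in isolation, with a hypothetical Envelope class standing in for DeterminismEnvelope and a random UUID standing in for ResourceID.generate():

```java
import java.util.Optional;
import java.util.UUID;

class ResourceIdSketch {

    // Hypothetical stand-in for DeterminismEnvelope<ResourceID>
    static final class Envelope {
        final String value;
        final boolean deterministic;

        Envelope(String value, boolean deterministic) {
            this.value = value;
            this.deterministic = deterministic;
        }
    }

    // Configured ID -> deterministic; no configured ID -> random ID, non-deterministic
    static Envelope resolve(Optional<String> configuredId) {
        return configuredId
                .map(value -> new Envelope(value, true))
                .orElseGet(() -> new Envelope(UUID.randomUUID().toString(), false));
    }
}
```

Carrying the flag alongside the value lets later steps, such as creating the working directory from the resourceId, know where the ID came from.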
The services initialized here are:
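- commonRpcService: the Akka-based RpcService implementation. It wraps an ActorSystem and is essentially a TCP-based RPC service, listening on port 6123 by default
- JMXService: a JMX server that clients can connect to for JVM monitoring of the JobManager (it is only started when jmx.server.port is configured)
- ioExecutor: a thread pool whose size defaults to 4 * the number of CPU cores of the node
- haServices: the high-availability services, which provide access to everything HA needs: registries, distributed counters and leader election. With high-availability set to zookeeper this is a ZooKeeper-based implementation; in a default Standalone setup without HA it is a standalone (non-HA) implementation
- BlobServer: the server side for storing large files, e.g. the jars that have to be uploaded together with a Flink job, or the log files uploaded by TaskManagers
- heartbeatServices: provides everything needed for heartbeats, including creating heartbeat receivers and senders
- metricRegistry: starts the metric (performance monitoring) services, which internally also start an ActorSystem; it tracks all registered metrics and acts as the bridge between MetricGroups and MetricReporters
- executionGraphInfoStore: stores the serializable form of ExecutionGraphs; by default it is file-based (FileExecutionGraphInfoStore, called FileArchivedExecutionGraphStore in older versions). Note that this is not the JobGraphStore, which is started together with the Dispatcher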
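The ioExecutor is just a fixed-size thread pool with named threads. The sketch below reproduces the idea using only the JDK: the default size of 4 * CPU cores described above, and a hand-written ThreadFactory standing in for Flink's ExecutorThreadFactory. The class name and the thread-naming scheme are my own assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a "cluster-io" style pool; not Flink's code.
class IoExecutorSketch {

    // Default size described above: 4 * the number of CPU cores
    static int defaultPoolSize() {
        return 4 * Runtime.getRuntime().availableProcessors();
    }

    // Names threads "<poolName>-thread-<n>" so they are easy to find in thread dumps;
    // daemon threads so this demo pool does not keep the JVM alive
    static ThreadFactory namedFactory(String poolName) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread t = new Thread(runnable, poolName + "-thread-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    static ExecutorService create(int poolSize, String poolName) {
        return Executors.newFixedThreadPool(poolSize, namedFactory(poolName));
    }
}
```

A fixed pool sized as a multiple of the core count suits IO-bound work (file and blob operations), where threads spend most of their time waiting rather than computing.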
Next, the core factory classes are created. Back in the StandaloneSessionClusterEntrypoint class:
StandaloneSessionClusterEntrypoint.java
protected DefaultDispatcherResourceManagerComponentFactory
        createDispatcherResourceManagerComponentFactory(Configuration configuration) {
    // Pass in the first factory: StandaloneResourceManagerFactory
    return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
            StandaloneResourceManagerFactory.getInstance());
}
Step into the createSessionComponentFactory method:
DefaultDispatcherResourceManagerComponentFactory.java
public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
        ResourceManagerFactory<?> resourceManagerFactory) {
    // Build the composite factory
    return new DefaultDispatcherResourceManagerComponentFactory(
            // Second factory
            DefaultDispatcherRunnerFactory.createSessionRunner(SessionDispatcherFactory.INSTANCE),
            // First factory
            resourceManagerFactory,
            // Third factory
            SessionRestEndpointFactory.INSTANCE);
}
As we can see, the master node creates factory instances for three core components:
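- DefaultDispatcherRunnerFactory, which produces a DefaultDispatcherRunner
- StandaloneResourceManagerFactory, which produces a StandaloneResourceManager
- SessionRestEndpointFactory, which produces a DispatcherRestEndpoint
Next, these factory instances are used to create the ResourceManager, DispatcherRunner and WebMonitorEndpoint: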
- DispatcherRunner, implemented by DefaultDispatcherRunner
- ResourceManager, implemented by StandaloneResourceManager
- WebMonitorEndpoint, implemented by DispatcherRestEndpoint
We start from dispatcherResourceManagerComponentFactory.create().
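Why is the ResourceManager factory a parameter of createSessionComponentFactory() while the other two are fixed? Presumably because the same session setup is reused across deployment modes, and only the ResourceManager differs between them (Standalone passes StandaloneResourceManagerFactory). A hypothetical sketch of this composite-factory idea, with Strings standing in for the real components:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: one composite factory holding one sub-factory per core component.
class ComponentFactorySketch {

    // What create() returns: the three core components (represented by their class names)
    static final class Components {
        final String dispatcherRunner;
        final String resourceManager;
        final String restEndpoint;

        Components(String dispatcherRunner, String resourceManager, String restEndpoint) {
            this.dispatcherRunner = dispatcherRunner;
            this.resourceManager = resourceManager;
            this.restEndpoint = restEndpoint;
        }

        List<String> all() {
            return Arrays.asList(dispatcherRunner, resourceManager, restEndpoint);
        }
    }

    private final Supplier<String> dispatcherRunnerFactory;
    private final Supplier<String> resourceManagerFactory;
    private final Supplier<String> restEndpointFactory;

    private ComponentFactorySketch(
            Supplier<String> dispatcherRunnerFactory,
            Supplier<String> resourceManagerFactory,
            Supplier<String> restEndpointFactory) {
        this.dispatcherRunnerFactory = dispatcherRunnerFactory;
        this.resourceManagerFactory = resourceManagerFactory;
        this.restEndpointFactory = restEndpointFactory;
    }

    // Session mode fixes the Dispatcher and REST factories; the ResourceManager
    // factory is supplied by the caller.
    static ComponentFactorySketch createSessionComponentFactory(
            Supplier<String> resourceManagerFactory) {
        return new ComponentFactorySketch(
                () -> "DefaultDispatcherRunner",
                resourceManagerFactory,
                () -> "DispatcherRestEndpoint");
    }

    // Builds all three components from their sub-factories
    Components create() {
        return new Components(
                dispatcherRunnerFactory.get(),
                resourceManagerFactory.get(),
                restEndpointFactory.get());
    }
}
```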
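Step 1: initialize the leader retrieval services, which watch for leader changes of the Dispatcher and the ResourceManager, plus the gateway retrievers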
DefaultDispatcherResourceManagerComponentFactory.java
// Leader retrieval for the Dispatcher
dispatcherLeaderRetrievalService =
        highAvailabilityServices.getDispatcherLeaderRetriever();

// Leader retrieval for the ResourceManager
resourceManagerRetrievalService =
        highAvailabilityServices.getResourceManagerLeaderRetriever();

// GatewayRetriever for the Dispatcher
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
        new RpcGatewayRetriever<>(
                rpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                new ExponentialBackoffRetryStrategy(
                        12, Duration.ofMillis(10), Duration.ofMillis(50)));
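ExponentialBackoffRetryStrategy(12, 10 ms, 50 ms) lets the gateway retriever retry up to 12 times while the Dispatcher leader cannot be reached yet. Assuming the usual exponential-backoff semantics (start at the minimum delay, double after every attempt, cap at the maximum), the delays can be computed with this small hypothetical helper:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: the delay before each retry under capped exponential backoff.
class BackoffSketch {

    static List<Long> delaysMillis(int retries, Duration minDelay, Duration maxDelay) {
        List<Long> delays = new ArrayList<>();
        long current = minDelay.toMillis();
        for (int i = 0; i < retries; i++) {
            delays.add(current);
            // Double the delay for the next attempt, but never exceed maxDelay
            current = Math.min(current * 2, maxDelay.toMillis());
        }
        return delays;
    }
}
```

With the parameters above the delays are 10, 20 and 40 ms, then 50 ms for each of the remaining nine retries, i.e. roughly 520 ms of waiting in total before giving up.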
Step 2: build a thread pool that executes the client requests received by the WebMonitorEndpoint
// Thread pool that executes the client requests received by the WebMonitorEndpoint
final ScheduledExecutorService executor =
        WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");
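The pool is configured by two options, rest.server.numThreads (default 4) and rest.server.thread-priority (default 5, i.e. Thread.NORM_PRIORITY). A JDK-only sketch of a scheduled pool with those two knobs; RestExecutorSketch and its naming scheme are hypothetical and not how WebMonitorEndpoint.createExecutorService is written internally:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a scheduled pool with a fixed thread count and thread priority.
class RestExecutorSketch {

    static ScheduledExecutorService create(int numThreads, int priority, String poolName) {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, poolName + "-thread-" + counter.getAndIncrement());
            t.setPriority(priority); // e.g. Thread.NORM_PRIORITY
            t.setDaemon(true);       // do not keep the JVM alive in this demo
            return t;
        };
        return new ScheduledThreadPoolExecutor(numThreads, factory);
    }
}
```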
Step 3: initialize the MetricFetcher, which refreshes every 10 s by default (an interval of 0 disables fetching)
// Initialize the MetricFetcher; the refresh interval defaults to 10 s
final long updateInterval =
        configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher =
        updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                        configuration,
                        metricQueryServiceRetriever,
                        dispatcherGatewayRetriever,
                        executor);
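The "0 means a no-op fetcher, otherwise refresh at most once per interval" logic can be sketched as follows. This is a hypothetical illustration of interval throttling, not the code of MetricFetcherImpl; the clock is injected as a LongSupplier so the behavior can be tested without sleeping.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of interval-throttled metric fetching.
class MetricFetcherSketch {

    interface Fetcher {
        void update();    // called e.g. whenever metrics are requested over REST
        int fetchCount(); // how many real fetches have happened
    }

    // Counterpart of an interval of 0: fetching is disabled
    static final class VoidFetcher implements Fetcher {
        public void update() {}

        public int fetchCount() {
            return 0;
        }
    }

    // Fetches at most once per interval; calls in between are no-ops
    static final class ThrottledFetcher implements Fetcher {
        private final long intervalMillis;
        private final LongSupplier clockMillis;
        private boolean fetchedBefore = false;
        private long lastFetchMillis;
        private int fetches;

        ThrottledFetcher(long intervalMillis, LongSupplier clockMillis) {
            this.intervalMillis = intervalMillis;
            this.clockMillis = clockMillis;
        }

        public void update() {
            long now = clockMillis.getAsLong();
            if (!fetchedBefore || now - lastFetchMillis >= intervalMillis) {
                fetchedBefore = true;
                lastFetchMillis = now;
                fetches++; // a real fetcher would query the metric query services here
            }
        }

        public int fetchCount() {
            return fetches;
        }
    }

    static Fetcher fromInterval(long intervalMillis, LongSupplier clockMillis) {
        return intervalMillis == 0
                ? new VoidFetcher()
                : new ThrottledFetcher(intervalMillis, clockMillis);
    }
}
```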
Step 4: create and start the WebMonitorEndpoint instance, which in Standalone mode is a DispatcherRestEndpoint. Internally this instance starts a Netty server and registers a large set of Handlers
// Create the WebMonitorEndpoint; in Standalone mode this is a DispatcherRestEndpoint,
// which internally starts a Netty server with a large set of Handlers registered
webMonitorEndpoint =
        restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);

// Start the WebMonitorEndpoint
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
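The Router behind the WebMonitorEndpoint maps an HTTP method plus a URL pattern such as /jobs/:jobid to a Handler, binding the :jobid segment as a path parameter. The toy router below (RouterSketch, hypothetical, not Flink's Router class) shows that matching logic, with plain Strings in place of Netty requests and Handlers:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical toy router: method + path pattern -> handler, with ":name" path parameters.
class RouterSketch {

    private static final class Route {
        final String method;
        final String[] segments;
        final Function<Map<String, String>, String> handler;

        Route(String method, String[] segments, Function<Map<String, String>, String> handler) {
            this.method = method;
            this.segments = segments;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    RouterSketch addRoute(String method, String pattern, Function<Map<String, String>, String> handler) {
        routes.add(new Route(method, split(pattern), handler));
        return this;
    }

    // Returns the handler's result for the first matching route, or empty if none matches
    Optional<String> route(String method, String path) {
        String[] parts = split(path);
        for (Route r : routes) {
            if (!r.method.equals(method) || r.segments.length != parts.length) {
                continue;
            }
            Map<String, String> params = new HashMap<>();
            boolean matched = true;
            for (int i = 0; i < parts.length; i++) {
                String segment = r.segments[i];
                if (segment.startsWith(":")) {
                    params.put(segment.substring(1), parts[i]); // bind path parameter
                } else if (!segment.equals(parts[i])) {
                    matched = false;
                    break;
                }
            }
            if (matched) {
                return Optional.of(r.handler.apply(params));
            }
        }
        return Optional.empty();
    }

    // "/jobs/:jobid" -> ["jobs", ":jobid"]
    private static String[] split(String path) {
        String trimmed = path.replaceAll("^/+|/+$", "");
        return trimmed.isEmpty() ? new String[0] : trimmed.split("/+");
    }
}
```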
Step 5: create the ResourceManager object
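The ResourceManager is an RpcEndpoint (an Actor): once the object has been built and started, its onStart method is triggered (the counterpart of the Actor's preStart lifecycle method).
It also takes part in leader election. In 1.17 the election is handled by ResourceManagerServiceImpl, which is a LeaderContender: it competes for leadership and, once leadership is granted, runs the corresponding callback that creates and starts the actual ResourceManager.
The ResourceManager has two heartbeat services and two timer services:
- Two heartbeat services: one between the worker nodes (TaskManagers) and the master node, and one between each job's master process (JobMaster) and the master node
- Two timer services: a TaskManager timeout check and a slot request timeout check

// Create the ResourceManager service
resourceManagerService =
        ResourceManagerServiceImpl.create(
                resourceManagerFactory,
                configuration,
                resourceId,
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                delegationTokenManager,
                fatalErrorHandler,
                new ClusterInformation(hostname, blobServer.getPort()),
                webMonitorEndpoint.getRestBaseUrl(),
                metricRegistry,
                hostname,
                ioExecutor);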
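Both heartbeat services come down to the same bookkeeping: remember when each target last reported, and periodically declare targets dead once they have been silent for longer than the timeout (heartbeat.timeout, 50 s by default). A hypothetical, single-threaded sketch of that bookkeeping; one instance would track TaskManagers and a second one JobMasters:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical heartbeat bookkeeping; not Flink's HeartbeatManager.
class HeartbeatTrackerSketch {

    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatTrackerSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Record that a target (e.g. a TaskManager's resource ID) reported at the given time
    void receiveHeartbeat(String resourceId, long nowMillis) {
        lastHeartbeat.put(resourceId, nowMillis);
    }

    // Unregister and return every target that has been silent for longer than the timeout
    List<String> checkTimeouts(long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        lastHeartbeat.entrySet().removeIf(entry -> {
            boolean expired = nowMillis - entry.getValue() > timeoutMillis;
            if (expired) {
                timedOut.add(entry.getKey());
            }
            return expired;
        });
        return timedOut;
    }
}
```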
Step 6: build a DispatcherRunner. Note that this is not the Dispatcher itself: the Dispatcher is created and started inside the DispatcherRunner
// Create and start the DispatcherRunner
log.debug("Starting Dispatcher.");
dispatcherRunner =
        dispatcherRunnerFactory.createDispatcherRunner(
                highAvailabilityServices.getDispatcherLeaderElectionService(),
                fatalErrorHandler,
                new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                ioExecutor,
                rpcService,
                partialDispatcherServices);
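The DispatcherRunner receives the Dispatcher leader election service as its first argument. As I understand it, the runner takes part in that election and only creates a Dispatcher once leadership is granted, tearing it down again when leadership is revoked. A hypothetical sketch of that contract (the method names echo LeaderContender's grant/revoke callbacks, but the class itself is invented):

```java
import java.util.UUID;

// Hypothetical sketch: a runner that only creates a Dispatcher after winning leadership.
class DispatcherRunnerSketch {

    static final class Dispatcher {
        final UUID leaderSessionId;
        boolean running = true;

        Dispatcher(UUID leaderSessionId) {
            this.leaderSessionId = leaderSessionId;
        }
    }

    private Dispatcher current;

    // Leadership granted: stop any previous Dispatcher and start one for this leader session
    synchronized void grantLeadership(UUID leaderSessionId) {
        stopCurrent();
        current = new Dispatcher(leaderSessionId);
    }

    // Leadership lost: stop the Dispatcher, so a standby JobManager runs none
    synchronized void revokeLeadership() {
        stopCurrent();
    }

    synchronized Dispatcher currentDispatcher() {
        return current;
    }

    private void stopCurrent() {
        if (current != null) {
            current.running = false;
            current = null;
        }
    }
}
```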
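Step 7: start the ResourceManager
// Start the ResourceManager
log.debug("Starting ResourceManagerService.");
resourceManagerService.start();
At this point, the JobManager has finished starting up.
The startup processes of the ResourceManager, WebMonitorEndpoint and Dispatcher themselves will be covered in later articles.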