Flink源码学习笔记(3)了解Flink HA功能的实现

2022/1/31 20:41:49

本文主要是介绍Flink源码学习笔记(3)了解Flink HA功能的实现,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

封面图片不要使用微信打开文章,可以使用手机/电脑浏览器

使用Flink HA功能维护JobManager中组件的生命周期,可以有效的避免因为JobManager 进程失败导致任务无法恢复的情况。

接下来分享下 Flink HA功能的实现

大纲

  1. 基于Zookeeper+Hadoop HA功能的实现

  2. HA功能的接口概述

  3. 基于Zookeeper实现的HA接口

  4. 手工课: 添加个新的组件并使用HA功能维护生命周期

1.基于Zookeeper+Hadoop HA功能的实现

Zookeeper:

Zookeeper的结构:
  • /leaderlatch : leaderlatch 目录下的节点用于竞选leader (相关类ZooKeeperLeaderElectionService,LeaderContender)

  • /leader : 通过监听leader下的节点获取到leader 信息的实时变化 (相关类ZooKeeperLeaderRetrievalService,LeaderRetrievalListener)

  • /checkpoints : checkpoints 目录下记录任务可用的checkpoint,最终可以获得hadoop的HA目录下的checkpoint metadata 的路径信息 (相关类ZooKeeperCompletedCheckpointStore )

  • /checkpoint-counter :checkpoint的计数器 (相关类ZooKeeperCheckpointIDCounter)

  • /running_job_registry : 运行中的任务及状态 (相关类ZooKeeperRunningJobsRegistry)

    Flink使用 Zookeeper不光负责竞选leader和实时通知其他组件最新的leader信息,还会存放JobManager和任务的信息,保证新的JobManager起来后,这些信息不会丢失。

举例:

ResourceManager在这个节点/leaderlatch/resource_manager_lock竞选到leader之后会在/leader/resource_manager_lock节点更新leader的信息,监听/leader/resource_manager_lock节点变化的其他组件会立即使用新的地址和SessionId连接ResourceManager

Hadoop :

  • FLink会创建基于Hadoop的BlobServer (相关类FileSystemBlobStore)

  • 在HA路径下会保存Hadoop的checkpoint的元数据文件 (相关类FileSystemStateStorageHelper)

 

2.HA功能的接口概述

  • HighAvailabilityServices: 可以获取所有组件的 LeaderRetrievalServiceLeaderElectionService接口及记录JobManager中需要持久化的状态,例如完成的保存点,JobGraph,BlobStore,任务调度的状态

  • LeaderElectionService : 负责选举leader的Service接口。

    具体方法:

    //开启选举服务,一般是在RPC的Endpoint初始化好之后,开始调用选举
    void start(LeaderContender contender) throws Exception;
    //停止选举服务,组件的生命周期结束,停止选举
    void stop() throws Exception;
    //组件选上leader之后的确认操作,并回写信息,比如在基于zk的HA上会向leader目录下的节点回写leader的信息
    void confirmLeaderSessionID(UUID leaderSessionID);
    //判断这个sessiondId是否是leader
    boolean hasLeadership(@Nonnull UUID leaderSessionId);
  • LeaderContender: 参与选举的接口。在Flink中需要实现HA的组件,如: ResourceManager,Dispatcher,WebMonitorEndpoint,每个Job的JobManager都会实现这个接口。 通过LeaderElectionService#start(LeaderContender)方法开始竞选leader

  • LeaderRetrievalService : 实时接收Leader的变更信息的服务。Leader信息变更会调用 LeaderRetrievalListenernotifyLeaderAddress方法通知新Leader的变更信息(address,sessionid)

  • LeaderRetrievalListener: 如果需要实时监听leader的信息,需要实现这个接口。通过对应组件实现的LeaderRetrievalService#start(LeaderRetrievalListener listener)方法实时监听leader的信息

//通知有Leader的信息的变更
void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
//处理监听Leader服务报错
void handleError(Exception exception);

 

3.基于Zookeeper实现的HA接口

ZooKeeperHaServices

ZooKeeperHaServices(实现自步骤2的HighAvailabilityServices),通过 ZooKeeperHaServices 可以获取每个组件的ZooKeeperLeaderElectionServiceZooKeeperLeaderRetrievalService

  @Override
  public LeaderRetrievalService getAutoRescaleLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESCALE_SERVICE_LEADER_PATH);
  }
  
  @Override
  public LeaderRetrievalService getResourceManagerLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
  }
​
  @Override
  public LeaderRetrievalService getDispatcherLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
  }
​
  @Override
  public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
  }
​
  @Override
  public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
    return getJobManagerLeaderRetriever(jobID);
  }
​
  @Override
  public LeaderRetrievalService getWebMonitorLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
  }
​
  @Override
  public LeaderElectionService getResourceManagerLeaderElectionService() {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
  }
​
  
  
  @Override
  public LeaderElectionService getDispatcherLeaderElectionService() {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
  }
​
  @Override
  public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
  }
​
  @Override
  public LeaderElectionService getWebMonitorLeaderElectionService() {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
  }

 

ZooKeeperLeaderElectionService

ZooKeeperLeaderElectionService (实现自步骤2的LeaderElectionService)负责Flink组件选举的service。分别实现了以下三个curator的接口

  • LeaderLatchListener : 监听leaderlatch下的对应的组件的节点,已确保当前组件是否获取leader或者失去leadership

  • NodeCacheListener : 监听leader下的对应实例节点发生变化,且当前实例节点是leader,则向leader对应节点重新写入当前实例的连接信息

  • UnhandledErrorListener : 监听是否与zk通信出错

LeaderContender

LeaderContender接口不同的实现对应不同的选举者,举例ResourceManager 在确认选举上leader,旧状态清除后,

  • 会设置旧的FencedRpcEndpoint设置新的Fencingtoken(防止脑裂)

      private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
        if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
          final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
    ​
          log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
    ​
          // clear the state if we've been the leader before
          if (getFencingToken() != null) {
            clearStateInternal();
          }
    ​
          setFencingToken(newResourceManagerId);
    ​
          startServicesOnLeadership();
    ​
          return prepareLeadershipAsync().thenApply(ignored -> true);
        } else {
          return CompletableFuture.completedFuture(false);
        }
      }

     

  • 开启与TaskManager,JobManager的心跳服务和SlotManager

protected void startServicesOnLeadership() {
   startHeartbeatServices();
​
   slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
}
  • 向ResourceManager的ZooKeeperLeaderElectionService 确定新的ResourceManager已经成功成为leader

@Override
public void grantLeadership(final UUID newLeaderSessionID) {
   final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
      .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());
​
   final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
      (acceptLeadership) -> {
         if (acceptLeadership) {
            // confirming the leader session ID might be blocking,
            leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
         }
      },
      getRpcService().getExecutor());
​
   confirmationFuture.whenComplete(
      (Void ignored, Throwable throwable) -> {
         if (throwable != null) {
            onFatalError(ExceptionUtils.stripCompletionException(throwable));
         }
      });
}

有一点需要提到的是confirmLeaderSessionID方法主要是向leader下的节点写入连接信息

@Override
public void confirmLeaderSessionID(UUID leaderSessionID) {
   if (LOG.isDebugEnabled()) {
      LOG.debug(
         "Confirm leader session ID {} for leader {}.",
         leaderSessionID,
         leaderContender.getAddress());
   }
​
   Preconditions.checkNotNull(leaderSessionID);
​
   if (leaderLatch.hasLeadership()) {
      // check if this is an old confirmation call
      synchronized (lock) {
         if (running) {
            if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
               confirmedLeaderSessionID = leaderSessionID;
               writeLeaderInformation(confirmedLeaderSessionID);
            }
         } else {
            LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
               "ZooKeeperLeaderElectionService has already been stopped.", leaderSessionID);
         }
      }
   } else {
      LOG.warn("The leader session ID {} was confirmed even though the " +
            "corresponding JobManager was not elected as the leader.", leaderSessionID);
   }
}

 

ZooKeeperLeaderRetrievalService

ZooKeeperLeaderRetrievalService(实现自步骤2的LeaderRetrievalService),监听Flink Leader信息的变更的服务。分别实现了两个curator的接口:

  • UnhandledErrorListener : 监听是否与zk通信出错

  • NodeCacheListener 监听leader下对应的实例节点是否变更,如果变更则通知持有`LeaderRetrievalListener实现类的其他实例,重新连接该实例的新leader

 

LeaderRetrievalListener的对应实现

比如JobMaster的ResourceManagerLeaderListener(实现自 LeaderRetrievalListener),当收到leader变更,则会连接新的ResourceManager

private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
​
   @Override
   public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
      runAsync(
         () -> notifyOfNewResourceManagerLeader(
            leaderAddress,
            ResourceManagerId.fromUuidOrNull(leaderSessionID)));
   }
​
   @Override
   public void handleError(final Exception exception) {
      handleJobMasterError(new Exception("Fatal error in the ResourceManager leader service", exception));
   }
}

 

4.手工课: 添加个新的组件并使用HA功能维护生命周期

使用HA模块,整体上我们可以非常方便地添加一个新的组件,使用HA维护其生命周期。现在举例添加个RescaleCoordinator组件。

  • ZooKeeperHaServices中实现添加RescaleCoordinator的HA service的实现,这里和其他组件一样,只需要指定不同的leader path即可。

  private static final String RESCALE_SERVICE_LEADER_PATH ="xxxxx";
  @Override
	public LeaderElectionService getAutoRescaleLeaderElectionService() {
		return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESCALE_SERVICE_LEADER_PATH);
	}
	@Override
	public LeaderRetrievalService getAutoRescaleLeaderRetriever() {
		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESCALE_SERVICE_LEADER_PATH);
	}
  • 实现AutoRescaleCoodinator组件,继承自FencedRpcEndpoint(RPC的节点,后面分享Flink RPC实现的时候会详细讲这个) ,LeaderContender(上文提到过),AutoRescaleGateway(RPC调用的接口声明)

public class AutoRescaleCoodinator extends FencedRpcEndpoint<AutoRescaleCoodinatorId> implements LeaderContender, AutoRescaleGateway{
  autoRescaleLeaderElectionService=highAvailabilityServices.getAutoRescaleLeaderElectionService();
}

在AutoRescaleCoodinator启动成功后开始参与选举

	@Override
	protected void onStart() throws Exception {
		autoRescaleLeaderElectionService.start(this);
	}

在被通知选上leader之后,初始化服务,设置Fencingtoken ,最后向LeaderElectionService确认已选上leader

	@Override
	public void grantLeadership(UUID leaderSessionID) {
		logger.info("autorescale coodinator {} grant leadership", leaderSessionID);
		if (autoRescaleLeaderElectionService.hasLeadership(leaderSessionID)) {
			try {
				if (configuration.getBoolean(RescaleOptions.RESCALE_ENABLE)) {
					initAutoRescaleCoordinatorService();
				} else {
					//只能触发手动伸缩容
					logger.info("当前任务未开启自动伸缩容功能");
				}
				setFencingToken(new AutoRescaleCoodinatorId(leaderSessionID));
				autoRescaleLeaderElectionService.confirmLeaderSessionID(leaderSessionID);
			} catch (Exception exception) {
				if (schedulerUtil.isRunning()){
					schedulerUtil.close();
				}
				this.handleError(new RuntimeException("AutoRescaleCoodinator 选主失败",exception));
			}
		}
	}

在revokeLeadership方法中停止AutoRescaleCoodinator内置的服务

@Override
public void revokeLeadership() {
   schedulerUtil.close();
   runAsyncWithoutFencing(
      () -> {
         log.info("AutoRescaleCoordinator  {} was revoked leadership.", getAddress());
         setFencingToken(null);
      });
}
  • 创建RpcGatewayRetriever对象(实现LeaderRetrievalListener接口)

//从ZooKeeperHaServices中获取AutoRescaleLeaderRetriever
autoRescaleLeaderRetrieverService = highAvailabilityServices.getAutoRescaleLeaderRetriever();
//新建RescaleCoodinator的RpcGatewayRetriever
RpcGatewayRetriever<AutoRescaleCoodinatorId, AutoRescaleGateway> rescaleCoGtwRetriever = new RpcGatewayRetriever<>(rpcService, AutoRescaleGateway.class, AutoRescaleCoodinatorId::fromUuid, 10, Time.milliseconds(50L));
//实时从zk监听RescaleCoodinator的信息的变化
autoRescaleLeaderRetrieverService.start(rescaleCoGtwRetriever)
  • 最后通过rescaleCoodinatorRetriever可以获取Gateway接口与RescaleCoodinator通信了

@Override
public CompletableFuture<String> callOnlineRescale(RescaleState rescaleState) {
   return rescaleCoodinatorRetriever.getFuture().thenCompose(
      autoRescaleGateway -> autoRescaleGateway.doRescale(rescaleState)
   );
}

 



这篇关于Flink源码学习笔记(3)了解Flink HA功能的实现的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程