dolphinscheduler源码解析-MasterSchedulerService
2021/7/29 22:35:43
本文主要是介绍dolphinscheduler源码解析-MasterSchedulerService,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
dolphinscheduler 源码解析-MasterSchedulerService
文章目录
- dolphinscheduler 源码解析-MasterSchedulerService
- 类定义
- 类属性
- 初始化方法
类定义
@Service public class MasterSchedulerService extends Thread
可以看出该类继承了线程基类,那该类就可以在线程池内执行。
类属性
/** * logger of MasterSchedulerService */ private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); /** * dolphinscheduler database interface */ @Autowired private ProcessService processService; /** * zookeeper master client */ @Autowired private MasterRegistryClient masterRegistryClient; /** * master config */ @Autowired private MasterConfig masterConfig; /** * alert manager */ @Autowired private ProcessAlertManager processAlertManager; /** * netty remoting client */ private NettyRemotingClient nettyRemotingClient; /** * master exec service */ private ThreadPoolExecutor masterExecService;
可以看出它有一个ProcessService
这个属性集成了很多mappers类,提供数据dao服务。还有一个ProcessAlertManager
告警的管理器,另外也有netty的客户端和一个线程池。
初始化方法
@PostConstruct public void init() { this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); }
对线程池进行赋值,并且创建一个netty的客户端。
既然是线程类,就必须有run方法,我们查看一下run方法
/** * run of MasterSchedulerService */ @Override public void run() { logger.info("master scheduler started"); while (Stopper.isRunning()) { try { boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); if (!runCheckFlag) { Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } scheduleProcess(); } catch (Exception e) { logger.error("master scheduler thread error", e); } } }
该run方法会先检查一下资源,看是否有空闲资源,如果没有就让线程睡眠一会儿然后重新检查资源,当有了足够的资源就开始执行scheduleProcess
方法
我们向下追踪scheduleProcess
方法
private void scheduleProcess() throws Exception { try { masterRegistryClient.blockAcquireMutex(); int activeCount = masterExecService.getActiveCount(); // make sure to scan and delete command table in one transaction Command command = processService.findOneCommand(); if (command != null) { logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType()); try { ProcessInstance processInstance = processService.handleCommand(logger, getLocalAddress(), this.masterConfig.getMasterExecThreads() - activeCount, command); if (processInstance != null) { logger.info("start master exec thread , split DAG ..."); masterExecService.execute( new MasterExecThread( processInstance , processService , nettyRemotingClient , processAlertManager , masterConfig)); } } catch (Exception e) { logger.error("scan command error ", e); processService.moveToErrorCommand(command, e.toString()); } } else { //indicate that no command ,sleep for 1s Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } finally { masterRegistryClient.releaseLock(); } }
首先获取一个master的分布式锁
查看master的executor线程池中有多少active状态的线程。
从数据库中拿到一个command命令
然后构造一个该命令对应的实体类ProcessInstance
然后在该类的线程池内执行MasterExecThread线程【点击打开MasterExecThread】。
这篇关于dolphinscheduler源码解析-MasterSchedulerService的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享