dolphinscheduler源码解析-MasterSchedulerService
2021/7/29 22:35:43
本文主要是介绍dolphinscheduler源码解析-MasterSchedulerService,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
dolphinscheduler 源码解析-MasterSchedulerService
文章目录
- dolphinscheduler 源码解析-MasterSchedulerService
- 类定义
- 类属性
- 初始化方法
类定义
@Service public class MasterSchedulerService extends Thread
可以看出该类继承了线程基类,那该类就可以在线程池内执行。
类属性
/** * logger of MasterSchedulerService */ private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); /** * dolphinscheduler database interface */ @Autowired private ProcessService processService; /** * zookeeper master client */ @Autowired private MasterRegistryClient masterRegistryClient; /** * master config */ @Autowired private MasterConfig masterConfig; /** * alert manager */ @Autowired private ProcessAlertManager processAlertManager; /** * netty remoting client */ private NettyRemotingClient nettyRemotingClient; /** * master exec service */ private ThreadPoolExecutor masterExecService;
可以看出它有一个ProcessService
这个属性集成了很多mappers类,提供数据dao服务。还有一个ProcessAlertManager
告警的管理器,另外也有netty的客户端和一个线程池。
初始化方法
@PostConstruct public void init() { this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); }
对线程池进行赋值,并且创建一个netty的客户端。
既然是线程类,就必须有run方法,我们查看一下run方法
/** * run of MasterSchedulerService */ @Override public void run() { logger.info("master scheduler started"); while (Stopper.isRunning()) { try { boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); if (!runCheckFlag) { Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } scheduleProcess(); } catch (Exception e) { logger.error("master scheduler thread error", e); } } }
该run方法会先检查一下资源,看是否有空闲资源,如果没有就让线程睡眠一会儿然后重新检查资源,当有了足够的资源就开始执行scheduleProcess
方法
我们向下追踪scheduleProcess
方法
private void scheduleProcess() throws Exception { try { masterRegistryClient.blockAcquireMutex(); int activeCount = masterExecService.getActiveCount(); // make sure to scan and delete command table in one transaction Command command = processService.findOneCommand(); if (command != null) { logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType()); try { ProcessInstance processInstance = processService.handleCommand(logger, getLocalAddress(), this.masterConfig.getMasterExecThreads() - activeCount, command); if (processInstance != null) { logger.info("start master exec thread , split DAG ..."); masterExecService.execute( new MasterExecThread( processInstance , processService , nettyRemotingClient , processAlertManager , masterConfig)); } } catch (Exception e) { logger.error("scan command error ", e); processService.moveToErrorCommand(command, e.toString()); } } else { //indicate that no command ,sleep for 1s Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } finally { masterRegistryClient.releaseLock(); } }
首先获取一个master的分布式锁
查看master的executor线程池中有多少active状态的线程。
从数据库中拿到一个command命令
然后构造一个该命令对应的实体类ProcessInstance
然后在该类的线程池内执行MasterExecThread线程【点击打开MasterExecThread】。
这篇关于dolphinscheduler源码解析-MasterSchedulerService的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-06-26终极指南:Scrum中如何设置需求优先级
- 2024-06-26AI大模型企业应用实战(25)-为Langchain Agent添加记忆功能
- 2024-06-26小白家庭 nas 搭建方案-icode9专业技术文章分享
- 2024-06-23AI大模型企业应用实战(14)-langchain的Embedding
- 2024-06-23AI大模型企业应用实战(15)-langchain核心组件
- 2024-06-23AI大模型企业应用实战(16)-langchain核心组件
- 2024-06-23AI 大模型企业应用实战(06)-初识LangChain
- 2024-06-19EntBot.ai: AI Website Chatbot for Product Guides and Development Doc
- 2024-06-17zero-shot-learning-definition-examples-comparison
- 2024-06-06Package Easy(基于 NSIS 的打包exe安装包工具)使用方法-icode9专业技术文章分享