Spark Source Code (7): The Worker receive Method
2022/9/13 14:16:45
This article walks through the `receive` method of Spark's `Worker`, which should be a useful reference when reading the deploy module of the source code.
The `receive` method is essentially one large pattern match; each `case` handles a different message:

```scala
case msg: RegisterWorkerResponse
case SendHeartbeat
case WorkDirCleanup
case MasterChanged
case ReconnectWorker
case LaunchExecutor
case executorStateChanged: ExecutorStateChanged
case KillExecutor(masterUrl, appId, execId)
case LaunchDriver(driverId, driverDesc, resources_)
case KillDriver(driverId)
case driverStateChanged @ DriverStateChanged(driverId, state, exception)
case ReregisterWithMaster
case ApplicationFinished(id)
case DecommissionWorker
case WorkerSigPWRReceived
```
1. RegisterWorkerResponse in Detail
```scala
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress, duplicate) =>
      val preferredMasterAddress = if (preferConfiguredMasterAddress) {
        masterAddress.toSparkURL
      } else {
        masterRef.address.toSparkURL
      }
      if (duplicate) {
        logWarning(s"Duplicate registration at master $preferredMasterAddress")
      }
      logInfo(s"Successfully registered with master $preferredMasterAddress")
      registered = true
      changeMaster(masterRef, masterWebUiUrl, masterAddress) // update the master info
      // start a scheduled task that sends heartbeats at a fixed rate
      forwardMessageScheduler.scheduleAtFixedRate(
        () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
        0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        // the worker periodically sends itself a WorkDirCleanup message
        forwardMessageScheduler.scheduleAtFixedRate(
          () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
          CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
      val execs = executors.values.map { e =>
        new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
      }
      // report the worker's current state to the master
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
    case RegisterWorkerFailed(message) =>
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }
    case MasterInStandby =>
      // Ignore. Master not yet ready.
  }
}
```
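The handler above is an ordinary Scala pattern match over a sealed message type. A minimal standalone sketch of the same three-way dispatch, using simplified stand-ins for Spark's deploy messages (these are not the real classes, and `handle` returns the log line instead of performing side effects):

```scala
object RegisterResponseSketch {
  // Simplified stand-ins for Spark's deploy messages (hypothetical, for illustration)
  sealed trait RegisterWorkerResponse
  case class RegisteredWorker(masterUrl: String, duplicate: Boolean) extends RegisterWorkerResponse
  case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse
  case object MasterInStandby extends RegisterWorkerResponse

  // Same three-way match as handleRegisterResponse, returning the log line
  def handle(msg: RegisterWorkerResponse): String = msg match {
    case RegisteredWorker(url, true)  => s"Duplicate registration at master $url"
    case RegisteredWorker(url, false) => s"Successfully registered with master $url"
    case RegisterWorkerFailed(m)      => s"Worker registration failed: $m"
    case MasterInStandby              => "Master in standby; ignore"
  }

  def main(args: Array[String]): Unit = {
    println(handle(RegisteredWorker("spark://master:7077", duplicate = false)))
    println(handle(MasterInStandby))
  }
}
```

Because the trait is sealed, the compiler warns if a `case` is missing, which is why the real `Worker` can safely grow new message types over time.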
2. SendHeartbeat in Detail
```scala
if (connected) { sendToMaster(Heartbeat(workerId, self)) }

private def sendToMaster(message: Any): Unit = {
  master match {
    case Some(masterRef) => masterRef.send(message) // send the heartbeat to the master
    case None =>
      logWarning(
        s"Dropping $message because the connection to master has not yet been established")
  }
}
```
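The heartbeat itself is driven by the `scheduleAtFixedRate` call made during registration. A self-contained sketch of that pattern using the JDK's `ScheduledExecutorService` directly (the `runHeartbeats` helper is an invention for this example, not Spark code):

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object HeartbeatSketch {
  // Runs `beat` at a fixed rate and returns true once it has fired `n` times
  // (or false if that does not happen within 5 seconds).
  def runHeartbeats(n: Int, periodMs: Long)(beat: () => Unit): Boolean = {
    val latch = new CountDownLatch(n)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
      () => { beat(); latch.countDown() }, // like self.send(SendHeartbeat)
      0, periodMs, TimeUnit.MILLISECONDS)
    val done = latch.await(5, TimeUnit.SECONDS)
    scheduler.shutdownNow()
    done
  }

  def main(args: Array[String]): Unit = {
    var beats = 0
    val ok = runHeartbeats(3, 20)(() => beats += 1)
    println(s"completed=$ok, at least 3 beats: ${beats >= 3}")
  }
}
```

In the real `Worker`, each tick sends `SendHeartbeat` to itself, which lands back in `receive` and is forwarded to the master as shown above.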
3. WorkDirCleanup in Detail
```scala
// collect the app IDs of all executors plus all drivers
val appIds = (executors.values.map(_.appId) ++ drivers.values.map(_.driverId)).toSet
try {
  val cleanupFuture: concurrent.Future[Unit] = concurrent.Future {
    val appDirs = workDir.listFiles()
    if (appDirs == null) {
      throw new IOException("ERROR: Failed to list files in " + appDirs)
    }
    appDirs.filter { dir =>
      val appIdFromDir = dir.getName
      val isAppStillRunning = appIds.contains(appIdFromDir)
      // keep only directories whose app has finished and that have had no new
      // files within APP_DATA_RETENTION_SECONDS (the retention config)
      dir.isDirectory && !isAppStillRunning &&
        !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
    }.foreach { dir =>
      logInfo(s"Removing directory: ${dir.getPath}")
      Utils.deleteRecursively(dir)
      if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
          conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
        // this removes the application's files from the external shuffle service,
        // not all files; it matches the cleanup logs seen when a job is killed
        shuffleService.applicationRemoved(dir.getName)
      }
    }
  }(cleanupThreadExecutor)
  cleanupFuture.failed.foreach(e =>
    logError("App dir cleanup failed: " + e.getMessage, e)
  )(cleanupThreadExecutor)
} catch {
  case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
    logWarning("Failed to cleanup work dir as executor pool was shutdown")
}
```
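The heart of the cleanup is the filter predicate: a directory is deleted only if it is a directory, its app is no longer running, and it has no recent files. A minimal sketch of that logic, with a hypothetical `AppDir` case class standing in for the `java.io.File` checks:

```scala
object CleanupSketch {
  // Hypothetical stand-in for a work-dir entry:
  // its name, whether it is a directory, and whether it contains recent files
  case class AppDir(name: String, isDirectory: Boolean, hasRecentFiles: Boolean)

  // Mirrors the filter above: directories of finished apps with no recent files
  def dirsToDelete(appDirs: Seq[AppDir], runningAppIds: Set[String]): Seq[AppDir] =
    appDirs.filter { dir =>
      val isAppStillRunning = runningAppIds.contains(dir.name)
      dir.isDirectory && !isAppStillRunning && !dir.hasRecentFiles
    }

  def main(args: Array[String]): Unit = {
    val dirs = Seq(
      AppDir("app-1", isDirectory = true, hasRecentFiles = false), // finished, stale: delete
      AppDir("app-2", isDirectory = true, hasRecentFiles = false), // still running: keep
      AppDir("app-3", isDirectory = true, hasRecentFiles = true))  // finished but active: keep
    println(dirsToDelete(dirs, runningAppIds = Set("app-2")).map(_.name).mkString(","))
  }
}
```

The "recent files" check is what `Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)` implements in Spark: a finished app's directory survives until it has been quiet for the retention window.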
4. MasterChanged in Detail
```scala
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
// the worker passively accepts the master change and only updates its own
// configuration; it does not initiate any election-related messaging
changeMaster(masterRef, masterWebUiUrl, masterRef.address)
val executorResponses = executors.values.map { e =>
  WorkerExecutorStateResponse(new ExecutorDescription(
    e.appId, e.execId, e.cores, e.state), e.resources)
}
val driverResponses = drivers.keys.map { id =>
  WorkerDriverStateResponse(id, drivers(id).resources)
}
// report the state of every current executor and driver to the new master
masterRef.send(WorkerSchedulerStateResponse(
  workerId, executorResponses.toList, driverResponses.toSeq))
```
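Building the state report is just a map over the worker's in-memory bookkeeping. A compact sketch with simplified stand-ins for Spark's classes (the names mirror the real ones but the fields are trimmed down):

```scala
object StateReportSketch {
  // Simplified stand-ins for Spark's ExecutorRunner / ExecutorDescription
  case class ExecutorRunner(appId: String, execId: Int, cores: Int, state: String)
  case class ExecutorDescription(appId: String, execId: Int, cores: Int, state: String)
  case class WorkerSchedulerStateResponse(
      workerId: String,
      executors: List[ExecutorDescription],
      driverIds: Seq[String])

  def buildResponse(
      workerId: String,
      executors: Map[String, ExecutorRunner],
      driverIds: Seq[String]): WorkerSchedulerStateResponse = {
    // describe each running executor, like the map over executors.values above
    val descs = executors.values
      .map(e => ExecutorDescription(e.appId, e.execId, e.cores, e.state))
      .toList
    WorkerSchedulerStateResponse(workerId, descs, driverIds)
  }

  def main(args: Array[String]): Unit = {
    val execs = Map("app-1/0" -> ExecutorRunner("app-1", 0, 2, "RUNNING"))
    val resp = buildResponse("worker-1", execs, Seq("driver-1"))
    println(s"executors reported: ${resp.executors.size}")
  }
}
```

The new master uses this response to rebuild its view of which executors and drivers each worker is running, so no state is lost across a failover.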
5. ReconnectWorker in Detail
```scala
registerWithMaster() // this method was covered earlier in the series
```
This concludes the walkthrough of the Worker's `receive` method; hopefully it helps when reading the rest of the deploy module.