spark源码之环境准备
2021/10/14 20:15:23
本文主要是介绍spark源码之环境准备,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
我们使用yarn集群作为研究
spark环境准备
- yarn环境准备
- 启动Driver
- 启动Executor
yarn环境准备
spark的入口类是SparkSubmit
,在这里,我们开始提交参数
这里的args就是--class
这些的。
解析好这些参数后,我们会返回一个SparkSubmitArguments
的一个属性action
的值并进行模式匹配。
我们可以看到,action
默认就是SUBMIT
。
于是走submit分支:
假设我们没有使用--proxy-user
参数,于是走runMain(args, uninitLog)
。
走进这个方法,首先他要准备提交的环境,并且我们关注返回元组中的childMainClass
。
进入prepareSubmitEnvironment(args)
。
因为我们是yarn集群模式,所以childMainClass = YARN_CLUSTER_SUBMIT_CLASS
。
而YARN_CLUSTER_SUBMIT_CLASS
是一个全类名:
拿到这个全类名后我们问他是不是SparkApplication
的子类,如果是就用反射new出来,然后再转成SparkApplication
。
所以YarnClusterApplication
就创建出来了。
创建出来后我们调用它的start
方法。
在start
方法中我们要new一个客户端,进去:
他会创建一个YarnClient
。这时候我们就要开始和RM建立连接了。
再走run
方法:
现在要开始提交应用了。
yarnClient
完成初始化并启动。
我们想知道它提交的是些什么。
所以,进到创建容器环境的代码:
val containerContext = createContainerLaunchContext(newAppResponse)
它会将一些指令封装好提交给RM,所以我们关注这个amClass
是个什么。
因为我们使用的是集群模式,所以amClass
就是
org.apache.spark.deploy.yarn.ApplicationMaster
所以yarn客户端将启动ApplicationMaster
的指令给RM,RM就会让其中一个NM来启动,因为命令是bin/java ApplicationMaster
,所以起的是一个进程。
要执行ApplicationMaster
这个进程,就是要执行main方法,进到他的main方法去:
在main方法中它new了一个ApplicationMaster
,进去:
在ApplicationMaster
里面又new了一个YarnRMClient
,这个东西就是ApplicationMaster
用来和RM通信的。
再回到ApplicationMaster
的main方法,这个AM创建出来后,去看它的run
方法。
在这个run
方法中,它判断我们是不是集群模式(显然我们是),是的话就走runDriver()
。
启动Driver
进入runDriver()
。
先看
val sc = ThreadUtils.awaitResult(sparkContextPromise.future, Duration(totalWaitTime, TimeUnit.MILLISECONDS))
他在等待一个SparkContext
,没有等到就阻塞。
所以我们进startUserApplication()
看看发生了什么。
这个--class
就是我们命令行传的主类参数:
拿到我们的main方法之后就是用invoke
调用。
当然这是跑在一个线程中的:
启动的这个线程,就是我们的Driver:
调用我们的main后一定会有SparkContext
,随便的一个spark程序,都会创建SparkContext
:
于是之前阻塞的代码就可以走通了:
启动Executor
我们接着往下走:
首先他要注册AM,注册的含义AM就是要向RM申请资源。
然后要创建分配器,相当于是要分配资源了。
这里的client就是之前的YarnRMClient
,就是用来和RM通信的。
它创建了分配器之后就要开始分配资源了。
如果可使用的容器数量大于0(yarn中资源是以container的形式存在的),就要开始着手处理这些容器了。
然后我们要去跑分配好的容器。
跑的时候他有一个线程池,走到里面的run
方法去。
在run
方法里面它初始化了NM,然后启动容器。
最终是由NM来启动容器,nmClient.startContainer(container.get, ctx)
里面有一个上下文对象,这个ctx
里面封装了一些指令。
我们进到prepareCommand()
里面去查看是哪些指令。
显然,他又是用bin/java
的形式启动了一个进程,这个进程就是YarnCoarseGrainedExecutorBackend
。
接下来我们需要知道这个YarnCoarseGrainedExecutorBackend
是什么。
走进它的run
方法。
他首先要创建一个RpcEndpointRef
类型的driver变量,这个东西就是用来和真正的Driver
做通信的。
然后将自己(YarnCoarseGrainedExecutorBackend
)设置为一个终端。其实就是一个通信的终端,我们看他设置的时候做了什么。
一路跟下去。
他会创建一个DedicatedMessageLoop
。
在这个类中,他会创建一个Inbox
对象,就是一个收件箱。
Inbox
回给自己发一个OnStart
的指令。
他自己匹配到OnStart
之后,就会调用终端onStart()
方法。我们跟进去:
终端会给Driver发一个RegisterExecutor
对象,表示要注册Excutor。
那Driver那边谁接受呢?当然是Driver里面的SparkContext
。
SparkContext
里面有一个SchedulerBackend
,他就是用来接发消息的。
这个实现类相当于是Driver方面的终端。
当他匹配到RegisterExecutor
对象时,
最后返回了true
。
此时Executor端的终端接收到成功的消息后,就给自己发一个消息,说去注册Executor吧。
然后它自己和自己匹配到RegisteredExecutor
后,就完成Executor的注册。
我们最后回到ApplicationMaster
。
刚才是在createAllocator
这里离开的。Executor创建完成后,调用resumeDriver()
来恢复Driver的执行,就是我们写的那些RDD。
这篇关于spark源码之环境准备的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-24怎么修改Kafka的JVM参数?-icode9专业技术文章分享
- 2024-12-23线下车企门店如何实现线上线下融合?
- 2024-12-23鸿蒙Next ArkTS编程规范总结
- 2024-12-23物流团队冬至高效运转,哪款办公软件可助力风险评估?
- 2024-12-23优化库存,提升效率:医药企业如何借助看板软件实现仓库智能化
- 2024-12-23项目管理零负担!轻量化看板工具如何助力团队协作
- 2024-12-23电商活动复盘,为何是团队成长的核心环节?
- 2024-12-23鸿蒙Next ArkTS高性能编程实战
- 2024-12-23数据驱动:电商复盘从基础到进阶!
- 2024-12-23从数据到客户:跨境电商如何通过销售跟踪工具提升营销精准度?