Spark核心编程系列(六)——共享变量
2021/6/13 22:21:40
本文主要是介绍Spark核心编程系列(六)——共享变量,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
目录
共享变量
累加器(分布式只写变量)
广播变量(分布式只读变量)
广播变量的一些原理
参考
共享变量
Spark提供的两种共享变量(广播变理和累加器)的一种。为什么要使用共享变量呢?通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传,也就是说有结果Driver程序是拿不到的!共享变量就是为了解决这个问题。
累加器(分布式只写变量)
累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。累加器只能够增加。 只有driver能获取到Accumulator的值(使用value方法),Task(excutor)只能对其做增加操作(使用 +=,当然加负数就是减法操作)。并一般称累加器为分布式共享只写变量。
Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。当内置的Accumulator无法满足要求时,可以继承AccumulatorV2实现自定义的累加器。
注意:内置的累加器LongAccumulator、DoubleAccumulator、CollectionAccumulator和自定义累加器,它们都有一个共同的特点,就是最终的结果不受累加数据顺序的影响(对于CollectionAccumulator来说,可以简单的将结果集看做是一个无序Set),就相当于开了一百个线程,每个线程随机sleep若干毫秒然后往StringBuffer中追加字符,最后追加出来的字符串是无法被预测的。总结一下就是累加器的最终结果应该不受累加顺序的影响,否则就要重新审视一下这个累加器的设计是否合理。
val rdd = sc.makeRDD(List(1,2,3,4)) val sumAcc = sc.longAccumulator("sum") // 如果改为map算子,会出现少加的情况 rdd.foreach( num=>{ // 使用累加器 sumAcc.add(num) } ) // 如果再执行collect操作,则出现多加的情况 rdd.collect().foreach(println) // 获取累加器的值 println(sumAcc.value)
但是,累加器存在多加以及少加的情况。
少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行,值依旧是初始值;
多加:如果多次调用行动算子,且没有缓存的情况下,将会多次进行累加计算;
所以,一般情况下,累加器会放置在行动算子中进行操作。
自定义变量
当内置的Accumulator无法满足要求时,可以继承AccumulatorV2实现自定义的累加器。实现自定义累加器的步骤:
1. 继承AccumulatorV2,定义泛型,实现相关方法
2. 创建自定义Accumulator的实例,然后在SparkContext上注册它
// 累加器实现wordCount class MyAccumulator extends AccumulatorV2[String, mutable.Map[String,Long]] { private var wcMap = mutable.Map[String, Long]() //判断是否为初始状态 override def isZero: Boolean = {wcMap.isEmpty} override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = { new MyAccumulator() } override def reset(): Unit = { wcMap.clear() } override def add(word:String):Unit = { val newCnt = wcMap.getOrElse(word, 0L) + 1 wcMap.update(word, newCnt) } override def merge(other:AccumulatorV2[String, mutable.Map[String,Long]]):Unit = { val map1 = this.wcMap val map2 = other.value map2.foreach{ case(word, count) => { val newCnt = map1.getOrElse(word, 0L) + count map1.update(word, newCnt) } } } override def value:mutable.Map[String,Long] = { wcMap } }
广播变量(分布式只读变量)
当分区数量大于Executor的数量时,任务的执行就相当于是进行了并发的操作,而非并行操作。所以,数据全部进入同一个Executor导致数据的倾斜内存不足,即闭包数据都是以Task为单位发送的,每个任务中包含闭包数据,这样可能导致一个Executor中含有大量的重复数据,并且占用大量的内存。
Executor其实就是一个JVM,所以在启动时,会自动分配内存。完全可以将任务中的闭包数据放置在Executor的内存中,达到共享数据的目的。但是该数据无法进行更改,即Spark中的广播变量就可以将闭包的数据保存到Executor的内存中,Spark中的广播变量不能更改。
// bc就是我们传的广播变量 val bc:Broadcast[mutable.Map[String, Int]] = sc.broadcast(map) // 读取广播变量 bc.value
广播变量的一些原理
广播变量,初始的时候,就在Drvier上有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager(负责管理某个Executor对应的内存和磁盘上的数据)中,尝试获取变量副本;如果本地没有,那么就从Driver远程拉取变量副本,并保存在本地的BlockManager中;此后这个executor上的task,都会直接使用本地的BlockManager中的副本。executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本。
参考
https://www.bilibili.com/video/BV11A411L7CK?p=111
这篇关于Spark核心编程系列(六)——共享变量的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-28微服务架构中API版本控制的实践
- 2024-09-28AI给的和自己写的Python代码,都无法改变输入框的内容,替换也不行
- 2024-09-27Sentinel配置限流资料:新手入门教程
- 2024-09-27Sentinel配置限流资料详解
- 2024-09-27Sentinel限流资料:新手入门教程
- 2024-09-26Sentinel限流资料入门详解
- 2024-09-26Springboot框架资料:初学者入门教程
- 2024-09-26Springboot框架资料详解:新手入门教程
- 2024-09-26Springboot企业级开发资料:新手入门指南
- 2024-09-26SpringBoot企业级开发资料新手指南