大数据:Spark实战经验总结(python版)
2022/3/1 1:22:24
本文主要是介绍大数据:Spark实战经验总结(python版),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
人工智能
- 大数据,Spark,Hadoop,python,pyspark
-
大数据:Spark实战经验总结 - 1. RDD持久化
- 1)RDD的惰性机制:
- 2)RDD持久化 --- (解决惰性机制的效率问题):
- (1)效率低的背景:
- (2)增加持久化(缓存):
- (3)实际开发中,持久化(缓存)写法:
大数据,Spark,Hadoop,python,pyspark
大数据:Spark实战经验总结
1. RDD持久化
说RDD持久化之前,先来了解一下惰性机制。
1)RDD的惰性机制:
RDD在设计时采用了惰性机制的特性,指的是转换RDD的过程先记录而不发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算。举例说明:
假设/mnt/下又一个文件word.txt,内容如下:
Hadoop is good Spark is fast Spark is better
代码:
from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local").setAppName("My App") sc = SparkContext(conf=conf) lines = sc.textFile("file:///mnt/word.txt") # 记录,并不执行 lineLengths = lines.map(lambda s:len(s)) # 记录,并不执行 totalLength = lineLengths.reduce(lambda a, b: a + b) # 开始执行!
为了看着更清晰,代码不妨写成:
# 记录,并不执行。 rdd1 = sc.textFile("file:///mnt/word.txt") # 是一个RDD对象。 # 记录,并不执行。 rdd2 = rdd1.map(lambda s:len(s)) # 是一个RDD对象。 # 开始执行! totalLength = rdd2.reduce(lambda a, b: a+b) # 是个数字。 """ 打印验证 """ print(rdd1) # file:///mnt/word.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 print(rdd2) # PythonRDD[5] at RDD at PythonRDD.scala:53 print(totalLength) # 42 # 调用RDD自带的函数,来取出rdd1和rdd2对象中的值 rdd1.foreach(print) # Spark is better Hadoop is good Spark is fast rdd2.foreach(print) # 14 13 15
上面代码中,
(i)第1行语句中的textFile()是一个转换操作(函数返回一个RDD对象),系统只会记录这次转换,并不会真正读取word.txt文件的数据到内存中;
(ii)第2行语句的map()也是一个转换操作(函数返回一个RDD对象),系统只是记录这次转换,不会真正执行map()方法;
(iii)而第3行语句的reduce()是一个“行动”类型的操作(函数返回一个整型数字),这时系统会生成一个作业,触发真正的计算。也就是说,这时才会加载word.txt的数据到内存,生成RDD。
2)RDD持久化 — (解决惰性机制的效率问题):
(1)效率低的背景:
在Spark中,RDD采用惰性求值的机制。导致每次遇到“行动”操作,都会从头开始执行计算(即每次调用行动操作,都会触发一次从头开始的计算),这对于迭代计算而言,代价是很大的,影响效率(因为迭代计算经常需要多次重复使用同一组数据)。下面是多次计算同一个RDD的例子:
li = ["Hadoop", "Spark", "Hive"] rdd = sc.parallelize(li) # 记录操作。生成一个RDD print(rdd.count()) # 行动操作,触发一次真正的从头到尾的计算。运行结果:3 print(','.join(rdd.collect())) # 行动操作,再触发一次真正的从头到尾的计算。运行结果:'Hadoop', 'Spark', 'Hive' # 注:rrd.collect()是以数组形式返回数据集中的所有元素。结果:['Hadoop', 'Spark', 'Hive']
(2)增加持久化(缓存):
为了避免这种重复计算的开销,可以使用RDD的持久化(缓存),方法是使用persist()函数将一个RDD标记为持久化。注意:之所以“标记为持久化”,是因为出现persist()语句的地方并不会马上计算生成RDD并把它持久化,而是要对等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化。持久化后的RDD将会被保留在计算节点的内存中,被后面的行动操作重复时候。
persist()使用的时候有两种参数供选择:
- persist(MEMORY_ONLY):仅内存,超出内存则覆盖(LRU原则)。表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。 ---- 默认这种。
- persist(MEMORY_AND_DISK):内存+磁盘,超出内存则存硬盘。表示将RDD作为反序列化的 对象存储于JVM中,如果内存不足,超出的分区将会被存储在硬盘上。
这两种,默认参数是persist(MEMORY_ONLY):仅内存,超出内存则覆盖(LRU原则),因为效率第一,另一种超出就放在硬盘上不但会影响效率,还会造成资源浪费(尤其数据量巨大的时候)。
上面的例子,增加持久化缓存语句:
from pyspark import SparkConf, SparkContext from pyspark.storagelevel import StorageLevel # 创建SparkConf对象,并给对象赋值 conf = SparkConf().setMaster("local").setAppName("My app") # 创建SparkContext对象,不妨命名为sc sc = SparkContext(conf=conf) """ spark创建的sc,其功能之一是调用自带的parallelize()函数来加载自定义的变量来创建RDD,如下面的 sc.parallelize: (sc还有如加载文件textFile()等其他很多函数和功能) """ li = ["hadoop", "spark", "hive"] rdd = sc.parallelize(li) # 以仅内存方式标记RDD。将名为rdd的这个RDD对象标记持久化缓存 rdd.persist() # 默认MEMORY_ONLY。--仅内存,超出内存则覆盖(LRU原则)方式。 # 等价 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY) # rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) # --内存+磁盘,超出内存则存硬盘方式。 # 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.persist()才会执行,把这个rdd放到缓存中。 print(rdd.count()) # 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd print(','.join(rdd.collect())) # 解除标记。释放名为rdd的RDD对象在内存中的缓存空间 rdd.unpersist()
注:持久化RDD会占用内存空间,当不再需要一个RDD时,就可以使用unpersist()函数手动地把持久化的RDD从缓存中移除,释放内存空间。
注意,上面标记为仅内存执行rdd.persist() 或 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY) 后,要想重新标记为内存+磁盘执行 rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) ,需要先执行rdd.unpersist()释放标记!!!否则报错!
(3)实际开发中,持久化(缓存)写法:
实际开发中,我们使用cache()方法就会自动调用persist(MEMORY_ONLY),我们一般用rdd.cache()或rdd.persist()即可,不用再导包from pyspark.storagelevel import StorageLevel
来传参,通过查看cache()和persist()源码,可以看到这两个方法会自动导入包。
重点!!RDD持久化 实际开发代码,一般写法如下:
from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local").setAppName("My app") sc = SparkContext(conf=conf) li = ["hadoop", "spark", "hive"] rdd = sc.parallelize(li) # 以仅内存方式标记RDD。将名为rdd的这个RDD对象标记持久化缓存 rdd.cache() # 会调用persist(MEMORY_ONLY) # 或 rdd.persist() # 默认MEMORY_ONLY。--仅内存,超出内存则覆盖(LRU原则)方式。 # 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.persist()才会执行,把这个rdd放到缓存中。 print(rdd.count()) # 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd print(','.join(rdd.collect())) # 解除标记。释放名为rdd的RDD对象在内存中的缓存空间 rdd.unpersist()
附:cache()和persist()函数的源码。在Anaconda的site-packages/pyspark/rdd.py文件:
这篇关于大数据:Spark实战经验总结(python版)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-14获取参数学习:Python编程入门教程
- 2024-11-14Python编程基础入门
- 2024-11-14Python编程入门指南
- 2024-11-13Python基础教程
- 2024-11-12Python编程基础指南
- 2024-11-12Python基础编程教程
- 2024-11-08Python编程基础与实践示例
- 2024-11-07Python编程基础指南
- 2024-11-06Python编程基础入门指南
- 2024-11-06怎么使用python 计算两个GPS的距离功能-icode9专业技术文章分享