大数据:Spark实战经验总结(python版)

2022/3/1 1:22:24

本文主要是介绍大数据:Spark实战经验总结(python版),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

人工智能

  • 大数据,Spark,Hadoop,python,pyspark
  • 大数据:Spark实战经验总结
    • 1. RDD持久化
      • 1)RDD的惰性机制:
      • 2)RDD持久化 --- (解决惰性机制的效率问题):
        • (1)效率低的背景:
        • (2)增加持久化(缓存):
        • (3)实际开发中,持久化(缓存)写法:

大数据,Spark,Hadoop,python,pyspark

大数据:Spark实战经验总结

1. RDD持久化

说RDD持久化之前,先来了解一下惰性机制。

1)RDD的惰性机制:

RDD在设计时采用了惰性机制的特性,指的是转换RDD的过程先记录而不发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算。举例说明:
假设/mnt/下又一个文件word.txt,内容如下:

Hadoop is good
Spark is fast
Spark is better

代码:

from pyspark import SparkConf, SparkContext


conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)

lines = sc.textFile("file:///mnt/word.txt")   # 记录,并不执行
lineLengths = lines.map(lambda s:len(s))     # 记录,并不执行
totalLength = lineLengths.reduce(lambda a, b: a + b)  # 开始执行!

为了看着更清晰,代码不妨写成:

# 记录,并不执行。
rdd1 = sc.textFile("file:///mnt/word.txt")   # 是一个RDD对象。
# 记录,并不执行。
rdd2 = rdd1.map(lambda s:len(s))     # 是一个RDD对象。
# 开始执行! 
totalLength = rdd2.reduce(lambda a, b: a+b)  # 是个数字。


"""
打印验证
"""
print(rdd1)  # file:///mnt/word.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0
print(rdd2)   # PythonRDD[5] at RDD at PythonRDD.scala:53
print(totalLength)   # 42

# 调用RDD自带的函数,来取出rdd1和rdd2对象中的值
rdd1.foreach(print)  # Spark is better  Hadoop is good  Spark is fast
rdd2.foreach(print)  # 14 13 15

上面代码中,
(i)第1行语句中的textFile()是一个转换操作(函数返回一个RDD对象),系统只会记录这次转换,并不会真正读取word.txt文件的数据到内存中;
(ii)第2行语句的map()也是一个转换操作(函数返回一个RDD对象),系统只是记录这次转换,不会真正执行map()方法;
(iii)而第3行语句的reduce()是一个“行动”类型的操作(函数返回一个整型数字),这时系统会生成一个作业,触发真正的计算。也就是说,这时才会加载word.txt的数据到内存,生成RDD。

2)RDD持久化 — (解决惰性机制的效率问题):

(1)效率低的背景:

在Spark中,RDD采用惰性求值的机制。导致每次遇到“行动”操作,都会从头开始执行计算(即每次调用行动操作,都会触发一次从头开始的计算),这对于迭代计算而言,代价是很大的,影响效率(因为迭代计算经常需要多次重复使用同一组数据)。下面是多次计算同一个RDD的例子:

li = ["Hadoop", "Spark", "Hive"]
rdd = sc.parallelize(li)  # 记录操作。生成一个RDD
print(rdd.count())   # 行动操作,触发一次真正的从头到尾的计算。运行结果:3
print(','.join(rdd.collect()))  # 行动操作,再触发一次真正的从头到尾的计算。运行结果:'Hadoop', 'Spark', 'Hive'
# 注:rrd.collect()是以数组形式返回数据集中的所有元素。结果:['Hadoop', 'Spark', 'Hive']

(2)增加持久化(缓存):

为了避免这种重复计算的开销,可以使用RDD的持久化(缓存),方法是使用persist()函数将一个RDD标记为持久化。注意:之所以“标记为持久化”,是因为出现persist()语句的地方并不会马上计算生成RDD并把它持久化,而是要对等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化。持久化后的RDD将会被保留在计算节点的内存中,被后面的行动操作重复时候。
persist()使用的时候有两种参数供选择:

  • persist(MEMORY_ONLY):仅内存,超出内存则覆盖(LRU原则)。表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。 ---- 默认这种。
  • persist(MEMORY_AND_DISK):内存+磁盘,超出内存则存硬盘。表示将RDD作为反序列化的 对象存储于JVM中,如果内存不足,超出的分区将会被存储在硬盘上。
    这两种,默认参数是persist(MEMORY_ONLY):仅内存,超出内存则覆盖(LRU原则),因为效率第一,另一种超出就放在硬盘上不但会影响效率,还会造成资源浪费(尤其数据量巨大的时候)。

上面的例子,增加持久化缓存语句:

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel


# 创建SparkConf对象,并给对象赋值
conf = SparkConf().setMaster("local").setAppName("My app")
# 创建SparkContext对象,不妨命名为sc
sc = SparkContext(conf=conf)
"""
spark创建的sc,其功能之一是调用自带的parallelize()函数来加载自定义的变量来创建RDD,如下面的 sc.parallelize:
(sc还有如加载文件textFile()等其他很多函数和功能)
"""

li = ["hadoop", "spark", "hive"]
rdd = sc.parallelize(li)

# 以仅内存方式标记RDD。将名为rdd的这个RDD对象标记持久化缓存
rdd.persist() # 默认MEMORY_ONLY。--仅内存,超出内存则覆盖(LRU原则)方式。
# 等价 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY)
# rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) # --内存+磁盘,超出内存则存硬盘方式。

# 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.persist()才会执行,把这个rdd放到缓存中。
print(rdd.count())

# 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
print(','.join(rdd.collect()))

# 解除标记。释放名为rdd的RDD对象在内存中的缓存空间
rdd.unpersist()

:持久化RDD会占用内存空间,当不再需要一个RDD时,就可以使用unpersist()函数手动地把持久化的RDD从缓存中移除,释放内存空间。
注意,上面标记为仅内存执行rdd.persist() 或 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY) 后,要想重新标记为内存+磁盘执行 rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) ,需要先执行rdd.unpersist()释放标记!!!否则报错!

(3)实际开发中,持久化(缓存)写法:

实际开发中,我们使用cache()方法就会自动调用persist(MEMORY_ONLY),我们一般用rdd.cache()或rdd.persist()即可,不用再导包from pyspark.storagelevel import StorageLevel来传参,通过查看cache()和persist()源码,可以看到这两个方法会自动导入包。
重点!!RDD持久化 实际开发代码,一般写法如下:

from pyspark import SparkConf, SparkContext


conf = SparkConf().setMaster("local").setAppName("My app")
sc = SparkContext(conf=conf)

li = ["hadoop", "spark", "hive"]
rdd = sc.parallelize(li)

# 以仅内存方式标记RDD。将名为rdd的这个RDD对象标记持久化缓存
rdd.cache()   # 会调用persist(MEMORY_ONLY)
# 或 rdd.persist() # 默认MEMORY_ONLY。--仅内存,超出内存则覆盖(LRU原则)方式。

# 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.persist()才会执行,把这个rdd放到缓存中。
print(rdd.count())

# 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
print(','.join(rdd.collect()))

# 解除标记。释放名为rdd的RDD对象在内存中的缓存空间
rdd.unpersist()

附:cache()和persist()函数的源码。在Anaconda的site-packages/pyspark/rdd.py文件:
在这里插入图片描述在这里插入图片描述



这篇关于大数据:Spark实战经验总结(python版)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程