PySpark: Basic Operations of Structured Streaming
2021/5/16 10:55:16
This article introduces the basic operations of Structured Streaming in PySpark.
Core idea: treat the real-time data stream as a table to which rows are continuously appended, so that a stream computation can be regarded as a batch query over a static table. Spark runs the computation against this unbounded input table as new data arrives and evaluates the query incrementally.
The basic steps for writing a Structured Streaming program are:
- Import the pyspark modules
- Create a SparkSession object
- Create the input source
- Define the stream computation
- Start the streaming query and output the results
Two processing models:
(1) Micro-batch processing
(2) Continuous processing
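Which model a query uses is selected by the trigger set on its DataStreamWriter. Below is a minimal sketch, assuming a hypothetical streaming DataFrame df and a console sink; note that continuous processing is still experimental and supports only a limited set of sources and sinks:

# Pick one of the two triggers when starting the query.

# (1) Micro-batch: plan and run a new batch every 8 seconds
query = df.writeStream \
    .format("console") \
    .trigger(processingTime="8 seconds") \
    .start()

# (2) Continuous processing (experimental): process records as they arrive,
#     writing asynchronous checkpoints roughly every 1 second
query = df.writeStream \
    .format("console") \
    .trigger(continuous="1 second") \
    .start()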
Word Count
Goal: a data stream containing many English sentences arrives continuously, and the Structured Streaming program splits each line into individual words. The classic goal is to count how often each word occurs; because the script below writes its results to a CSV file sink, which only supports the append output mode and therefore cannot emit a running aggregate, it instead keeps just the words of length 5 and writes them out.
%%writefile structurednetworkwordcount.py
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import Row
from pyspark.sql.functions import split, length
from pyspark.sql.functions import explode
import os

ROOT = "file://" + os.getcwd()

# Create the SparkSession object
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
sc = spark.sparkContext

# Create the input source: a socket on localhost:9999
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Define the stream computation: split each line into words,
# then keep only the words of length 5
words = lines.select(explode(split(lines.value, " ")).alias("word"))
word_count = words.filter(length("word") == 5)

# Start the query and write the results to CSV files
# (a file sink only supports the "append" output mode)
query = word_count \
    .writeStream \
    .outputMode("append") \
    .format("csv") \
    .option("path", ROOT + "/data/filesink") \
    .option("checkpointLocation", ROOT + "/data/file-sink-cp") \
    .trigger(processingTime="8 seconds") \
    .start()

query.awaitTermination()
Overwriting structurednetworkwordcount.py
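To try the script, first start a simple data server for the socket source to connect to, for example `nc -lk 9999` in one terminal, then submit the program with `spark-submit structurednetworkwordcount.py` in another terminal and type some English sentences into the netcat window; the five-letter words are written to CSV files under data/filesink.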
File Reading and Writing
import os
import shutil
import random
import time
from tqdm import tqdm

TEST_DATA_TEMP_DIR = "./data/tmp/"
TEST_DATA_DIR = "./data/tmp/testdata/"

ACTION_DEF = ["login", "logout", "purchase"]
DISTRICT_DEF = ["fujian", "beijing", "shanghai", "guangzhou"]
JSON_LINE_PATTERN = '{{"eventTime": {}, "action": "{}", "district": "{}"}}\n'

def testSetUp():
    """Create the temporary test-data directory."""
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
    os.mkdir(TEST_DATA_DIR)

def testTearDown():
    """Clean up the test environment."""
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)

def writeAndMove(filename, data):
    """Write a test file to the temp dir, then move it into the watched
    directory so the Spark file source only ever sees complete files."""
    with open(TEST_DATA_TEMP_DIR + filename, "wt", encoding="utf-8") as f:
        f.write(data)
    shutil.move(TEST_DATA_TEMP_DIR + filename, TEST_DATA_DIR + filename)

if __name__ == "__main__":
    testSetUp()
    # Generate 500 files, one per second, each containing 100 JSON records
    for i in tqdm(range(500)):
        filename = 'e-mall-{}.json'.format(i)
        content = ''
        rndcount = list(range(100))
        random.shuffle(rndcount)
        for _ in rndcount:
            content += JSON_LINE_PATTERN.format(str(int(time.time())),
                                                random.choice(ACTION_DEF),
                                                random.choice(DISTRICT_DEF))
        writeAndMove(filename, content)
        time.sleep(1)
    # testTearDown()
100%|██████████| 500/500 [08:22<00:00, 1.00s/it]
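Each generated file holds 100 JSON records, one per line, roughly of the following form (the timestamp and the randomly chosen values will differ from run to run):

{"eventTime": 1621130116, "action": "purchase", "district": "fujian"}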
%%writefile spark_ss_filesource.py
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import Row
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql.types import StructField, StructType
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import window, asc

if __name__ == "__main__":
    import os
    TEST_DATA_DIR_SPARK = "file://" + os.getcwd() + "/data/tmp/testdata/"

    schema = StructType([StructField("eventTime", TimestampType(), True),
                         StructField("action", StringType(), True),
                         StructField("district", StringType(), True)])

    # Create the SparkSession object
    spark = SparkSession.builder.appName("StructuredEmallPurchaseCount").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    sc = spark.sparkContext

    # Create the input source: a JSON file source over the generated test data
    lines = spark.readStream \
        .format("json") \
        .schema(schema) \
        .option("maxFilesPerTrigger", 100) \
        .load(TEST_DATA_DIR_SPARK)

    # Define the stream computation: count "purchase" events per district
    # within each 1-minute window of eventTime
    windowDuration = "1 minutes"
    windowedCounts = lines.filter(lines.action == "purchase") \
        .groupBy("district", window("eventTime", windowDuration)) \
        .count() \
        .sort(asc("window"))

    # Start the query and write the results to the console
    query = windowedCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option("truncate", "false") \
        .trigger(processingTime="10 seconds") \
        .start()

    query.awaitTermination()
Overwriting spark_ss_filesource.py
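With the file generator from the previous section still producing new files in data/tmp/testdata, the job can be submitted in another terminal with `spark-submit spark_ss_filesource.py`. Every 10 seconds the console sink prints, for each district, the number of purchase events whose eventTime falls into each 1-minute window, sorted by window.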
Key Points
Usage of explode
eDF = spark.createDataFrame([Row(a=1, intlist=[1, 2, 3], mapfield={"a": "b"})])
eDF.show()
+---+---------+--------+
|  a|  intlist|mapfield|
+---+---------+--------+
|  1|[1, 2, 3]|{a -> b}|
+---+---------+--------+
eDF.select(explode(eDF.intlist).alias("anInt")).show()
+-----+
|anInt|
+-----+
|    1|
|    2|
|    3|
+-----+
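The combination used in the word-count script is split followed by explode: split turns each line into an array of words, and explode turns that array into one row per word. A minimal sketch, assuming the same spark session (the sample sentence is just for illustration):

from pyspark.sql import Row
from pyspark.sql.functions import split, explode

df = spark.createDataFrame([Row(value="hello spark streaming")])
df.select(explode(split(df.value, " ")).alias("word")).show()

which should print:

+---------+
|     word|
+---------+
|    hello|
|    spark|
|streaming|
+---------+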
eDF.filter(eDF.a == 1).show()
+---+---------+--------+
|  a|  intlist|mapfield|
+---+---------+--------+
|  1|[1, 2, 3]|{a -> b}|
+---+---------+--------+
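explode also works on map columns, producing one row per entry with separate key and value columns. Applied to the same eDF:

eDF.select(explode(eDF.mapfield).alias("key", "value")).show()

which should print something like:

+---+-----+
|key|value|
+---+-----+
|  a|    b|
+---+-----+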