sparkstreaming转换算子--窗口函数
2022/9/2 23:22:54
本文主要是介绍sparkstreaming转换算子--窗口函数,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
window
- 画图理解
- 说明
countByWindow 对每个滑动窗口的数据执行count操作
reduceByWindow 对每个滑动窗口的数据执行reduce操作
reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作
都需要传入两个核心参数
windowDuration: Duration, 窗口时间长度--一般是batchSize(批次时间)的整数倍
slideDuration: Duration: 滑动时间长度----一般是batchSize(批次时间)的整数倍 - 案例
package SparkStreaming.trans import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream /** * 滑动窗口算子 * 每隔一段时间,对原始DStream中多个批次数据整合 成为新的DStream中一个批次数据 * * Spark Streaming中,一个批次执行一次,不会积攒当前批次的数据。滑动窗口算子可以实现将多个批次数据积攒下来,然后再去做统一的运算 * 窗口算子最为基础核心的算子 window 会给我们返回一个新的DStream,但是这个DStream包含多个未被处理的批次数据 * 窗口函数中需要传递核心参数 * windowDuration: Duration, 窗口时间长度--一般是batchSize(批次时间)的整数倍 * slideDuration: Duration: 滑动时间长度----一般是batchSize(批次时间)的整数倍 */object ByWindow { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[3]").setAppName("transform3") val ssc = new StreamingContext(conf, Milliseconds(10000)) val ds: DStream[String] = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY) // 窗口长度 30s 滑动间隔 10s 每个10s时间将DStream中前30秒的数据 整合为一个批次数据处理 val ds1 = ds.window(Seconds(10), Seconds(10)) ds1.print() ssc.start() ssc.awaitTermination() } }
package SparkStreaming.trans import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext} /** * 使用window窗口函数算子 严格意义上只负责去对原始DStream进行窗口检测,形成窗口批次数据的DStream,如果我们要对窗口批次数据 * 进行处理的话,还得需要对窗口批次数据的DStream使用转换算子和行动算子计算逻辑 * * windows函数也有一些变种的窗口函数算子:既可以实现窗口批次数据的检测,也可以实现一些相关的计算功能 * countByWindow 对每个滑动窗口的数据执行count操作 * reduceByWindow 对每个滑动窗口的数据执行reduce操作 * reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作 * countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作 */ object ByWindow2 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("state02").setMaster("local[2]") val ssc = new StreamingContext(sparkConf,Milliseconds(10000)) ssc.checkpoint("hdfs://node1:9000/sparkstreaming") val ds:DStream[String] = ssc.socketTextStream("node1", 44444) val ds1 = ds.map((_, 1)) val ds2 = ds1.reduceByKeyAndWindow((a: Int, b: Int)=>(a+b), Seconds(10), Seconds(10)) ds2.print() println(",,,,,,,,,,") val ds3: DStream[Long] = ds1.countByWindow(Seconds(10), Seconds(10)) ds3.print() println(",,,,,,,,,,") val ds4 = ds1.reduceByWindow((a, b) => (a._1+b._1, 0), Seconds(10), Seconds(10)) ds4.print() println(",,,,,,,,,,") // 需要设置检查点 val ds5 = ds1.countByValueAndWindow(Seconds(10), Seconds(10)) ds5.print() ssc.start() ssc.awaitTermination() } }
应用场景:黑名单
package SparkStreaming.trans import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext} /** * 黑名单统计 * 有市场就有竞争,有竞争就少不来邪门外道 * A厂家投放了广告,广告每点击一次都是有记录的,但是不排初竞争对手的恶意点击 * * 实时统计黑名单用户 * 网站每隔3秒记录一批次用户的点击行为,记录的时候,认定如果在1分钟之内 用户点击次数超过10次 认定这个用户是一个黑名单用户 * 需要把用户IP封掉 * * Spark Streaming去连接端口数据源: * 端口模拟用户的点击行为 发送数字 数字就代表某一个用户id */ object BlackUser { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("state01").setMaster("local[2]") val ssc = new StreamingContext(sparkConf,Milliseconds(3000)) val ds:DStream[String] = ssc.socketTextStream("node1", 44444) val ds1:DStream[String] = ds.window(Minutes(1),Minutes(1)) val ds2:DStream[(String,Int)] = ds1.map((_, 1)).reduceByKey(_ + _) //保留黑名单用户 val ds3:DStream[(String,Int)] = ds2.filter(tuple=>{ if(tuple._2>=10){ true }else{ false } }) ds3.print() ssc.start() ssc.awaitTermination() } }
这篇关于sparkstreaming转换算子--窗口函数的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南