Custom Aggregate Function (Counting the Four Behavior Types per Product)
2022/9/5 23:54:09
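Requirement: for every product (goodsId), count how many times each of the four behaviors (pv, buy, cart, fav) occurs. A behavior that never occurred for a product is reported as 0.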
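To make the expected result concrete, here is a minimal plain-Scala sketch (no Spark involved) that computes the same zero-filled counts on an in-memory list. The object name `BehaviorCountSketch`, the `countBehaviors` helper and the sample rows are hypothetical and exist only for illustration. The sketch also assumes that each CSV record has already been reduced to a `(goodsId, behavior)` pair.

```scala
object BehaviorCountSketch {
  // The four behavior types that the UDAF below initializes to 0.
  val Behaviors: Seq[String] = Seq("pv", "buy", "cart", "fav")

  // records: hypothetical (goodsId, behavior) pairs.
  // Returns goodsId -> (behavior -> count). Each of the four behaviors is present,
  // with 0 when it never occurred. Unexpected behaviors are kept, as the UDAF does.
  def countBehaviors(records: Seq[(Int, String)]): Map[Int, Map[String, Long]] =
    records.groupBy(_._1).map { case (goodsId, rows) =>
      val observed: Map[String, Long] =
        rows.groupBy(_._2).map { case (behavior, hits) => behavior -> hits.size.toLong }
      // Start from the all-zero map, then overwrite it with the observed counts.
      goodsId -> (Behaviors.map(_ -> 0L).toMap ++ observed)
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample rows: (goodsId, behavior).
    val sample = Seq((72, "pv"), (72, "pv"), (81, "pv"), (81, "buy"), (81, "cart"))
    for ((goodsId, counts) <- countBehaviors(sample).toSeq.sortBy(_._1))
      println(s"$goodsId -> $counts")
  }
}
```

Seeding every product with an all-zero map is the same idea the UDAF uses in `initialize`.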
Example
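package SparkSQL.fun.project

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

/**
 * Count how many times each of the four behaviors occurs for every product.
 * If a product never had a given behavior, that behavior is reported as 0,
 * so the final result looks like this:
 *
 * goodsId, total_times
 * |72 |[pv -> 2, buy -> 0, cart -> 0, fav -> 0] |
 * |81 |[pv -> 13, buy -> 1, cart -> 1, fav -> 0]|
 *
 * Implemented with a custom (accumulating) aggregate function:
 * 1. input: the behavior column
 * 2. output: a Map from behavior to count
 */

// One row of the behavior CSV. It is declared at the top level so that
// session.implicits._ can derive an Encoder for it.
case class UserBehaviorBean(userId: Int, goodsId: Int, categoryId: Int, behavior: String, time: Int)

object BehaviorCode1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("project01").setMaster("local[*]")
    val session = SparkSession.builder().config(sparkConf).getOrCreate()

    // Drop malformed CSV lines and let Spark infer the column types.
    val options = Map("mode" -> "dropMalformed", "inferSchema" -> "true")
    val frame = session.read.options(options)
      .csv("G:\\shixunworkspace\\sparkcode\\src\\main\\java\\SparkSQL\\fun\\project\\b.csv")
    val frame1 = frame.toDF("userId", "goodsId", "categoryId", "behavior", "time")
    frame1.show()

    import session.implicits._
    // Convert the DataFrame into a Dataset. The element type of a Dataset is usually a bean/case class.
    val dataset: Dataset[UserBehaviorBean] = frame1.map(row =>
      UserBehaviorBean(row.getInt(0), row.getInt(1), row.getInt(2), row.getString(3), row.getInt(4))
    )

    dataset.createOrReplaceTempView("temp")
    session.udf.register("behavior_times", new BehaviorTimesFun)

    // A plain "GROUP BY goodsId, behavior" only yields rows for behaviors that actually
    // occurred. The UDAF below also reports the missing behaviors, as 0.
    val frame2 = session.sql(
      "select goodsId, behavior_times(behavior) as total_times from temp group by goodsId")
    frame2.show(100, false)

    session.stop()
  }
}

// Custom aggregate function: behavior (String) -> Map[behavior, count].
// Note: UserDefinedAggregateFunction is deprecated since Spark 3.0 in favor of
// org.apache.spark.sql.expressions.Aggregator registered via functions.udaf.
class BehaviorTimesFun extends UserDefinedAggregateFunction {
  // Input: a single string column holding the behavior (pv, buy, cart, fav).
  override def inputSchema: StructType =
    StructType(Array(StructField("input", DataTypes.StringType)))

  // Aggregation buffer: one map from behavior to its running count.
  override def bufferSchema: StructType =
    StructType(Array(
      StructField("sum", DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType))
    ))

  // Result type: the same behavior -> count map.
  override def dataType: DataType =
    DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType)

  // The same input always produces the same output.
  override def deterministic: Boolean = true

  // Start every group with all four behaviors at 0, so missing behaviors appear as 0.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map("pv" -> 0L, "buy" -> 0L, "cart" -> 0L, "fav" -> 0L)
  }

  // Called once per input row: add 1 to the count of that row's behavior.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) { // Spark map keys must not be null
      val behavior = input.getString(0) // e.g. pv, fav, ...
      val counts = buffer.getMap[String, Long](0).toMap
      buffer(0) = counts.updated(behavior, counts.getOrElse(behavior, 0L) + 1L)
    }
  }

  // Combine two partial buffers (e.g. from different partitions) by summing counts per behavior.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var merged = buffer1.getMap[String, Long](0).toMap
    for ((behavior, n) <- buffer2.getMap[String, Long](0)) {
      merged = merged.updated(behavior, merged.getOrElse(behavior, 0L) + n)
    }
    buffer1(0) = merged
  }

  // Final result for one group (one goodsId).
  override def evaluate(buffer: Row): Any = buffer.getMap[String, Long](0)
}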
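Roughly speaking, Spark drives these callbacks in a fixed pattern:

- `initialize` creates an empty buffer.
- `update` folds in the rows of one partition.
- `merge` combines the partial buffers from different partitions.
- `evaluate` returns the final map.

The following plain-Scala imitation (no Spark dependency) sketches that flow so the counting logic can be checked in isolation. `UdafLifecycleSketch` and its helper methods are hypothetical names. The two-partition split is an assumption made for the example and is not necessarily how Spark partitions the data.

```scala
object UdafLifecycleSketch {
  type Buffer = Map[String, Long]

  // Mirrors initialize(): every group starts with the four behaviors at 0.
  def initialize(): Buffer = Map("pv" -> 0L, "buy" -> 0L, "cart" -> 0L, "fav" -> 0L)

  // Mirrors update(): add 1 for the behavior seen in this row (null rows are skipped).
  def update(buffer: Buffer, behavior: String): Buffer =
    if (behavior == null) buffer
    else buffer.updated(behavior, buffer.getOrElse(behavior, 0L) + 1L)

  // Mirrors merge(): sum the counts of two partial buffers key by key.
  def merge(left: Buffer, right: Buffer): Buffer =
    right.foldLeft(left) { case (acc, (behavior, n)) =>
      acc.updated(behavior, acc.getOrElse(behavior, 0L) + n)
    }

  // One product's behaviors split across several (hypothetical) partitions:
  // build one partial buffer per partition, then merge them all.
  def aggregate(partitions: Seq[Seq[String]]): Buffer =
    partitions
      .map(rows => rows.foldLeft(initialize())(update))
      .foldLeft(initialize())(merge)

  def main(args: Array[String]): Unit = {
    // Hypothetical rows for one product, split over two partitions.
    println(aggregate(Seq(Seq("pv", "pv", "buy"), Seq("pv", "cart"))))
  }
}
```

`merge` adds counts key by key, and the zero map it starts from contributes nothing. As a result, the final counts do not depend on how rows are spread across partitions, which is what allows the aggregation to run in parallel.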