SPARK SQL中自定义udf,udaf函数统计uv(使用bitmap)
2021/12/14 19:20:18
本文主要是介绍SPARK SQL中自定义udf,udaf函数统计uv(使用bitmap),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
在实际工作中统计uv时,一般会使用count(distinct userId)的方式去统计人数,但这样效率不高,假设你是统计多个维度的数据,当某天你想要上卷维度,此时又需要从原始层开始统计,如果数据量大的时候将会耗费很多时间,此时便可以使用最细粒度的聚合结果进行上卷统计,即需要自定义聚合函数进行统计,将bitmap序列化为一个字节数组。
1)一次聚合
package org.shydow.UDF import org.apache.spark.sql.{Encoder, Encoders} import org.apache.spark.sql.expressions.Aggregator import org.roaringbitmap.RoaringBitmap /** * @author shydow * @date 2021/12/13 22:55 */ class BitmapGenUDAF extends Aggregator[Int, Array[Byte], Array[Byte]] { override def zero: Array[Byte] = { // 构造一个空的bitmap val bm: RoaringBitmap = RoaringBitmap.bitmapOf() // 将bitmap序列化为字节数组 BitmapUtil.serBitmap(bm) } override def reduce(b: Array[Byte], a: Int): Array[Byte] = { // 将buff反序列化为bitmap val bitmap: RoaringBitmap = BitmapUtil.deSerBitmap(b) bitmap.add(a) BitmapUtil.serBitmap(bitmap) } override def merge(b1: Array[Byte], b2: Array[Byte]): Array[Byte] = { val bitmap1: RoaringBitmap = BitmapUtil.deSerBitmap(b1) val bitmap2: RoaringBitmap = BitmapUtil.deSerBitmap(b2) bitmap1.or(bitmap2) BitmapUtil.serBitmap(bitmap1) } override def finish(reduction: Array[Byte]): Array[Byte] = reduction override def bufferEncoder: Encoder[Array[Byte]] = Encoders.BINARY override def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY }
package org.shydow.UDF import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} import org.roaringbitmap.RoaringBitmap /** * @author shydow * @date 2021/12/13 22:45 */ object BitmapUtil { /** * 序列化bitmap */ def serBitmap(bm: RoaringBitmap): Array[Byte] = { val stream = new ByteArrayOutputStream() val dataOutput = new DataOutputStream(stream) bm.serialize(dataOutput) stream.toByteArray } /** * 反序列bitmap */ def deSerBitmap(bytes: Array[Byte]): RoaringBitmap = { val bm: RoaringBitmap = RoaringBitmap.bitmapOf() val stream = new ByteArrayInputStream(bytes) val inputStream = new DataInputStream(stream) bm.deserialize(inputStream) bm } }
package org.shydow.UDF import org.apache.spark.sql.{DataFrame, SparkSession, TypedColumn} import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} import org.roaringbitmap.RoaringBitmap /** * @author shydow * @date 2021/12/13 22:25 */ object TestBehaviorAnalysis { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .appName("analysis") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val schema = StructType(Seq( StructField("id", LongType), StructField("eventType", StringType), StructField("code", StringType), StructField("timestamp", LongType)) ) val frame: DataFrame = spark.read.schema(schema).csv("data/OrderLog.csv") frame.createOrReplaceTempView("order_log") /** * 使用distinct count 计算uv */ spark.sql( s""" |select | eventType, | count(1) as pv, | count(distinct id) as uv |from order_log |group by eventType |""".stripMargin).show() /** * 自定义UDAF计算uv */ import org.apache.spark.sql.functions.udaf spark.udf.register("gen_bitmap", udaf(new BitmapGenUDAF)) // 这个函数出来的是字节数组,如果要计算具体的基数得写一个udf def card(byteArray: Array[Byte]): Int = { val bitmap: RoaringBitmap = BitmapUtil.deSerBitmap(byteArray) bitmap.getCardinality } spark.udf.register("get_card", card _) spark.sql( s""" |select | eventType, | count(1) as pv, | gen_bitmap(id) as uv_arr, | get_card(gen_bitmap(id)) as uv |from order_log |group by eventType |""".stripMargin).show() spark.close() } }
这篇关于SPARK SQL中自定义udf,udaf函数统计uv(使用bitmap)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-15鸿蒙生态设备数量超8亿台
- 2024-05-13TiDB + ES:转转业财系统亿级数据存储优化实践
- 2024-05-09“2024鸿蒙零基础快速实战-仿抖音App开发(ArkTS版)”实战课程已上线
- 2024-05-09聊聊如何通过arthas-tunnel-server来远程管理所有需要arthas监控的应用
- 2024-05-09log4j2这么配就对了
- 2024-05-09nginx修改Content-Type
- 2024-05-09Redis多数据源,看这篇就够了
- 2024-05-09Google Chrome驱动程序 124.0.6367.62(正式版本)去哪下载?
- 2024-05-09有没有大佬知道这种数据应该怎么抓取呀?
- 2024-05-09这种运行结果里的10.100000001,怎么能最快改成10.1?