Flink去重统计-基于自定义布隆过滤器
2021/5/20 10:55:59
本文主要是介绍Flink去重统计-基于自定义布隆过滤器,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、背景说明
在Flink中对流数据进行去重计算是常有操作,如流量域对独立访客之类的统计,去重思路一般有三个:
- 基于Hashset来实现去重
数据存在内存,容量小,服务重启会丢失。 - 使用状态编程ValueState/MapState实现去重
常用方式,可以使用内存/文件系统/RocksDB作为状态后端存储。 - 结合Redis使用布隆过滤器实现去重
适用对上亿数据量进行去重实现,占用资源少效率高,有小概率误判。
这里以自定义布隆过滤器的方式,实现Flink窗口计算中独立访客的统计,数据集样例如下:
二、布隆过滤器部分说明
布隆过滤器简单点说就是哈希算法+bitmap,如上图,对字符串结合多种哈希算法,基于bitmap作为存储,由于只用0/1存储,所以可以大量节省存储空间,也就特别适合在上百亿数据里面做去重这种动作。在后续要进行字符串查找时,对要查找的字符串同样计算这多个哈希算法,根据在bitmap上的位置,可以确认该字符串一定不在或者极大概率在(由于哈希冲突问题会有极小概率误判)。
引申一下,如上所述,能对哈希冲突进行更好的优化,便能更好解决误判问题,当然也不能无限的增加多种哈希算法的策略,会相应带来计算效率的下降。
在本次开发中,使用自定义的布隆过滤器,其中对哈希算法部分做了几点优化:
- 结合Redis使用,Redis原生支持bitmap
- 对bitmap容量扩容,一般为数据的3-10倍,这里使用2^30,使用2的整数幂,能让后续查找输出使用位与运算,实现比取模查找更高的效率。
myBloomFilter = new MyBloomFilter(1 << 30);
- 优化哈希算法,这里对要查找的id转为char类型,并行单个剔除后基于Unicode编码乘以质数31再相加,来避免不同字符串计算出同样哈希值的问题。
for (char c : value.toCharArray()){ result += result * 31 + c; }
另外,谷歌提供的工具Guava也包含了布隆过滤器,加入相关依赖即可使用,主要参数如下源码,输入要建立的过滤器容器大小及误判概率即可。
public static <T> BloomFilter<T> create(Funnel<? super T> funnel, int expectedInsertions, double fpp) { return create(funnel, (long)expectedInsertions, fpp); }
三、代码部分
package com.test.UVbloomfilter; import bean.UserBehavior; import bean.UserVisitorCount; import java.sql.Timestamp; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import redis.clients.jedis.Jedis; public class UserVisitorTest { public static void main(String[] args) throws Exception { //建立环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //env.setParallelism(1); //指定时间语义 WatermarkStrategy<UserBehavior> wms = WatermarkStrategy .<UserBehavior>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() { @Override public long extractTimestamp(UserBehavior element, long recordTimestamp) { return element.getTimestamp() * 1000L; } }); //读取数据、映射、过滤 SingleOutputStreamOperator<UserBehavior> userBehaviorDS = env .readTextFile("input/UserBehavior.csv") .map(new MapFunction<String, UserBehavior>() { @Override public UserBehavior map(String value) throws Exception { String[] split = value.split(","); return new UserBehavior(Long.parseLong(split[0]) , Long.parseLong(split[1]) , Integer.parseInt(split[2]) , split[3] , Long.parseLong(split[4])); } }) //.filter(data -> "pv".equals(data.getBehavior())) //lambda表达式写法 .filter(new FilterFunction<UserBehavior>() { @Override public boolean filter(UserBehavior value) throws Exception { if (value.getBehavior().equals("pv")) { return true; }return false; }}) .assignTimestampsAndWatermarks(wms); //去重按全局去重,故使用行为分组,仅为后续开窗使用、开窗 WindowedStream<UserBehavior, String, TimeWindow> windowDS = userBehaviorDS.keyBy(UserBehavior::getBehavior) .window(TumblingEventTimeWindows.of(Time.hours(1))); SingleOutputStreamOperator<UserVisitorCount> processDS = windowDS .trigger(new MyTrigger()).process(new UserVisitorWindowFunc()); processDS.print(); env.execute(); } //自定义触发器:来一条计算一条(访问Redis一次) private static class MyTrigger extends Trigger<UserBehavior, TimeWindow> { @Override public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.FIRE_AND_PURGE; //触发计算和清除窗口元素。 } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { } } private static class UserVisitorWindowFunc extends ProcessWindowFunction<UserBehavior,UserVisitorCount,String,TimeWindow> { //声明Redis连接 private Jedis jedis; //声明布隆过滤器 private MyBloomFilter myBloomFilter; //声明每个窗口总人数的key private String hourUVCountKey; @Override public void open(Configuration parameters) throws Exception { jedis = new Jedis("hadoop102",6379); hourUVCountKey = "HourUV"; myBloomFilter = new MyBloomFilter(1 << 30); //2^30 } @Override public void process(String s, Context context, java.lang.Iterable<UserBehavior> elements, Collector<UserVisitorCount> out) throws Exception { //1.取出数据 UserBehavior userBehavior = elements.iterator().next(); //2.提取窗口信息 String windowEnd = new Timestamp(context.window().getEnd()).toString(); //3.定义当前窗口的BitMap Key String bitMapKey = "BitMap_" + windowEnd; //4.查询当前的UID是否已经存在于当前的bitMap中 long offset = myBloomFilter.getOffset(userBehavior.getUserId().toString()); Boolean exists = jedis.getbit(bitMapKey, offset); //5.根据数据是否存在做下一步操作 if (!exists){ //将对应offset位置改为1 jedis.setbit(bitMapKey,offset,true); //累加当前窗口的综合 jedis.hincrBy(hourUVCountKey,windowEnd,1); } //输出数据 String hget = jedis.hget(hourUVCountKey, windowEnd); out.collect(new UserVisitorCount("UV",windowEnd,Integer.parseInt(hget))); } } private static class MyBloomFilter { //减少哈希冲突优化1:增加过滤器容量为数据3-10倍 //定义布隆过滤器容量,最好传入2的整次幂数据 private long cap; public MyBloomFilter(long cap) { this.cap = cap; } //传入一个字符串,获取在BitMap中的位置 public long getOffset(String value){ long result = 0L; //减少哈希冲突优化2:优化哈希算法 //对字符串每个字符的Unicode编码乘以一个质数31再相加 for (char c : value.toCharArray()){ result += result * 31 + c; } //取模,使用位与运算代替取模效率更高 return result & (cap - 1); }}}
输出结果在Redis查看如下:
学习交流,有任何问题还请随时评论指出交流。
这篇关于Flink去重统计-基于自定义布隆过滤器的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-05小米13T Pro系统合集:性能与摄影的极致融合,值得你升级的系统ROM
- 2024-10-01基于Python+Vue开发的医院门诊预约挂号系统
- 2024-10-01基于Python+Vue开发的旅游景区管理系统
- 2024-10-01RestfulAPI入门指南:打造简单易懂的API接口
- 2024-10-01初学者指南:了解和使用Server Action
- 2024-10-01Server Component入门指南:搭建与配置详解
- 2024-10-01React 中使用 useRequest 实现数据请求
- 2024-10-01使用 golang 将ETH账户的资产平均分散到其他账户
- 2024-10-01JWT用户校验课程:从入门到实践
- 2024-10-01Server Component课程入门指南