flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码)
2021/8/22 15:06:07
本文主要是介绍flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
感谢您的关注 + 点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!!
1.序篇-本文结构
-
背景篇-为啥需要 redis 数据汇表
-
目标篇-redis 数据汇表预期效果
-
难点剖析篇-此框架建设的难点、目前有哪些实现
-
维表实现篇-实现的过程
-
总结与展望篇
本文主要介绍了 flink sql redis 数据汇表的实现过程。
大数据羊说
用数据提升美好事物发生的概率~
27篇原创内容
公众号
如果想在本地测试下:
- 在公众号后台回复
-
flink sql 知其所以然(三)| sql 自定义 redis 数据汇表获取源码(源码基于 1.13.1 实现)
-
flink sql 知其所以然(三)| sql 自定义 redis 数据汇表获取源码(源码基于 1.13.1 实现)
-
flink sql 知其所以然(三)| sql 自定义 redis 数据汇表获取源码(源码基于 1.13.1 实现)
-
在你的本地安装并启动 redis-server。
-
执行源码包中的
flink.examples.sql._03.source_sink.RedisSinkTest
测试类,然后使用 redis-cli 执行get a
就可以看到结果了(目前只支持 kv,即 redisset key value
)。
如果想直接在集群环境使用:
-
命令行执行
mvn package -DskipTests=true
打包 -
将生成的包
flink-examples-0.0.1-SNAPSHOT.jar
引入 flink lib 中即可,无需其它设置。
2.背景篇-为啥需要 redis 数据汇表
目前在实时计算的场景中,熟悉 datastream 的同学在很多场景下都会将结果数据写入到 redis 提供数据服务。
举个例子:
-
外存状态引擎:需要把历史所有的 id 存储下来,但是因为 id 会不断增多,仅仅使用 flink 内部状态引擎的话,状态会越来越大,很难去保障其稳定性。那么这时就会选择外部状态引擎,比如 redis。在我们使用 redis 存储所有设备 id 时,除了使用 redis 作为维表去访问 id 是否出现过,还需要将新增的 id 写入到 redis 中以供后续的去重。这时候就需要使用到 redis sink 表。
-
数据服务引擎:在某些大促(双十一)的场景下需要将 flink 计算好的结果直接写入到 redis 中以提供高速数据服务引擎,直接提供给大屏查询使用。
而官方是没有提供 flink sql api 的 redis sink connector 的。如下图,基于 1.13 版本。
ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
1
阿里云 flink 是提供了这个能力的。
www.alibabacloud.com/help/zh/faq-detail/118038.htm?spm=a2c63.q38357.a3.16.48fa711fo1gVUd
2
因此本文在介绍怎样自定义一个 sql 数据汇表的同时,实现一个 sql redis sink connector 来给大家使用。
3.目标篇-redis 数据汇表预期效果
redis 作为数据汇表在 datastream 中的最常用的数据结构有很多,基本上所有的数据结构都有可能使用到。本文实现主要实现 kv 结构,其他结构大家可以拿到源码之后进行自定义实现。也就多加几行代码就完事了。
预期效果就如阿里云的 flink redis,redis set key value
的预期 flink sql:
CREATE TABLE redis_sink_table ( key STRING, -- redis key,第 1 列为 key `value` STRING -- redis value,第 2 列为 value ) WITH ( 'connector' = 'redis', -- 指定 connector 是 redis 类型的 'hostname' = '127.0.0.1', -- redis server ip 'port' = '6379', -- redis server 端口 'write.mode' = 'string' -- 指定使用 redis `set key value` )
INSERT INTO redis_sink_table
SELECT o.f0 as key, o.f1 as value
FROM leftTable AS o
下面是我在本地跑的结果: ![图片](//img1.sycdn.imooc.com/611a923d0001c22510800591.jpg) 3 首先看下我们的测试输入,`f0` 恒定为 `a`,`f1` 恒定为 `b`,并且每 10ms 写入一次: ![图片](//img1.sycdn.imooc.com/611a923d000192ca10800633.jpg) 4 预期结果是 key 为 `a`,value 会为 `b`,实际结果也相同,使用 redis-cli 查询下,我删除掉也能在 10ms 后写入,所以查询时可以一直查得到: ![图片](//img1.sycdn.imooc.com/611a923e0001cddd02620254.jpg) 5 4.难点剖析篇-目前有哪些实现 =============== 目前可以从网上搜到的实现、以及可以参考的实现有以下两个: 1. https://github.com/jeff-zou/flink-connector-redis。但使用起来有比较多的限制,包括需要在建表时就指定 key-column,value-column 等,其实博主觉得没必要指定这些字段,这些都可以动态调整。其实现是对 apache-bahir-flink https://github.com/apache/bahir-flink 的二次开发,但与 bahir 原生实现有割裂感,因为这个项目几乎参考 bahir redis connector 重新实现了一遍,接口与 bahir 不太相同。 2. 阿里云实现 https://www.alibabacloud.com/help/zh/faq-detail/122722.htm?spm=a2c63.q38357.a3.7.a1227a53TBMuSY。阿里云的实现相对比较动态化,不需要在建表时就指定 hmap 等数据结构的 map key。 因此博主在实现时,定了一个基调。 1. **参考阿里云的 DDL 实现** 2. **高度复用性**:复用 bahir 提供的 redis connnector 3. **简洁性**:目前只实现 kv 结构,后续扩展可以给用户自己实现,扩展其实是非常简单的 5.实现篇-实现的过程 =========== 在实现 redis 数据汇表之前,不得不谈谈 flink 数据汇表加载和使用机制。 5.1.flink 数据汇表原理 ---------------- 其实上节已经详细描述了 flink sql 对于 source\\sink 的加载机制。 [ ![图片](//img1.sycdn.imooc.com/611a923f0001e65e08000444.jpg) flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码) ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488635&idx=1&sn=41817a078ef456fb036e94072b2383ff&chksm=c1549883f623119559c47047c6d2a9540531e0e6f0b58b155ef9da17e37e32a9c486fe50f8e3&scene=21#wechat_redirect) 1. 通过 SPI 机制加载所有的 source\\sink\\format 工厂 `Factory` 2. 过滤出 DynamicTableSinkFactory + connector 标识的 sink 工厂类 3. 通过 sink 工厂类创建出对应的 sink ![图片](//img1.sycdn.imooc.com/611a923f00016de110800651.jpg) 7 ![图片](//img1.sycdn.imooc.com/611a923f00010ff010800740.jpg) 8 如图 source 和 sink 是通过 `FactoryUtil.createTableSource` 和 `FactoryUtil.createTableSink` 创建的 ![图片](//img1.sycdn.imooc.com/611a9240000136ed10800602.jpg) 16 所有通过 SPI 的 source\\sink\\formt 插件都继承自 `Factory`。 整体创建 sink 方法的调用链如下图。 ![图片](//img1.sycdn.imooc.com/611a9240000151b710800704.jpg) 10 5.2.flink 数据汇表实现方案 ------------------ 先看下博主的最终实现。 由于高度复用了 bahir redis connector,所以需要重点实现就只有两个类: 1. `RedisDynamicTableFactory` 2. `RedisDynamicTableSink` ![图片](//img1.sycdn.imooc.com/611a924100016d1310800654.jpg) 6 具体流程: 1. 定义 SPI 的工厂类 `RedisDynamicTableFactory implements DynamicTableSinkFactory`,并且在 resource\\META-INF 下创建 SPI 的插件文件 2. 实现 factoryIdentifier 标识 `redis` 3. 实现 `RedisDynamicTableFactory#createDynamicTableSink` 来创建对应的 source `RedisDynamicTableSink` 4. 定义 `RedisDynamicTableSink implements DynamicTableSink` 5. 实现 `RedisDynamicTableFactory#getSinkRuntimeProvider` 方法,创建具体的维表 UDF `RichSinkFunction<T>`,这里直接服用了 bahir redis 中的 `RedisSink<IN>` 介绍完流程,进入具体实现方案细节: `RedisDynamicTableFactory` 主要创建 sink 的逻辑:
public class RedisDynamicTableFactory implements DynamicTableSinkFactory {
…
@Override public String factoryIdentifier() { // 标识 redis return "redis"; } @Override public DynamicTableSink createDynamicTableSink(Context context) { // either implement your custom validation logic here ... // or use the provided helper utility final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); // validate all options // 所有 option 配置的校验,比如 write.mode 类参数 helper.validate(); // get the validated options final ReadableConfig options = helper.getOptions(); final RedisWriteOptions redisWriteOptions = RedisOptions.getRedisWriteOptions(options); TableSchema schema = context.getCatalogTable().getSchema(); / 创建 RedisDynamicTableSink return new RedisDynamicTableSink(schema.toPhysicalRowDataType() , redisWriteOptions); }
}
resources\\META-INF 文件: ![图片](//img1.sycdn.imooc.com/611a9241000108a510800458.jpg) 11 `RedisDynamicTableSource` 主要创建 table udf 的逻辑:
public class RedisDynamicTableSink implements DynamicTableSink {
…
@Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { // 初始化 redis 客户端配置 FlinkJedisConfigBase flinkJedisConfigBase = new FlinkJedisPoolConfig.Builder() .setHost(this.redisWriteOptions.getHostname()) .setPort(this.redisWriteOptions.getPort()) .build(); RedisMapper<RowData> redisMapper = null; switch (this.redisWriteOptions.getWriteMode()) { case "string": // redis key,value 序列化器 // 从 RowData 转换成 redis 的 key value redisMapper = new SetRedisMapper(); break; default: throw new RuntimeException("其他类型 write mode 请自定义实现"); } // 创建 SinkFunction,注意!!!这里直接复用了 bahir 的实现 return SinkFunctionProvider.of(new RedisSink<>( flinkJedisConfigBase , redisMapper)); }
}
`RedisSink` 执行写入 redis 的主要流程,这里是 bahir 的实现:
public class RedisRowDataLookupFunction extends TableFunction {
…
@Override public void invoke(IN input) throws Exception { String key = redisSinkMapper.getKeyFromData(input); String value = redisSinkMapper.getValueFromData(input); // 根据具体的命令执行具体写入 redis 的命令 switch (redisCommand) { case RPUSH: this.redisCommandsContainer.rpush(key, value); break; case LPUSH: this.redisCommandsContainer.lpush(key, value); break; case SADD: this.redisCommandsContainer.sadd(key, value); break; case SET: this.redisCommandsContainer.set(key, value); break; case PFADD: this.redisCommandsContainer.pfadd(key, value); break; case PUBLISH: this.redisCommandsContainer.publish(key, value); break; case ZADD: this.redisCommandsContainer.zadd(this.additionalKey, value, key); break; case ZREM: this.redisCommandsContainer.zrem(this.additionalKey, key); break; case HSET: this.redisCommandsContainer.hset(this.additionalKey, key, value); break; default: throw new IllegalArgumentException("Cannot process such data type: " + redisCommand); } } @Override public void open(Configuration parameters) throws Exception { try { // 初始化 redis 执行器 this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase); this.redisCommandsContainer.open(); } catch (Exception e) { LOG.error("Redis has not been properly initialized: ", e); throw e; } }
}
### 5.2.1.复用 bahir connector 如图是 bahir redis connector 的实现。 ![图片](//img1.sycdn.imooc.com/611a9242000118f510800846.jpg) 15 博主在实现过程中将能复用的都尽力复用。如图是最终实现目录。 ![图片](//img1.sycdn.imooc.com/611a9242000182ce10300542.jpg) 12 可以看到实现非常简单。 其中 `redis 客户端及其配置`、`redis 命令执行器` 和 `redis 命令定义器` 是直接复用了 bahir redis 的。如果你想要在生产环境中进行使用,可以直接将两部分代码合并,成本很低。 源码公众号后台回复**flink sql 知其所以然(三)| sql 自定义 redis 数据汇表**获取。 6.总结与展望篇 ======== 6.1.总结 ------ 本文主要是针对 flink sql redis 数据汇表进行了扩展以及实现,并且复用 bahir redis connector 的配置,具有良好的扩展性。如果你正好需要这么一个 connector,直接公众号后台回复**flink sql 知其所以然(三)| sql 自定义 redis 数据汇表**获取源码吧。 6.2.展望 ------ 当然上述只是 redis 数据汇表一个基础的实现,用于生产环境还有很多方面可以去扩展的。 1. jedis cluster 的扩展:目前 bahir datastream 中已经实现了,可以直接参考,扩展起来非常简单 2. 异常 AOP,alert 等
这篇关于flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-08阿里云Redis项目实战入门教程
- 2024-11-08阿里云Redis资料:新手入门与初级使用指南
- 2024-11-08阿里云Redis教程:新手入门及实用指南
- 2024-11-07阿里云Redis学习入门:新手必读指南
- 2024-11-07阿里云Redis学习入门:从零开始的操作指南
- 2024-11-07阿里云Redis学习:初学者指南
- 2024-11-06阿里云Redis入门教程:轻松搭建与使用指南
- 2024-11-02Redis项目实战:新手入门教程
- 2024-10-22Redis入门教程:轻松掌握数据存储与操作
- 2024-10-22Redis缓存入门教程:快速掌握Redis缓存基础知识