FlinkSQL自定义函数(UDF)维表转换
2021/9/7 19:07:42
本文主要是介绍FlinkSQL自定义函数(UDF)维表转换,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
前言
Table和SQL的关系:SQL是Table的继承封装(这点在Flink的概念有所体现),比如说:StreamTableEnvironment继承自TableEnvironment便可体现出来。故官文中Table的使用均可在SQL中体现出来,比如说自定义函数
,Table API & SQL下的自定义函数中只给出了Table方式的TableEnvironment 创建自定义函数,我们可以修改为ste对象实现在SQL中。
应用场景
利用FlinkSQL进行Redis维表信息转换。redis获取维表信息后存储在函数中。
代码
producer代码引用:FlinkSQL使用DDL创建Kafka生产和消费者其中的生产数据类型由json改为csv(此文中补充有)。
或者使用Table的方式:Flink SQL & Table简单实例
模拟生产数据
生产者DDL:
String ddl = "CREATE TABLE CbryProduce(\n" + "phoneNum STRING,\n" + "rechargeNum STRING,\n" + "provinceCode STRING,\n" + "cityCode STRING,\n" + "rechargeChannelCode STRING\n" + ") WITH(\n" + "'connector.type'='kafka',\n" + "'connector.version'='universal',\n" + "'connector.properties.bootstrap.servers'='KafkaClusterURL:ip:port,ip2:port',\n" + //"'connector.properties.bootstrap.servers'='localhost:9092',\n" + "'connector.topic'='event_topic_1',\n" + "'format.type'='csv',\n" + "'format.field-delimiter'='|'\n" + ")\n" ;
DML:
String insert2 = "insert into CbryProduce(phoneNum,rechargeNum,provinceCode,cityCode,rechargeChannelCode)" + "values('1024','100','051','0750','2')";
生成Redis维表信息
如何生成JedisCluster对象插入数据:
Redis(一) Jedis单机和集群连接
Redis(三)redisTemplate实操和五种基础数据类型
// 模拟数据创建 // Map<String, String> cityDimensionMap = new HashedMap(); // cityDimensionMap.put("0020", "广州"); // cityDimensionMap.put("0750", "深圳"); // // Map<String, String> rechargeChannelsMap = new HashedMap(); // rechargeChannelsMap.put("1", "手机app充值"); // rechargeChannelsMap.put("2", "营业厅充值"); // // jedisCluster.hmset("CityCode", cityDimensionMap); // jedisCluster.hmset("RechargeChannels", rechargeChannelsMap); // System.out.println(jedisCluster.hgetAll("CityCode")); // System.out.println(jedisCluster.hgetAll("RechargeChannels")); // System.out.println(jedisCluster.get("testttt")); //空值返回null
自定义SQL函数
如何使用FlinkSQL:FlinkSQL使用DDL创建Kafka生产和消费者或者使用Table的方式:Flink SQL & Table简单实例
这里在ScalarFunction我们只要通过自定义/重载一个eval方法
即可:
如下:我们对自定义函数传入一个cityNum
返回cityCodeMap对应的值。
// define function logic // 自定义SQL函数 public static class cityCodeTranslateFunction extends ScalarFunction{ Map<String, String> cityCodeMap = jedisCluster.hgetAll("CityCode"); public String eval(String cityNum) { String res = cityCodeMap.get(cityNum); return res == null ? "Error" : res; } }
引入自定义函数
将我们的自定义函数引入SQL的StreamTableEnvironment执行环境中
//StreamTableEnvironment继承自TableEnvironment ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class); ste.createTemporarySystemFunction("rechargeChannelTranslate", rechargeChannelTranslateFunction.class);
执行打印
Table queryTable = ste.sqlQuery("select phoneNum,rechargeNum,cityCode,cityTranslate(cityCode), provinceCode,rechargeChannelCode, rechargeChannelTranslate(rechargeChannelCode)" + " from CbryConsumer"); DataStream<Row> result = ste.toAppendStream(queryTable, Row.class); result.printToErr();
输出结果
1> 1024,100,0750,深圳,051,2,营业厅充值
1> 1024,100,0020,广州,051,1,手机app充值
整体代码
public class UserDefinedFuctions { static JedisCluster jedisCluster; private static GenericObjectPoolConfig getGenericObjectPoolConfig() { GenericObjectPoolConfig genericObjectPool = new GenericObjectPoolConfig(); genericObjectPool.setMaxIdle(10); genericObjectPool.setMaxTotal(100); genericObjectPool.setMinEvictableIdleTimeMillis(30000); // 逐出连接的最小空闲时间 30s genericObjectPool.setSoftMinEvictableIdleTimeMillis(60000); // 空闲逐出时间1分钟 return genericObjectPool; } static { HostAndPort hostAndPort = new HostAndPort("ip", 7000); HostAndPort hostAndPort2 = new HostAndPort("ip", 7001); HostAndPort hostAndPort3 = new HostAndPort("ip", 7000); HostAndPort hostAndPort4 = new HostAndPort("ip", 7001); HostAndPort hostAndPort5 = new HostAndPort("ip", 7000); HostAndPort hostAndPort6 = new HostAndPort("ip", 7001); Set<HostAndPort> hostAndPortSet = new HashSet<>(); hostAndPortSet.add(hostAndPort); hostAndPortSet.add(hostAndPort2); hostAndPortSet.add(hostAndPort3);hostAndPortSet.add(hostAndPort4);hostAndPortSet.add(hostAndPort5);hostAndPortSet.add(hostAndPort6); jedisCluster = new JedisCluster(hostAndPortSet, 6000, 6000, 10, password,UserDefinedFuctions.getGenericObjectPoolConfig()); // 模拟数据创建 // Map<String, String> cityDimensionMap = new HashedMap(); // cityDimensionMap.put("0020", "广州"); // cityDimensionMap.put("0750", "深圳"); // // Map<String, String> rechargeChannelsMap = new HashedMap(); // rechargeChannelsMap.put("1", "手机app充值"); // rechargeChannelsMap.put("2", "营业厅充值"); // // jedisCluster.hmset("CityCode", cityDimensionMap); // jedisCluster.hmset("RechargeChannels", rechargeChannelsMap); // System.out.println(jedisCluster.hgetAll("CityCode")); // System.out.println(jedisCluster.hgetAll("RechargeChannels")); // System.out.println(jedisCluster.get("testttt")); //空值返回null } // define function logic // 自定义SQL函数 public static class cityCodeTranslateFunction extends ScalarFunction{ Map<String, String> cityCodeMap = jedisCluster.hgetAll("CityCode"); public String eval(String cityNum) { String res = cityCodeMap.get(cityNum); return res == null ? "Error" : res; } } public static class rechargeChannelTranslateFunction extends ScalarFunction{ Map<String, String> rechargeChannelsMap = jedisCluster.hgetAll("RechargeChannels"); public String eval(String rechargeChannel) { String res = rechargeChannelsMap.get(rechargeChannel); return res == null ? "Error" : res; } } public static void main(String[] args) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode() // .useOldPlanner() // flink .useBlinkPlanner() // blink .build(); StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings); String ddl = "CREATE TABLE CbryConsumer(\n" + "phoneNum String,\n" + "rechargeNum String,\n" + "provinceCode String,\n" + "cityCode String,\n" + "rechargeChannelCode String\n" + ") WITH(\n" + "'connector.type'='kafka',\n" + "'connector.version'='universal',\n" + "'connector.properties.group.id'='g2_group',\n" + "'connector.properties.bootstrap.servers'='KafkaClusterURL:ip:port,ip2:port',\n" + "'connector.topic'='event_topic_1',\n" + "'connector.startup-mode' = 'latest-offset',\n" + "'format.type'='csv',\n" + "'format.field-delimiter'='|'\n" + ")\n" ; ste.executeSql(ddl); //StreamTableEnvironment继承自TableEnvironment ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class); ste.createTemporarySystemFunction("rechargeChannelTranslate", rechargeChannelTranslateFunction.class); Table queryTable = ste.sqlQuery("select phoneNum,rechargeNum,cityCode,cityTranslate(cityCode), provinceCode,rechargeChannelCode, rechargeChannelTranslate(rechargeChannelCode)" + " from CbryConsumer"); DataStream<Row> result = ste.toAppendStream(queryTable, Row.class); result.printToErr(); try { env.execute(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
带交互的实现
在实际使用下,我们不可能说实现一个函数写一次代码,如何实现”交互“形态的自定义函数呢? 答曰:使用java的多态进行重载构造函数:
// define function logic // 自定义SQL函数 public static class AutoAdaptaMapDefineFunction extends ScalarFunction { Map<String, String> redisMap; public AutoAdaptaMapDefineFunction(String dimensionName) { redisMap = jedisCluster.hgetAll(dimensionName); } public String eval(String dimensionKey) { String res = redisMap.get(dimensionKey); return res == null ? "Error" : res; } } //ste.createTemporarySystemFunction("cityTranslate", new AutoAdaptaMapDefineFunction("CityCodeDimensionMapKey"));
对于交互式会话,还可以在使用或注册函数之前对其进行参数化。在这种情况下,可以将函数实例而不是函数类用作临时函数。
它要求参数是可序列化的
,以便将函数实例传送到集群。
PS:有两种注入函数的方式:一个是传对象,一个是传class对象
void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass); void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);
这也就给我们提供了自定义类加载器,指定特定class对象进行函数注入Flink作业的可能。
这篇关于FlinkSQL自定义函数(UDF)维表转换的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-16使用vue3+springboot构建简单Web应用教程
- 2024-11-15全栈开发项目实战:从入门到初级项目的实现
- 2024-11-15数据库项目实战:从入门到初级应用教程
- 2024-11-15IDEA项目实战入门教程
- 2024-11-15IT编程项目实战:新手入门的全面指南
- 2024-11-15Java开发项目实战:新手入门与初级技巧
- 2024-11-15Java零基础项目实战:从入门到独立开发
- 2024-11-15MyBatis Plus教程:入门与基础操作详解
- 2024-11-15MyBatis-Plus教程:新手入门与实战技巧
- 2024-11-15MyBatis教程:从入门到实践