Flink SQl 语法(hint,with,select,分组窗口聚合,时间属性(处理,事件))
2022/8/1 2:23:09
本文主要是介绍Flink SQl 语法(hint,with,select,分组窗口聚合,时间属性(处理,事件)),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
6、Flink SQl 语法
1、查询语句
1、hint
在对表进行查询的是偶动态修改表的属性
-- 创建表 CREATE TABLE word ( lines STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'word', 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据 'format' = 'csv', 'csv.field-delimiter'='\t' ) -- 加载hive函数 LOAD MODULE hive WITH ('hive-version' = '1.2.1'); --统计单词的数量 --不动态指定开始读取的参数 select word,count(1) from word, lateral table(explode(split(lines,','))) as t(word) group by word -- OPTIONS 动态指定参数 select word,count(1) from word /*+ OPTIONS('scan.startup.mode'='latest-offset') */ , lateral table(explode(split(lines,','))) as t(word) group by word
3、WITH
-- temp可以在后面的sql中使用多次 with temp as ( select word from word, lateral table(explode(split(lines,','))) as t(word) ) select * from temp union all select * from temp
4、SELECT
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)
5、分组窗口聚合
老版本语法,新版本中不推荐使用
-- PROCTIME(): 获取处理时间的函数 CREATE TABLE words_window ( lines STRING, proc_time as PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'words', 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据 'format' = 'csv', 'csv.field-delimiter'='\t' ) -- TUMBLE:滚动窗口 -- HOP": 滑动黄口 -- SESSION: 会话窗口 --TUMBLE:处理时间的滑动窗口 select word, TUMBLE_START(proc_time, INTERVAL '5' SECOND) as s, -- 窗口开始时间 TUMBLE_END(proc_time, INTERVAL '5' SECOND) as e, -- 窗口开始使时间 count(1) as c from words_window, lateral table(explode(split(lines,','))) as t(word) group by word, TUMBLE(proc_time, INTERVAL '5' SECOND) -- 每5秒计算一次
-
会话窗口
一段时间没有数据开始计算
暂时只能在老板本api中使用
CREATE TABLE words_window ( lines STRING, proc_time as PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'words', 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据 'format' = 'csv', 'csv.field-delimiter'='\t' ) select word, SESSION_START(proc_time, INTERVAL '5' SECOND) as s, -- 窗口开始时间 SESSION_END(proc_time, INTERVAL '5' SECOND) as e, -- 窗口结束使时间 count(1) as c from words_window, lateral table(explode(split(lines,','))) as t(word) group by word, SESSION(proc_time, INTERVAL '5' SECOND) -- 会话超过5秒中没有发送消息,就开始进行计算
6、TVFs(重点)
- 滚动窗口函数
CREATE TABLE words_window ( lines STRING, proc_time as PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'words', 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据 'format' = 'csv', 'csv.field-delimiter'='\t' ) -- TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND) -- TUMBLE: 窗口函数,可以给原表增加床i偶开始时间,窗口的结束时间,窗口时间 -- TABLE words_window : 指定原表 -- DESCRIPTOR(proc_time) 指定时间字段,可以处理时间,也可以是事件时间 -- INTERVAL '5' SECOND 指定窗口大小 SELECT lines,proc_time,window_start,window_end,window_time FROM TABLE( TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND) ); -- 在划分和窗口之后进行聚合计算 SELECT word,window_start,count(1) as c FROM TABLE( TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND) ), lateral table(explode(split(lines,','))) as t(word) group by word,window_start
-
滑动窗口函数
一条数据会出现在多个窗口中,所以输入一条数据,会输出多条数据
CREATE TABLE words_window ( lines STRING, proc_time as PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'words', 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据 'format' = 'csv', 'csv.field-delimiter'='\t' ) -- HOP: 滑动窗口函数,需要指定窗口大小和滑动时间 -- 输入一条数据会输出多条数据 with temp as ( select * from words_window /*+ OPTIONS('scan.startup.mode'='latest-offset') */ ) SELECT * FROM TABLE( HOP(TABLE temp , DESCRIPTOR(proc_time), INTERVAL '5' SECOND, INTERVAL '15' SECOND) ) ; -- 窗口止呕进行聚合 with temp as ( select * from words_window /*+ OPTIONS('scan.startup.mode'='latest-offset') */ ) SELECT word ,window_start,count(1) as c FROM TABLE( HOP(TABLE temp, DESCRIPTOR(proc_time), INTERVAL '5' SECOND, INTERVAL '15' SECOND)), lateral table(explode(split(lines,','))) as t(word) group by word,window_start ;
7、时间属性
1、处理时间
使用PROCTIME()函数给表增加一个时间字段
CREATE TABLE student_kafka_proc_time ( id STRING, name STRING, age INT, gender STRING, clazz STRING, proc as PROCTIME() -- 处理时间字段 ) WITH ( 'connector' = 'kafka', 'topic' = 'student', 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv', 'csv.field-delimiter'=',', -- csv格式数据的分隔符 'csv.ignore-parse-errors'='true', -- 如果出现脏数据据,补null 'csv.allow-comments'='true'--跳过#注释行 ) -- 使用处理时间可以做窗口统计 SELECT clazz,window_start,count(1) as c FROM TABLE( TUMBLE(TABLE student_kafka_proc_time, DESCRIPTOR(proc), INTERVAL '5' SECOND) ) group by clazz,window_start
2、事件时间
-
测试数据
1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:10 1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:11 1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:12 1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:20 1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:15 1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:25
-
创建表指定时间字段和水位线
-- TIMESTAMP(3) flink的时间戳类型 -- ts - INTERVAL '5' SECOND 水位线前移5秒 CREATE TABLE student_kafka_event_time ( id STRING, name STRING, age INT, gender STRING, clazz STRING, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 指定时间字段和水位线 ) WITH ( 'connector' = 'kafka', 'topic' = 'student_event_time', 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ) -- 使用事件时间 做窗口函数统计 -- 每一条数据都会计算出一个结果,会取更新之前已经输出的结果 -- 不存在数据丢失问题 -- 需要将统计结果保存在状态中 SELECT clazz,window_start,count(1) as c FROM TABLE( TUMBLE(TABLE student_kafka_event_time, DESCRIPTOR(ts), INTERVAL '5' SECOND) ) group by clazz,window_start -- 分钟窗口统计 -- 如果数据乱序可能会丢失数据 -- 不需要将统计的结果保存在状态中 select clazz, TUMBLE_START(ts, INTERVAL '5' SECOND) as s, -- 窗口开始时间 TUMBLE_END(ts, INTERVAL '5' SECOND) as e, -- 窗口开始使时间 count(1) as c from student_kafka_event_time group by clazz, TUMBLE(ts, INTERVAL '5' SECOND) -- 没4秒计算一次 -- 生产数据 kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_event_time
练习
统计单词的数量,
每隔5秒统计一次
每个窗口中取单词数量最多个两个单词
CREATE TABLE words_window_demo ( lines STRING, proc_time as PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'words', 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据 'format' = 'csv', 'csv.field-delimiter'='\t' ) -- 在夫林卡 sql 流处理中row_number()必须要取topN select * from ( select word, window_start, c, row_number() over(partition by window_start order by c desc) as r from ( select word,window_start,count(1) as c from TABLE( TUMBLE(TABLE words_window_demo, DESCRIPTOR(proc_time), INTERVAL '5' SECOND) ), lateral table(explode(split(lines,','))) as t(word) group by word,window_start ) as a ) as b where r <= 2
- 统计每个城市中每个区县的车流量
- 每隔5分钟统计一次,统计最近15分钟的数据
- 每个城市中取车流量最大的前2个区县
- 将统计好的结果保存到数据库中
-- 数据 { "car": "皖AK0H90", "city_code": "340100", "county_code": "340111", "card": 117303031813010, "camera_id": "00004", "orientation": "北", "road_id": 34130440, "time": 1614799929, "speed": 84.51 } -- TIMESTAMP(3) flink的时间戳类型 -- ts - INTERVAL '5' SECOND 水位线前移5秒 -- 创建表读取kafka中的json数据 CREATE TABLE cars_kafka_event_time ( car STRING, city_code STRING, county_code STRING, card BIGINT, camera_id STRING, orientation STRING, road_id BIGINT, `time` BIGINT, speed DOUBLE, ts_ltz AS TO_TIMESTAMP_LTZ(`time`, 3), WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 指定时间字段和水位线 ) WITH ( 'connector' = 'kafka', 'topic' = 'car_test', 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', 'properties.group.id' = 'carGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ) -- 测试一下是否存在数据 select * from cars_kafka_event_time -- 统计每个城市中每个区县的车流量,每隔5分钟统计一次,统计最近15分钟的数据,每个城市中取车流量最大的前2个区县 select * from ( select county_code ,city_code ,window_start , c ,row_number() over(partition by window_start order by c desc) as r from ( with temp as ( select * from cars_kafka_event_time /*+ OPTIONS('scan.startup.mode'='latest-offset') */ ) SELECT county_code ,city_code ,window_start ,count(1) as c FROM TABLE( HOP(TABLE temp, DESCRIPTOR(ts_ltz), INTERVAL '5' SECOND, INTERVAL '15' SECOND)) group by county_code,city_code,window_start ) as b ) as h where r <= 2; -- 创建mysql的sink表 CREATE TABLE clazz_num_mysql ( country_city_r_count STRING, window_start STRING, PRIMARY KEY (country_city_r_count) NOT ENFORCED -- 按照主键更新数据 ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'city_top_2', -- 需要手动到数据库中创建表 'username' = 'root', 'password' = '123456' ); -- 发送到mysql中 insert into clazz_num_mysql select concat_ws('_',county_code,city_code,r,c) country_city_r_count ,window_start from ( select cast(county_code as STRING) county_code ,cast(city_code as STRING) city_code ,cast(window_start as STRING) window_start ,cast(c as STRING) c ,cast(row_number() over(partition by window_start order by c desc) as STRING) as r from ( with temp as ( select * from cars_kafka_event_time ) SELECT county_code ,city_code ,window_start ,count(1) as c FROM TABLE( HOP(TABLE temp, DESCRIPTOR(ts_ltz), INTERVAL '5' SECOND, INTERVAL '15' SECOND)) group by county_code,city_code,window_start ) as b ) as h where r <= 2; -- mysql 中的查询方法如下(笨方法) select SUBSTRING_INDEX(country_city_r_count,'_',1) as country ,SUBSTRING_INDEX(SUBSTRING_INDEX(country_city_r_count,'_',2),'_',1)as city,SUBSTRING_INDEX(SUBSTRING_INDEX(country_city_r_count,'_',3) ,'_',-1) as topn , SUBSTRING_INDEX(country_city_r_count,'_',-1) as count_car ,window_start from city_top_2
这篇关于Flink SQl 语法(hint,with,select,分组窗口聚合,时间属性(处理,事件))的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享