Flink--Table Api 和 sql 之 watermark开窗间属性(二)
2021/10/19 19:09:33
本文主要是介绍Flink--Table Api 和 sql 之 watermark开窗间属性(二),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
1. Flink 设置watermak
这里说下这个时间时间的取值,本来我kafka的数据是clickhouse 查询时间特意处理成时间戳。然后使用 TO_TIMESTAMP(date_time) 来设置watermark。 阿里云官网 blink 是支持的,但是这个实际中却不支持。
真的有点狗了。。。。
解决办法如下写法。
public static final String SOURCE_KAFKA_SNAPSHOT = "CREATE TABLE tableName (\n" + "`date_time` BIGINT ,\n" + "`hs_security_id` VARCHAR ,\n" + "`security_id` VARCHAR ,\n" + "`pre_close_px` DECIMAL,\n" + "`open_px` DECIMAL,\n" + "`high_px` DECIMAL ,\n" + "`low_px` DECIMAL,\n" + "`last_px` DECIMAL,\n" + "`num_trades` DECIMAL,\n" + "`volume` BIGINT,\n" + "`amount` DECIMAL,\n" + "`phase_code` BIGINT,\n" + "bid_price VARCHAR,\n" + "bid_qty VARCHAR,\n" + "offer_price VARCHAR,\n" + "offer_qty VARCHAR,\n" + "ts AS TO_TIMESTAMP(FROM_UNIXTIME(date_time / 1000, 'yyyy-MM-dd HH:mm:ss'))," + " WATERMARK FOR ts AS ts - INTERVAL '10' SECOND \n" + ")WITH (\n" + " 'connector' = 'kafka', \n" + " 'topic'='xxx',\n" + " 'properties.bootstrap.servers' = 'xxx.xxx.xx.xx:9092', \n" + " 'format' = 'json',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + "'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true'" + ")";
2.设置开窗
public class OfflineDataAggregationTableApi implements Serializable { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); String sourceDDL = CustomTable.SOURCE_KAFKA_SNAPSHOT; // String sinkDDL = CustomTable.SNAPSHOT_PRINT; //注册source和sink tableEnv.executeSql(sourceDDL); // tableEnv.executeSql(sinkDDL); Table sourceTable = tableEnv.from("snapshot"); Table timeTable = tableEnv.sqlQuery("select \n" + "TUMBLE_START(ts, INTERVAL '15' SECOND), \n" + " hs_security_id,\n" + " security_id,\n" + " MAX(pre_close_px) as pre_close_px, \n" + " MAX(open_px) as open_px, \n" + " MAX(high_px) as high_px, \n" + " FIRST_VALUE(phase_code) as phase_code, \n" + " FIRST_VALUE(bid_price) as bid_price, \n" + " FIRST_VALUE(bid_qty) as bid_qty, \n" + " FIRST_VALUE(offer_price) as offer_price, \n" + " FIRST_VALUE(offer_qty) as offer_qty \n" + " from " + sourceTable + " group by TUMBLE(ts, INTERVAL '15' SECOND),hs_security_id,security_id"); TableResult tableResult = tableEnv.executeSql(" select * from " + timeTable); tableResult.print(); env.execute("快照数据读取"); } }
这篇关于Flink--Table Api 和 sql 之 watermark开窗间属性(二)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南