Big Data Flink 2021 (37): Table & SQL, Case 4
2021/5/3 2:25:16
This article introduces Big Data Flink 2021 (37): Table & SQL, Case 4. It should serve as a useful reference for readers facing a similar programming problem; interested programmers, follow along.
Table of Contents
Case 4
Requirement
Code Implementation
Case 4
Requirement
Consume data from Kafka, filter out the records whose status is success, and write them back to Kafka.
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
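Before running the Flink job, the success filter it will apply can be sanity-checked with a few lines of plain Java (a standalone sketch: the class name `SuccessFilterSketch` and helper `isSuccess` are hypothetical, and the substring check merely stands in for real JSON parsing):

```java
import java.util.List;
import java.util.stream.Collectors;

public class SuccessFilterSketch {
    // Simplified stand-in for a JSON parse: true if the flat JSON string
    // carries "status": "success", mirroring WHERE status = 'success'.
    static boolean isSuccess(String json) {
        return json.contains("\"status\": \"success\"");
    }

    public static void main(String[] args) {
        // Two of the sample records from above: one success, one fail
        List<String> input = List.of(
            "{\"user_id\": \"1\", \"page_id\":\"1\", \"status\": \"success\"}",
            "{\"user_id\": \"1\", \"page_id\":\"1\", \"status\": \"fail\"}"
        );
        // Keep only the success records, as the SQL WHERE clause will
        List<String> kept = input.stream()
                .filter(SuccessFilterSketch::isSuccess)
                .collect(Collectors.toList());
        kept.forEach(System.out::println);
    }
}
```

Only the success record survives the filter; the fail record is dropped, which is exactly what the Flink SQL job below does at scale.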
Code Implementation
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
package cn.itcast.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author itcast
 * Desc Consume from Kafka, keep only the records whose status is success, and write them back to Kafka
 */
public class FlinkSQL_Table_Demo06 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Source: declare a table backed by the input_kafka topic
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                " `user_id` BIGINT,\n" +
                " `page_id` BIGINT,\n" +
                " `status` STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'input_kafka',\n" +
                " 'properties.bootstrap.servers' = 'node1:9092',\n" +
                " 'properties.group.id' = 'testGroup',\n" +
                " 'scan.startup.mode' = 'latest-offset',\n" +
                " 'format' = 'json'\n" +
                ")"
        );

        // 3. Sink: declare a table backed by the output_kafka topic
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                " `user_id` BIGINT,\n" +
                " `page_id` BIGINT,\n" +
                " `status` STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'output_kafka',\n" +
                " 'properties.bootstrap.servers' = 'node1:9092',\n" +
                " 'format' = 'json',\n" +
                " 'sink.partitioner' = 'round-robin'\n" +
                ")"
        );

        // 4. Transformation: keep only the rows whose status is 'success'
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";
        Table resultTable = tEnv.sqlQuery(sql);

        // 5. Print the intermediate result as a retract stream for debugging
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        // 6. Sink: concatenating the Table into the SQL string registers it
        //    under a generated name, so the filtered rows land in output_kafka
        tEnv.executeSql("insert into output_kafka select * from " + resultTable);

        // 7. Execute
        env.execute();
    }
}
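As an aside, the same pipeline can be expressed without the DataStream bridge at all: a single INSERT INTO statement submitted via `executeSql` both defines the transformation and writes to the sink, so `toRetractStream` and `env.execute()` are unnecessary in that variant (a sketch assuming the two tables were created exactly as above):

```sql
INSERT INTO output_kafka
SELECT user_id, page_id, status
FROM input_kafka
WHERE status = 'success';
```

In Flink 1.12, `executeSql` submits this as its own job asynchronously; the debugging print in the Java version is the only reason the DataStream conversion is kept there.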
This concludes the article on Big Data Flink 2021 (37): Table & SQL, Case 4. We hope the articles we recommend are helpful to everyone, and we hope you will continue to support this site!