Flink Table API and SQL: Creating a TableEnvironment, Program Structure, and Simple Usage Examples
2021/12/31 2:08:48
This article walks through creating a TableEnvironment for Flink's Table API and SQL, the structure of Table API and SQL programs, and some simple usage examples. It should be a handy reference if you are working through these APIs.
Contents
- 1. pom.xml Dependencies
- 2. Example: Processing Bounded and Unbounded Streams with the Table API
- 3. Example: Processing Bounded and Unbounded Streams with SQL
- 4. Table API and SQL Concepts
- 4.1 Creating a TableEnvironment
- 4.2 Program Structure of Table API and SQL
- 4.3 Simple Usage of the Table API and SQL
- 4.3.1 Aggregate Queries
1. pom.xml Dependencies
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>

<!-- Needed to run from the IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>

<!-- Needed to implement a custom format or connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>
```
2. Example: Processing Bounded and Unbounded Streams with the Table API
- Processing a bounded stream
apiTest\TableBatchDemo.scala
```scala
package apiTest

import org.apache.flink.table.api.DataTypes.{ROW, FIELD, BIGINT, STRING, INT}
import org.apache.flink.table.api.{$, EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal, AnyWithOperations}

object TableBatchDemo {

  def main(args: Array[String]): Unit = {

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner().inBatchMode().build()
    val tEnv = TableEnvironment.create(settings)

    // Define the row type
    val MyOrder = ROW(
      FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT())
    )

    val table = tEnv.fromValues(MyOrder,
      row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20)
    )

    val filtered = table.where($("amount").isGreaterOrEqual(8))

    // Calling execute() collects the data to the JobManager
    filtered.execute().print()
  }
}
```
The result is as follows:
```
+----------------------+--------------------------------+-------------+
|                   id |                        product |      amount |
+----------------------+--------------------------------+-------------+
|                    2 |                          Tesla |           8 |
|                    2 |                          Tesla |           8 |
|                    3 |                            BYD |          20 |
+----------------------+--------------------------------+-------------+
3 rows in set
```
- Processing an unbounded stream
apiTest\TableStreamDemo.scala
```scala
package apiTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, AnyWithOperations, EnvironmentSettings, ExplainDetail, TableEnvironment, string2Literal}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.common.typeinfo.Types.{ROW, STRING}

object TableStreamDemo {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)
    // A tEnv created this way cannot use the fromDataStream function
    // val tEnv = TableEnvironment.create(bsSettings)

    val dataStream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(dataStream, $("word"))
    val filtered = table.where($("word").like("%t%"))

    val explain = filtered.explain(ExplainDetail.JSON_EXECUTION_PLAN)
    println(explain)

    // Provide the implicit TypeInformation required by toAppendStream
    implicit val row_string_type = ROW(STRING)
    tEnv.toAppendStream(filtered)
      .print("table")

    senv.execute()
  }
}
```
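Both this demo and the streaming SQL demo in section 3 read from a custom WordSourceFunction, whose source the article does not include. Here is a minimal sketch of what it might look like, assuming it emits a random word roughly once per second (the vocabulary and interval are guesses based on the sample output):

```scala
package apiTest

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

// Hypothetical reconstruction: an unbounded source that emits a random word
// from a small vocabulary until the job is cancelled
class WordSourceFunction extends SourceFunction[String] {

  @volatile private var running = true
  private val words = Seq("table", "sql", "stream", "batch", "flink")

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val rand = new Random()
    while (running) {
      ctx.collect(words(rand.nextInt(words.size)))
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}
```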
The result is as follows:
```
== Abstract Syntax Tree ==
LogicalFilter(condition=[LIKE($0, _UTF-16LE'%t%')])
+- LogicalTableScan(table=[[Unregistered_DataStream_1]])

== Optimized Physical Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])

== Optimized Execution Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
    "pact" : "Operator",
    "contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}

table:6> +I[stream]
table:7> +I[table]
table:8> +I[batch]
table:1> +I[batch]
...... remaining output omitted ......
```
3. Example: Processing Bounded and Unbounded Streams with SQL
- Processing a bounded stream
apiTest\SqlBatchDemo.scala
```scala
package apiTest

import org.apache.flink.table.api.DataTypes.{BIGINT, FIELD, INT, ROW, STRING}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal}

object SqlBatchDemo {

  def main(args: Array[String]): Unit = {

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner().inBatchMode().build()
    val tEnv = TableEnvironment.create(settings)

    val MyOrder = ROW(
      FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT())
    )

    val input = tEnv.fromValues(MyOrder,
      row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20)
    )
    tEnv.createTemporaryView("myOrder", input)

    val table = tEnv.sqlQuery("select product, sum(amount) as amount from myOrder group by product")
    table.execute().print()
  }
}
```
The result is as follows:
```
+--------------------------------+-------------+
|                        product |      amount |
+--------------------------------+-------------+
|                          Tesla |          16 |
|                            BYD |          20 |
|                            BMW |           1 |
+--------------------------------+-------------+
3 rows in set
```
- Processing an unbounded stream
apiTest\SqlStreamDemo.scala
```scala
package apiTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, EnvironmentSettings}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.common.typeinfo.Types.{ROW, STRING}

object SqlStreamDemo {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)

    val stream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(stream, $("word"))

    val result = tEnv.sqlQuery("select * from " + table + " where word like '%t%'")

    implicit val row_string_type = ROW(STRING)
    tEnv.toAppendStream(result).print()

    println(senv.getExecutionPlan)

    senv.execute()
  }
}
```
The result is as follows:
{ "nodes" : [ { "id" : 1, "type" : "Source: Custom Source", "pact" : "Data Source", "contents" : "Source: Custom Source", "parallelism" : 1 }, { "id" : 2, "type" : "SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[word])", "pact" : "Operator", "contents" : "SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[word])", "parallelism" : 1, "predecessors" : [ { "id" : 1, "ship_strategy" : "FORWARD", "side" : "second" } ] }, { "id" : 3, "type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])", "pact" : "Operator", "contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%t%')])", "parallelism" : 1, "predecessors" : [ { "id" : 2, "ship_strategy" : "FORWARD", "side" : "second" } ] }, { "id" : 4, "type" : "SinkConversionToRow", "pact" : "Operator", "contents" : "SinkConversionToRow", "parallelism" : 1, "predecessors" : [ { "id" : 3, "ship_strategy" : "FORWARD", "side" : "second" } ] }, { "id" : 5, "type" : "Sink: Print to Std. Out", "pact" : "Data Sink", "contents" : "Sink: Print to Std. Out", "parallelism" : 8, "predecessors" : [ { "id" : 4, "ship_strategy" : "REBALANCE", "side" : "second" } ] } ] } 7> +I[batch] 8> +I[table] 1> +I[stream] 2> +I[stream] 3> +I[table] ......省略部分......
4. Table API and SQL Concepts
4.1 Creating a TableEnvironment
- The Table API and SQL are both built on the Table interface and share the same catalog
- Both Table API and SQL queries are parsed and validated with Apache Calcite, then optimized by the Blink planner; in both streaming and batch mode the optimized plan is finally translated into DataStream API Transformations
- The EnvironmentSettings approach
```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()   // for streaming mode
  //.inBatchMode()     // for batch mode
  .build()

val tEnv = TableEnvironment.create(settings)
```
- Cannot interoperate with DataStream
- Does not support user-defined aggregate functions (UDAFs) or user-defined table functions (UDTFs)
- The StreamTableEnvironment approach
```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(senv)
```
- Can interoperate with DataStream
- Supports user-defined aggregate functions (UDAFs) and user-defined table functions (UDTFs), as shown in the sketch after this list
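As an illustration of the second point, here is a minimal sketch of defining and registering a UDTF through a StreamTableEnvironment, loosely following the Flink 1.14 function registration API. The SplitFunction class and the commented-out "lines" table are illustrative assumptions, not part of the original examples:

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Hypothetical UDTF: emits one row per whitespace-separated word
@FunctionHint(output = new DataTypeHint("ROW<word STRING>"))
class SplitFunction extends TableFunction[Row] {
  def eval(line: String): Unit =
    line.split(" ").foreach(w => collect(Row.of(w)))
}

object UdtfDemo {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)

    // Register the function so it can be called from the Table API and SQL
    tEnv.createTemporarySystemFunction("SplitFunction", classOf[SplitFunction])

    // Hypothetical usage, assuming a registered table "lines" with a STRING column "line":
    // tEnv.sqlQuery("SELECT word FROM lines, LATERAL TABLE(SplitFunction(line))")
  }
}
```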
4.2 Program Structure of Table API and SQL
The structure and data of MySQL table user1, and the structure of table user2, are shown below:
```
mysql> show create table flink_test.user1;
+-------+--------------------------------------------------------------------+
| Table | Create Table                                                       |
+-------+--------------------------------------------------------------------+
| user1 | CREATE TABLE `user1` (
  `id` bigint NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+--------------------------------------------------------------------+
1 row in set (0.00 sec)

mysql> select * from flink_test.user1;
+----+------+------+
| id | name | age  |
+----+------+------+
|  1 | yi   |    1 |
|  2 | er   |    2 |
|  3 | san  |    3 |
+----+------+------+
3 rows in set (0.00 sec)

mysql> show create table flink_test.user2;
+-------+--------------------------------------------------------------------+
| Table | Create Table                                                       |
+-------+--------------------------------------------------------------------+
| user2 | CREATE TABLE `user2` (
  `id` bigint NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+--------------------------------------------------------------------+
1 row in set (0.01 sec)
```
TableSqlTest.scala
```scala
package TableApiTest

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object TableSqlTest {

  def main(args: Array[String]): Unit = {

    // Create the Table environment
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    val table_str =
      """
        |create temporary table %s(
        |  id bigint,
        |  name string,
        |  age int,
        |  primary key (id) not enforced
        |) with (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://192.168.23.33:3306/flink_test',
        |  'driver' = 'com.mysql.cj.jdbc.Driver',
        |  'table-name' = '%s',
        |  'username' = 'root',
        |  'password' = 'Root_123'
        |)
        |""".stripMargin

    // Register the tables in the catalog
    tEnv.executeSql(table_str.format("user1", "user1"))
    tEnv.executeSql(table_str.format("user2", "user2"))

    // ===================== Read from the source table =====================
    // val user1 = tEnv.from("user1")                            // Option 1
    // val user1 = tEnv.sqlQuery("select * from user1 limit 2")  // Option 2

    // ===================== Insert into the target table =====================
    // user1.executeInsert("user2")                              // Option 1
    val stmtSet = tEnv.createStatementSet()
    // stmtSet.addInsert("user2", user1)                         // Option 2
    stmtSet.addInsertSql("insert into user2 select * from user1 limit 2") // Option 3
    stmtSet.execute()
  }
}
```
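Note that the JDBC tables above also require the Flink JDBC connector and a MySQL driver on the classpath, neither of which appears in the pom.xml from section 1. A sketch of the extra dependencies, assuming Flink 1.14 with Scala 2.11 (the MySQL driver version here is illustrative):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>
```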
- In batch mode, results can only be written to a BatchTableSink
- In streaming mode, results can be written to an AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink
Run TableSqlTest.scala, then query table user2 in MySQL:
```
mysql> select * from user2;
+----+------+------+
| id | name | age  |
+----+------+------+
|  1 | yi   |    1 |
|  2 | er   |    2 |
+----+------+------+
2 rows in set (0.00 sec)
```
4.3 Simple Usage of the Table API and SQL
4.3.1 Aggregate Queries
Code example:
```scala
package TableApiTest

import org.apache.flink.table.api.Expressions.{$, lit, row}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object TableSqlTest {

  def main(args: Array[String]): Unit = {

    // Create the Table environment
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    val table = tEnv.fromValues(
      row(10, "A"),
      row(20, "A"),
      row(100, "B"),
      row(200, "B")
    ).renameColumns($("f0").as("amount"), $("f1").as("name"))
    tEnv.createTemporaryView("tmp_table", table)

    // Table API version of the aggregation
    val table_result = table
      .filter($("amount").isGreater(lit(0)))
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))
    table_result.execute().print()

    // Equivalent SQL version
    val sql_result = tEnv.sqlQuery("select name, sum(amount) as amount from tmp_table where amount > 0 group by name")
    sql_result.execute().print()
  }
}
```
Execution result:
```
+----+--------------------------------+-------------+
| op |                           name |      amount |
+----+--------------------------------+-------------+
| +I |                              A |          10 |
| -U |                              A |          10 |
| +U |                              A |          30 |
| +I |                              B |         100 |
| -U |                              B |         100 |
| +U |                              B |         300 |
+----+--------------------------------+-------------+
6 rows in set

+----+--------------------------------+-------------+
| op |                           name |      amount |
+----+--------------------------------+-------------+
| +I |                              A |          10 |
| -U |                              A |          10 |
| +U |                              A |          30 |
| +I |                              B |         100 |
| -U |                              B |         100 |
| +U |                              B |         300 |
+----+--------------------------------+-------------+
6 rows in set
```
- The aggregation is computed incrementally: for each group, -U retracts the previous aggregate value and +U emits the updated one
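To make these -U/+U mechanics visible outside of print(), the same aggregation can be run through a StreamTableEnvironment and converted to a retract stream. A minimal sketch (RetractStreamDemo is my illustration, not part of the original article):

```scala
package TableApiTest

import org.apache.flink.api.common.typeinfo.Types.{INT, ROW, STRING}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.{$, row}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object RetractStreamDemo {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)

    val table = tEnv.fromValues(
      row(10, "A"),
      row(20, "A"),
      row(100, "B"),
      row(200, "B")
    ).renameColumns($("f0").as("amount"), $("f1").as("name"))

    val result = table
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))

    // Each element is a (flag, row) pair: flag = true adds a row (+I/+U),
    // flag = false retracts a previously emitted row (-U)
    implicit val resultType = ROW(STRING, INT)
    tEnv.toRetractStream[Row](result).print()

    senv.execute()
  }
}
```

Each incoming row of a group first retracts the old aggregate (printed as (false, ...)) and then emits the new one (printed as (true, ...)), which is exactly what the -U/+U pairs above represent.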
That wraps up this introduction to creating a TableEnvironment and to the program structure and simple usage of Flink's Table API and SQL. Hopefully it serves as a useful reference.