Flink Sql With 1.14 Queries 查询-概览(译)
2022/2/24 2:22:16
本文主要是介绍Flink Sql With 1.14 Queries 查询-概览(译),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
查询 #
SELECT
语句和VALUES
语句是sqlQuery()
用TableEnvironment
. 该方法将 SELECT 语句(或 VALUES 语句)的结果作为Table
. ATable
可用于后续 SQL 和 Table API 查询,转换为 DataStream或写入 TableSink。SQL 和 Table API 查询可以无缝混合,并进行整体优化并转换为单个程序。
为了在 SQL 查询中访问表,它必须在 TableEnvironment 中注册。可以从TableSource、Table、CREATE TABLE 语句、DataStream注册表。或者,用户也可以在 TableEnvironment 中注册目录以指定数据源的位置。
为方便起见,Table.toString()
自动在其唯一名称下注册表TableEnvironment
并返回该名称。因此,Table
可以将对象直接内联到 SQL 查询中,如下面的示例所示。
注意:包含不受支持的 SQL 功能的查询会导致TableException
. 以下部分列出了批处理表和流表上支持的 SQL 功能。
指定查询 #
以下示例显示如何在已注册表和内联表上指定 SQL 查询。
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // ingest a DataStream from an external source DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...); // SQL query with an inlined (unregistered) table Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount")); Table result = tableEnv.sqlQuery( "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'"); // SQL query with a registered table // register the DataStream as view "Orders" tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount")); // run a SQL query on the Table and retrieve the result as a new Table Table result2 = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); // create and register a TableSink final Schema schema = Schema.newBuilder() .column("product", DataTypes.STRING()) .column("amount", DataTypes.INT()) .build(); final TableDescriptor sinkDescriptor = TableDescriptor.forConnector("filesystem") .schema(schema) .format(FormatDescriptor.forFormat("csv") .option("field-delimiter", ",") .build()) .build(); tableEnv.createTemporaryTable("RubberOrders", sinkDescriptor); // run an INSERT SQL on the Table and emit the result to the TableSink tableEnv.executeSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
Python
env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) # SQL query with an inlined (unregistered) table # elements data type: BIGINT, STRING, BIGINT table = table_env.from_elements(..., ['user', 'product', 'amount']) result = table_env \ .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table) # create and register a TableSink schema = Schema.new_builder() .column("product", DataTypes.STRING()) .column("amount", DataTypes.INT()) .build() sink_descriptor = TableDescriptor.for_connector("filesystem") .schema(schema) .format(FormatDescriptor.for_format("csv") .option("field-delimiter", ",") .build()) .build() t_env.create_temporary_table("RubberOrders", sink_descriptor) # run an INSERT SQL on the Table and emit the result to the TableSink table_env \ .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
回到顶部
执行查询 #
可以通过该TableEnvironment.executeSql()
方法执行SELECT语句或VALUES语句将内容收集到本地。该方法将 SELECT 语句(或 VALUES 语句)的结果作为TableResult
. 类似于 SELECT 语句,Table
可以使用该Table.execute()
方法执行对象,将查询的内容收集到本地客户端。 TableResult.collect()
方法返回一个可关闭的行迭代器。除非已收集所有结果数据,否则选择作业不会完成。我们应该主动关闭作业,以避免通过该CloseableIterator#close()
方法泄漏资源。TableResult.print()
我们也可以通过该方法将选择结果打印到客户端控制台。中的结果数据TableResult
只能访问一次。因此,collect()
不得print()
互相调用。
TableResult.collect()
并且TableResult.print()
在不同的检查点设置下具有略微不同的行为(要为流式作业启用检查点,请参阅检查点配置)。
- 对于没有检查点的批处理作业或流式作业,
TableResult.collect()
既TableResult.print()
没有完全一次也没有至少一次保证。查询结果一旦生成,客户端就可以立即访问,但是当作业失败并重新启动时会抛出异常。 - 用于具有一次性检查点的流式作业,
TableResult.collect()
并TableResult.print()
保证端到端的一次性记录交付。只有在相应的检查点完成后,客户端才能访问结果。 - 用于具有至少一次检查点的流式作业,
TableResult.collect()
并TableResult.print()
保证端到端的至少一次记录交付。查询结果一旦生成,客户端就可以立即访问,但可能会多次传递相同的结果。
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)"); // execute SELECT statement TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders"); // use try-with-resources statement to make sure the iterator will be closed automatically try (CloseableIterator<Row> it = tableResult1.collect()) { while(it.hasNext()) { Row row = it.next(); // handle row } } // execute Table TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute(); tableResult2.print();
Python
env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env, settings) # enable checkpointing table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE") table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s") table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)") # execute SELECT statement table_result1 = table_env.execute_sql("SELECT * FROM Orders") table_result1.print() # execute Table table_result2 = table_env.sql_query("SELECT * FROM Orders").execute() table_result2.print()
回到顶部
语法 #
Flink 使用支持标准 ANSI SQL 的Apache Calcite解析 SQL。
以下 BNF 语法描述了批处理和流式查询中支持的 SQL 功能的超集。操作部分显示了受支持功能的示例,并指出了哪些功能仅支持批处理或流式查询。
语法↕Flink SQL 使用类似于 Java 的标识符(表、属性、函数名)的词法策略:
- 无论是否引用标识符,都会保留标识符的大小写。
- 之后,标识符会区分大小写。
- 与 Java 不同,反引号允许标识符包含非字母数字字符(例如)。
“SELECT a AS
my field
FROM t”
字符串文字必须用单引号括起来(例如,SELECT 'Hello World'
)。复制单引号以进行转义(例如,SELECT 'It''s me'
)。
Flink SQL> SELECT 'Hello World', 'It''s me'; +-------------+---------+ | EXPR$0 | EXPR$1 | +-------------+---------+ | Hello World | It's me | +-------------+---------+ 1 row in set
字符串文字中支持 Unicode 字符。如果需要显式 unicode 代码点,请使用以下语法:
- 使用反斜杠 (
\
) 作为转义字符(默认):SELECT U&'\263A'
- 使用自定义转义字符:
SELECT U&'#263A' UESCAPE '#'
回到顶部
操作 #
- WITH子句
- SELECT & WHERE
- SELECT DISTINCT
- 开窗 TVF
- 窗口聚合
- 分组聚合
- Over 聚合
- Joins
- Set (集合)操作
- ORDER BY 子句
- Limit 子句
- Top-N
- 开窗 Top-N
- 数据去重
- 模式检测(CEP复杂事件处理)
from flink website url:Flink SQL 查询 概览 | Apache Flink
----------------------------------------------------------- 禁止转载 --------------------------------------------------------
这篇关于Flink Sql With 1.14 Queries 查询-概览(译)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南