Flink Basics (142): DataStream and Table Conversion (8) Handling of Changelog Streams (3) toChangelogStream
2021/8/30 6:07:46
The following code shows how to use toChangelogStream for different scenarios.
import java.time.Instant;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.*;

// the snippets assume an existing StreamExecutionEnvironment `env` and a
// StreamTableEnvironment `tableEnv`; each example is an independent snippet,
// which is why `dataStream` is declared more than once

// create Table with event-time
tableEnv.executeSql(
    "CREATE TABLE GeneratedTable "
    + "("
    + "  name STRING,"
    + "  score INT,"
    + "  event_time TIMESTAMP_LTZ(3),"
    + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
    + ")"
    + "WITH ('connector'='datagen')");

Table table = tableEnv.from("GeneratedTable");


// === EXAMPLE 1 ===

// convert to DataStream in the simplest and most general way possible (no event-time)

Table simpleTable = tableEnv
    .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
    .as("name", "score")
    .groupBy($("name"))
    .select($("name"), $("score").sum());

tableEnv
    .toChangelogStream(simpleTable)
    .executeAndCollect()
    .forEachRemaining(System.out::println);

// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]


// === EXAMPLE 2 ===

// convert to DataStream in the simplest and most general way possible (with event-time)

DataStream<Row> dataStream = tableEnv.toChangelogStream(table);

// since `event_time` is a single time attribute in the schema, it is set as the
// stream record's timestamp by default; however, at the same time, it remains part of the Row

dataStream.process(
    new ProcessFunction<Row, Void>() {
        @Override
        public void processElement(Row row, Context ctx, Collector<Void> out) {

            // prints: [name, score, event_time]
            System.out.println(row.getFieldNames(true));

            // timestamp exists twice
            assert ctx.timestamp() == row.<Instant>getFieldAs("event_time").toEpochMilli();
        }
    });
env.execute();


// === EXAMPLE 3 ===

// convert to DataStream but write out the time attribute as a metadata column which means
// it is not part of the physical schema anymore

DataStream<Row> dataStream = tableEnv.toChangelogStream(
    table,
    Schema.newBuilder()
        .column("name", "STRING")
        .column("score", "INT")
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        .build());

// the stream record's timestamp is defined by the metadata; it is not part of the Row

dataStream.process(
    new ProcessFunction<Row, Void>() {
        @Override
        public void processElement(Row row, Context ctx, Collector<Void> out) {

            // prints: [name, score]
            System.out.println(row.getFieldNames(true));

            // timestamp exists once
            System.out.println(ctx.timestamp());
        }
    });
env.execute();


// === EXAMPLE 4 ===

// for advanced users, it is also possible to use more internal data structures for efficiency

// note that this is only mentioned here for completeness because using internal data structures
// adds complexity and additional type handling

// however, converting a TIMESTAMP_LTZ column to `Long` or STRING to `byte[]` might be convenient,
// also structured types can be represented as `Row` if needed

DataStream<Row> dataStream = tableEnv.toChangelogStream(
    table,
    Schema.newBuilder()
        .column(
            "name",
            DataTypes.STRING().bridgedTo(StringData.class))
        .column(
            "score",
            DataTypes.INT())
        .column(
            "event_time",
            DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
        .build());

// leads to a stream of Row(name: StringData, score: Integer, event_time: Long)
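The output of Example 1 may be easier to read with the changelog semantics spelled out. The following is a plain-Java sketch, not Flink code: the class `ChangelogSketch` and the helper `sumChangelog` are hypothetical names introduced only for illustration. It assumes that a grouped sum emits `+I` the first time a key is seen and a `-U`/`+U` pair whenever the sum for an existing key changes, which is what the printed rows in Example 1 suggest.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangelogSketch {

    /**
     * Hypothetical helper: replays (name, score) rows through a running SUM per name
     * and records the change each row would cause, formatted like the rows in Example 1.
     */
    static List<String> sumChangelog(String[] names, int[] scores) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> changes = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            String name = names[i];
            Integer previous = totals.get(name);
            int updated = (previous == null ? 0 : previous) + scores[i];
            if (previous == null) {
                // first row for this key: a plain insert
                changes.add("+I[" + name + ", " + updated + "]");
            } else {
                // later row for this key: retract the old sum, then emit the new one
                changes.add("-U[" + name + ", " + previous + "]");
                changes.add("+U[" + name + ", " + updated + "]");
            }
            totals.put(name, updated);
        }
        return changes;
    }

    public static void main(String[] args) {
        sumChangelog(new String[] {"Alice", "Alice", "Bob"}, new int[] {12, 2, 12})
            .forEach(System.out::println);
    }
}
```

This sketch processes rows strictly in input order, so Alice's changes appear before Bob's insert, whereas the output shown in Example 1 lists Bob first. The sequence of changes for each individual name is the same in both.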
For more information about which conversions are supported for data types in Example 4, see the Table API’s Data Types page.
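As a small illustration of the `Long` bridging in Example 4, the sketch below uses only `java.time`. It assumes that a TIMESTAMP_LTZ(3) value bridged to `Long` carries milliseconds since the epoch, which is consistent with Example 2 comparing the record timestamp against `toEpochMilli()`; please confirm this against the Data Types page before relying on it. The class and method names are hypothetical.

```java
import java.time.Instant;

class EpochMillisSketch {

    /** Assumed representation of a bridged TIMESTAMP_LTZ(3): milliseconds since the epoch. */
    static long toEpochMillis(Instant eventTime) {
        return eventTime.toEpochMilli();
    }

    /** Restores the Instant from the assumed epoch-millisecond representation. */
    static Instant fromEpochMillis(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    public static void main(String[] args) {
        Instant eventTime = Instant.parse("1970-01-01T00:00:01.500Z");
        long bridged = toEpochMillis(eventTime);
        System.out.println(bridged);                  // prints: 1500
        System.out.println(fromEpochMillis(bridged)); // prints: 1970-01-01T00:00:01.500Z
    }
}
```

With precision 3 the round trip is lossless, because `Instant.toEpochMilli()` only drops sub-millisecond digits.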
The behavior of toChangelogStream(Table).executeAndCollect() is the same as calling Table.execute().collect(). However, toChangelogStream(Table) might be more useful for tests because it allows access to the produced watermarks in a subsequent ProcessFunction in the DataStream API.
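The watermarks mentioned here come from the declaration `WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND` in the table definition. The following plain-Java sketch is a simplified model of that strategy, not Flink's implementation: it assumes the watermark is the largest `event_time - 10 seconds` seen so far and never moves backwards, and that a row counts as late when its timestamp is not greater than the current watermark. The class `WatermarkSketch` and its methods are hypothetical.

```java
class WatermarkSketch {

    // matches INTERVAL '10' SECOND from the table definition, in milliseconds
    private static final long DELAY_MILLIS = 10_000L;

    // no watermark has been produced before the first row arrives
    private long watermark = Long.MIN_VALUE;

    /** Feeds one event timestamp (epoch millis) and returns the resulting watermark. */
    long observe(long eventTimeMillis) {
        // the watermark only advances; out-of-order rows cannot pull it back
        watermark = Math.max(watermark, eventTimeMillis - DELAY_MILLIS);
        return watermark;
    }

    /** A row is treated as late if its timestamp is at or behind the watermark. */
    boolean isLate(long eventTimeMillis) {
        return eventTimeMillis <= watermark;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch();
        System.out.println(sketch.observe(20_000L)); // prints: 10000
        System.out.println(sketch.observe(15_000L)); // prints: 10000
        System.out.println(sketch.isLate(9_000L));   // prints: true
    }
}
```

A test built on toChangelogStream could compare values like these against what a ProcessFunction observes at runtime.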