Flink 基石、Flink Time、事件时间、Watermark水位线
2022/3/20 23:29:07
本文主要是介绍Flink 基石、Flink Time、事件时间、Watermark水位线,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Flink 基石、Flink Time、事件时间、Watermark水位线
目录- Flink 基石、Flink Time、事件时间、Watermark水位线
- Flink 基石
- Flink Time
- 事件时间
- Watermark
Flink 基石
Flink Time
事件时间
代码示例
package com.shujia.flink.core import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object Demo5EventTIme { def main(args: Array[String]): Unit = { /* 用户id,事件时间 001,1647676561000 001,1647676562000 001,1647676563000 001,1647676565000 001,1647676564000 001,1647676566000 001,1647676567000 001,1647676568000 001,1647676569000 001,1647676570000 001,1647676575000 */ /** * 使用事件时间划分窗口 * 1、设置事件模式为事件时间 * 2、指定时间字段 */ /** * 每隔5秒统计用户出现的次数 * */ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //这里需要将并行度设置为1 //因为这里存在一个时间戳对齐的问题,多并行度的时候会对不齐 //不会触发事件时间的计算 env.setParallelism(1) //设置时间模式 //默认是处理时间 //TimeCharacteristic.EventTime -- 事件时间 //TimeCharacteristic.IngestionTime -- 接收时间 //TimeCharacteristic.ProcessingTime -- 处理时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val linesDS: DataStream[String] = env.socketTextStream("master", 8888) val eventDS: DataStream[(String, Long)] = linesDS.map(line => { val split: Array[String] = line.split(",") (split(0), split(1).toLong) }) //设置时间字段 val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2) /** * 事件时间窗口触发条件 * 1、窗口内有数据 * 2、最新数据的事件时间大于等于窗口的结束数据的时间 * 但是这样会有一个问题,就是数据的事件时间是乱序的,这样怎么办呢? */ val countDS: DataStream[(String, Int)] = assDS .map(kv => (kv._1, 1)) .keyBy(_._1) .timeWindow(Time.seconds(5)) .sum(1) countDS.print() env.execute() } }
但是这样会有一个问题,就是数据的事件时间是乱序的,这样怎么办呢?
窗口如果被计算了,之后再来一条属于这个窗口的数据会丢数据
Watermark
水位线
package com.shujia.flink.core import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object Demo5EventTIme { def main(args: Array[String]): Unit = { /* 001,1647676561000 001,1647676562000 001,1647676563000 001,1647676565000 001,1647676564000 001,1647676566000 001,1647676567000 001,1647676568000 001,1647676569000 001,1647676570000 001,1647676575000 */ /** * 使用事件事件划分窗口 * 1、设置事件模式为事件时间 * 2、指定时间字段 */ /** * 每隔5秒统计用户出现的次数 * */ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //设置时间模式 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val linesDS: DataStream[String] = env.socketTextStream("master", 8888) val eventDS: DataStream[(String, Long)] = linesDS.map(line => { val split: Array[String] = line.split(",") (split(0), split(1).toLong) }) //设置时间字段,水位线默认等于最新数据的时间戳,水位线只增加不减少 // val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2) //设置水位线和时间字段 val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks( //执行水位线前移的时间 new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) { //指定时间戳字段 override def extractTimestamp(element: (String, Long)): Long = element._2 } ) /** * 事件时间窗口触发条件 * 1、窗口内有数据 * 2、最新数据的时间大于等于窗口的结束数据 * */ val countDS: DataStream[(String, Int)] = assDS .map(kv => (kv._1, 1)) .keyBy(_._1) .timeWindow(Time.seconds(5)) .sum(1) countDS.print() env.execute() } }
学习一个新框架,会看官网很重要
这篇关于Flink 基石、Flink Time、事件时间、Watermark水位线的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-28微服务架构中API版本控制的实践
- 2024-09-28AI给的和自己写的Python代码,都无法改变输入框的内容,替换也不行
- 2024-09-27Sentinel配置限流资料:新手入门教程
- 2024-09-27Sentinel配置限流资料详解
- 2024-09-27Sentinel限流资料:新手入门教程
- 2024-09-26Sentinel限流资料入门详解
- 2024-09-26Springboot框架资料:初学者入门教程
- 2024-09-26Springboot框架资料详解:新手入门教程
- 2024-09-26Springboot企业级开发资料:新手入门指南
- 2024-09-26SpringBoot企业级开发资料新手指南