作为基础任务的窗口差值
2021/5/24 18:57:02
本文主要介绍作为基础任务的窗口差值(即用 Flink 计算当前窗口最大值与上一个窗口最大值之间的差值),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
package com.mz.iot.test;

import com.mz.iot.utils.DateUtil;
import com.mz.iot.utils.FlinkUtils;
import com.mz.iot.utils.LogUtils;
import lombok.*;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Comparator;

/**
 * Computes, per key and per tumbling event-time window, the difference between the
 * window's largest value and a baseline.
 *
 * <p>The baseline is the largest value of the window that fired previously for the same
 * key; for the very first window of a key (no stored state yet) the baseline is that
 * window's own smallest value.
 *
 * <p>Intended as a basic building-block job for short windows that use no custom trigger.
 */
public class TestWindowDiff2 {

    /** Number of comma-separated fields expected in every input line: kind, time, value. */
    private static final int EXPECTED_FIELD_COUNT = 3;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkUtils.createEnv("");

        /*
         * Input line format (kind,time,value), e.g.:
         *
         * a,2020-10-01 00:12:01,1
         * a,2020-10-01 00:12:02,3
         * a,2020-10-01 00:12:06,5
         *
         * a,2020-10-01 00:12:10,10
         * a,2020-10-01 00:12:18,12
         *
         * a,2020-10-01 00:12:28,15
         *
         * a,2020-10-01 00:12:33,18
         * a,2020-10-01 00:12:38,20
         *
         * a,2020-10-01 00:13:05,28
         * a,2020-10-01 00:13:08,38
         *
         * a,2020-10-01 00:13:18,40
         */
        DataStream<String> socket = env.socketTextStream("mz-hadoop-03", 7777);
        socket.print("socket stream");

        SingleOutputStreamOperator<Event> mainStream = socket
                .map(new EventParser())
                // Event time comes from the record's own time field; up to 5 s of disorder is tolerated.
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
                            @Override
                            public long extractTimestamp(Event element) {
                                return DateUtil.getMillsFromString(element.getTime());
                            }
                        });

        SingleOutputStreamOperator<WindowResult> windowResultStream = mainStream
                .keyBy("kind")
                .window(TumblingEventTimeWindows.of(Time.seconds(10), Time.minutes(0)))
                .process(new MaxDiffWindowFunction());

        windowResultStream.print("resultStream====>");

        env.execute("test window diff with socket stream");
    }

    /**
     * Parses one {@code kind,time,value} text line into an {@link Event}.
     *
     * <p>All three fields are trimmed, so whitespace around the separators does not leak
     * into the key or the timestamp string.
     */
    private static final class EventParser implements MapFunction<String, Event> {

        private static final long serialVersionUID = 1L;

        /**
         * @param value raw input line
         * @return the parsed event
         * @throws IllegalArgumentException if the line has fewer than three fields
         * @throws NumberFormatException    if the third field is not a valid float
         */
        @Override
        public Event map(String value) throws Exception {
            String[] fields = value.split(",");
            if (fields.length < EXPECTED_FIELD_COUNT) {
                // Fail with the offending line in the message instead of a bare index error.
                throw new IllegalArgumentException(
                        "Expected 'kind,time,value' but got: '" + value + "'");
            }
            return new Event(fields[0].trim(), fields[1].trim(), Float.parseFloat(fields[2].trim()));
        }
    }

    /**
     * Emits one {@link WindowResult} per fired window, carrying the window maximum and its
     * difference to the baseline described on {@link TestWindowDiff2}.
     */
    private static final class MaxDiffWindowFunction
            extends ProcessWindowFunction<Event, WindowResult, Tuple, TimeWindow> {

        private static final long serialVersionUID = 1L;

        /** Keyed state holding the maximum value of the previously processed window. */
        private transient ValueState<Float> lastMaxState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            lastMaxState = getRuntimeContext().getState(new ValueStateDescriptor<>("last-max", Types.FLOAT));
        }

        /**
         * @param tuple    the window key (unused; the kind is read from the events themselves)
         * @param context  window metadata and current watermark
         * @param elements all events assigned to this window
         * @param out      collector receiving exactly one result per non-empty window
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Event> elements,
                            Collector<WindowResult> out) throws Exception {
            String windowStartText = DateUtil.getDateStrFromMill(context.window().getStart());
            String windowEndText = DateUtil.getDateStrFromMill(context.window().getEnd());
            String watermarkText = DateUtil.getDateStrFromMill(context.currentWatermark());

            // Single pass instead of copy + sort. Tie-breaking mirrors a stable descending
            // sort: the FIRST event with the largest value and the LAST event with the
            // smallest value are selected.
            Event maxEvent = null;
            Event minEvent = null;
            for (Event candidate : elements) {
                if (maxEvent == null || candidate.getValue() > maxEvent.getValue()) {
                    maxEvent = candidate;
                }
                if (minEvent == null || candidate.getValue() <= minEvent.getValue()) {
                    minEvent = candidate;
                }
            }
            if (maxEvent == null) {
                // Defensive: nothing to report for a window without elements.
                return;
            }

            LogUtils.info("窗口边界:[" + windowStartText + "," + windowEndText + "),水位线:" + watermarkText
                    + ",最大值:" + maxEvent + ",最小值:" + minEvent);

            // First window for this key: no previous maximum exists, fall back to this
            // window's own minimum as the baseline.
            Float previousMax = lastMaxState.value();
            float lastValue = previousMax != null ? previousMax : minEvent.getValue();
            System.out.println("上一个窗口的最大值为:" + lastValue);

            // Remember this window's maximum for the next window of the same key.
            lastMaxState.update(maxEvent.getValue());

            // Emit the result enriched with window information and the difference.
            WindowResult result = new WindowResult();
            result.setKind(maxEvent.getKind());
            result.setValue(maxEvent.getValue());
            result.setWindowStart(context.window().getStart());
            // Subtract via the decimal string form so e.g. 0.3 - 0.1 yields 0.2 rather than
            // a binary floating-point artifact.
            result.setDiff(new BigDecimal(String.valueOf(maxEvent.getValue()))
                    .subtract(new BigDecimal(String.valueOf(lastValue)))
                    .floatValue());
            out.collect(result);
        }
    }

    /** One input record: a key ({@code kind}), a timestamp string and a numeric value. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Event {
        private String kind;
        private String time;
        private float value;
    }

    /** Output record emitted once per fired window. */
    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WindowResult {
        private String kind;
        /** Maximum value observed in the current window. */
        private float value;
        /** Start of the current window, in epoch milliseconds. */
        private long windowStart;
        /** Current window maximum minus the baseline (previous window maximum). */
        private float diff;

        @Override
        public String toString() {
            return "WindowResult{" +
                    "kind='" + kind + '\'' +
                    ", value=" + value +
                    ", diff=" + diff +
                    ", windowStart=" + DateUtil.getDateStrFromMill(windowStart) +
                    '}';
        }
    }
}
这篇关于作为基础任务的窗口差值的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-28AI给的和自己写的Python代码,都无法改变输入框的内容,替换也不行
- 2024-09-27Sentinel配置限流资料:新手入门教程
- 2024-09-27Sentinel配置限流资料详解
- 2024-09-27Sentinel限流资料:新手入门教程
- 2024-09-26Sentinel限流资料入门详解
- 2024-09-26Springboot框架资料:初学者入门教程
- 2024-09-26Springboot框架资料详解:新手入门教程
- 2024-09-26Springboot企业级开发资料:新手入门指南
- 2024-09-26SpringBoot企业级开发资料新手指南
- 2024-09-26Springboot微服务资料入门教程