作为基础任务的窗口差值

2021/5/24 18:57:02

本文介绍一个可作为基础任务的 Flink 作业:按 key 计算每个滚动窗口内的最大值与上一个窗口最大值之间的差值(首个窗口则用本窗口最小值作为基准),供解决类似问题时参考。

package com.mz.iot.test;

import com.mz.iot.utils.DateUtil;
import com.mz.iot.utils.FlinkUtils;
import com.mz.iot.utils.LogUtils;
import lombok.*;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Comparator;


/**
 * Per-key difference between the maximum value in the current tumbling window and the maximum
 * value of the previous window of the same key.
 *
 * <p>Note: the value subtracted from the current maximum is the previous window's maximum. For the
 * first window of a key there is no previous window, so that window's own minimum is used instead.
 *
 * <p>Intended for very short windows without a custom trigger, as a base job.
 */
public class TestWindowDiff2 {

    /** Socket host used when no host is passed as the first program argument. */
    private static final String DEFAULT_HOST = "mz-hadoop-03";

    /** Socket port used when no port is passed as the second program argument. */
    private static final int DEFAULT_PORT = 7777;

    /**
     * Builds and runs the job.
     *
     * @param args optional {@code host [port]} of the socket source; defaults to
     *     {@code mz-hadoop-03 7777} when omitted
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = FlinkUtils.createEnv("");

        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        /*
         * Input format: kind,event time,value
         * (blank lines below only group the records that fall into the same 10-second window)
         *
         * a,2020-10-01 00:12:01,1
         * a,2020-10-01 00:12:02,3
         * a,2020-10-01 00:12:06,5
         *
         * a,2020-10-01 00:12:10,10
         * a,2020-10-01 00:12:18,12
         *
         * a,2020-10-01 00:12:28,15
         *
         * a,2020-10-01 00:12:33,18
         * a,2020-10-01 00:12:38,20
         *
         * a,2020-10-01 00:13:05,28
         * a,2020-10-01 00:13:08,38
         *
         * a,2020-10-01 00:13:18,40
         */
        DataStream<String> socket = env.socketTextStream(host, port);

        socket.print("socket stream");

        SingleOutputStreamOperator<Event> mainStream = socket
                // Drop blank or incomplete lines; otherwise arr[2] below throws and fails the whole job.
                .filter(line -> line.split(",").length >= 3)
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] arr = value.split(",");
                        return new Event(arr[0].trim(), arr[1].trim(), Float.parseFloat(arr[2].trim()));
                    }
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Event element) {
                        return DateUtil.getMillsFromString(element.getTime());
                    }
                });

        SingleOutputStreamOperator<WindowResult> windowResultStream = mainStream.keyBy("kind")
                .window(TumblingEventTimeWindows.of(Time.seconds(10), Time.minutes(0)))
                .process(new ProcessWindowFunction<Event, WindowResult, Tuple, TimeWindow>() {

                    /**
                     * Maximum value of the previous window of the current key. Obtained from the runtime
                     * context, so it is carried over from one window of a key to the next.
                     */
                    ValueState<Float> lastMaxState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        lastMaxState = getRuntimeContext().getState(new ValueStateDescriptor<>("last-max", Types.FLOAT));
                    }

                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Event> elements, Collector<WindowResult> out) throws Exception {

                        // A single linear pass is enough: only the extremes are needed, no full sort.
                        // Ties resolve to the first maximum and the last minimum in iteration order.
                        Event eventMax = null;
                        Event eventMin = null;
                        for (Event event : elements) {
                            if (eventMax == null || Float.compare(event.getValue(), eventMax.getValue()) > 0) {
                                eventMax = event;
                            }
                            if (eventMin == null || Float.compare(event.getValue(), eventMin.getValue()) <= 0) {
                                eventMin = event;
                            }
                        }
                        if (eventMax == null) {
                            // Defensive: nothing to compute for an empty window.
                            return;
                        }

                        String windowStart = DateUtil.getDateStrFromMill(context.window().getStart());
                        String windowEnd = DateUtil.getDateStrFromMill(context.window().getEnd());
                        String watermark = DateUtil.getDateStrFromMill(context.currentWatermark());

                        LogUtils.info("窗口边界:[" + windowStart + "," + windowEnd + "),水位线:" + watermark + ",最大值:" + eventMax + ",最小值:" + eventMin);

                        // First window of this key: no previous maximum yet, so fall back to this window's minimum.
                        Float previousMax = lastMaxState.value();
                        float lastValue = previousMax != null ? previousMax : eventMin.getValue();
                        LogUtils.info("上一个窗口的最大值为:" + lastValue);

                        // Remember this window's maximum for the next window of the same key.
                        lastMaxState.update(eventMax.getValue());

                        // Emit the window info together with the difference.
                        WindowResult result = new WindowResult();
                        result.setKind(eventMax.getKind());
                        result.setValue(eventMax.getValue());
                        result.setWindowStart(context.window().getStart());
                        // Subtract via the decimal string forms to avoid binary floating-point rounding in the diff.
                        result.setDiff(new BigDecimal(String.valueOf(eventMax.getValue()))
                                .subtract(new BigDecimal(String.valueOf(lastValue)))
                                .floatValue());

                        out.collect(result);
                    }
                });

        windowResultStream.print("resultStream====>");

        env.execute("test window diff with socket stream");

    }

    /** One parsed input record: key, event-time string and measured value. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Event {
        private String kind;
        private String time;
        private float value;
    }

    /** Per-window output record. */
    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WindowResult {

        private String kind;
        /** Value reported for the current window (the job sets it to the window's maximum). */
        private float value;
        /** Start of the current window, as returned by {@code TimeWindow#getStart()}. */
        private long windowStart;
        /** Current window maximum minus the previous window maximum (or this window's minimum for the first window). */
        private float diff;

        @Override
        public String toString() {
            return "WindowResult{" +
                    "kind='" + kind + '\'' +
                    ", value=" + value +
                    ", diff=" + diff +
                    ", windowStart=" + DateUtil.getDateStrFromMill(windowStart) +
                    '}';
        }
    }
}

 



这篇关于作为基础任务的窗口差值的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程