Flink Windows
2022/4/9 7:20:57
This article walks through how Flink windows work, following the relevant source code.
Window Properties
A window stores its data in ListState or AggregatingState. Every window has three important components: an Assigner, a Trigger, and an optional Evictor.
WindowAssigner
TumblingEventTimeWindows
protected TumblingEventTimeWindows(long size, long offset, WindowStagger windowStagger) {
    if (Math.abs(offset) >= size) {
        throw new IllegalArgumentException(
                "TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
    }

    this.size = size;
    this.globalOffset = offset;
    this.windowStagger = windowStagger;
}
What is a WindowAssigner? Given a record, it assigns that record to one or more windows according to its definition. TumblingEventTimeWindows, for example, is defined by a window size and a window offset.
public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        if (staggerOffset == null) {
            staggerOffset =
                    windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
        }
        // Long.MIN_VALUE is currently assigned when no timestamp is present
        long start =
                TimeWindow.getWindowStartWithOffset(
                        timestamp, (globalOffset + staggerOffset) % size, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException(
                "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                        + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                        + "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}
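Assigning a window is simple: the window start is timestamp - (timestamp - offset + windowSize) % windowSize, and the window covers [start, start + size).
Suppose a record's timestamp is 10:30 and the user-defined windowSize is 1h. With no offset, the window is [10:00, 11:00). If you specifically want [10:15, 11:15) instead, set the offset to 15min.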
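The 10:30 example can be checked by hand. The sketch below copies only the arithmetic of getWindowStartWithOffset into a standalone class; the class name TumblingWindowStartDemo and the choice to express times as milliseconds since midnight are assumptions made for illustration.

```java
import java.time.Duration;

final class TumblingWindowStartDemo {

    // Same arithmetic as the getWindowStartWithOffset method quoted above.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = Duration.ofHours(1).toMillis();
        // 10:30 expressed as milliseconds since midnight (illustrative value only).
        long timestamp = Duration.ofHours(10).plusMinutes(30).toMillis();

        long startNoOffset = getWindowStartWithOffset(timestamp, 0L, size);
        long startWithOffset =
                getWindowStartWithOffset(timestamp, Duration.ofMinutes(15).toMillis(), size);

        System.out.println(Duration.ofMillis(startNoOffset));   // PT10H
        System.out.println(Duration.ofMillis(startWithOffset)); // PT10H15M
    }
}
```

With no offset the start is 10:00, and with a 15-minute offset it moves to 10:15, matching the two windows described above.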
With sliding windows, a single record can belong to several windows, which is why assignWindows returns a Collection of windows.
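To make the sliding case concrete, here is a minimal sketch of how one timestamp maps to several windows. The loop is modeled on how I understand the sliding assigner to work (start from the latest window that contains the timestamp and step back by one slide), so treat it as an assumption rather than the exact Flink code. The class name and the use of minutes as the time unit are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowAssignDemo {

    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Returns the [start, end) pair of every sliding window that contains the timestamp. */
    static List<long[]> assignWindows(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        // The most recent window start is aligned to the slide, not to the size.
        long lastStart = getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 10:40 in minutes since midnight, 60-minute windows sliding every 30 minutes.
        for (long[] w : assignWindows(640L, 60L, 30L, 0L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

A record at 10:40 falls into both [10:30, 11:30) and [10:00, 11:00), so the result has two entries.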
Trigger
A Trigger decides when a window is evaluated.
TriggerResult
public enum TriggerResult {

    /** No action is taken on the window. */
    CONTINUE(false, false),

    /** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
    FIRE_AND_PURGE(true, true),

    /**
     * On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,
     * though, all elements are retained.
     */
    FIRE(true, false),

    /**
     * All elements in the window are cleared and the window is discarded, without evaluating the
     * window function or emitting any elements.
     */
    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}
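A TriggerResult takes one of four values:
CONTINUE: do nothing
FIRE: evaluate the window, keep the state
PURGE: clear the state without evaluating the window
FIRE_AND_PURGE: evaluate the window, then clear the state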
EventTimeTrigger
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }
}
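onElement is called once for every incoming record. EventTimeTrigger compares the window's maximum timestamp with the current watermark: if the watermark has already reached or passed it, the window fires immediately; otherwise a timer is registered and the result is CONTINUE.
onEventTime is called when an event-time timer registered through the trigger context fires. The window fires only if the timer's time equals the window's maximum timestamp.
onProcessingTime is called when a processing-time timer registered through the trigger context fires. EventTimeTrigger ignores it and returns CONTINUE.
clear deletes the event-time timer that was registered through the trigger context.
onMerge is used by merging windows such as session windows.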
One detail worth noting: onElement registers an event-time timer at the window's maximum timestamp, so that the window can fire once the watermark passes that time:
ctx.registerEventTimeTimer(window.maxTimestamp());
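The two decisions made by EventTimeTrigger can be modeled without any Flink dependency. The sketch below is a simplified stand-in, not the Flink class: EventTimeTriggerSketch, its Result enum, and the TreeSet of timers are hypothetical names, and a window is represented only by its end timestamp. It relies on TimeWindow.maxTimestamp() being end - 1.

```java
import java.util.TreeSet;

final class EventTimeTriggerSketch {

    enum Result { CONTINUE, FIRE }

    private long currentWatermark = Long.MIN_VALUE;
    // Stand-in for the timers registered through the trigger context.
    private final TreeSet<Long> timers = new TreeSet<>();

    // A time window [start, end) has maxTimestamp = end - 1.
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    Result onElement(long windowEnd) {
        if (maxTimestamp(windowEnd) <= currentWatermark) {
            // The watermark is already past the window: fire immediately.
            return Result.FIRE;
        }
        // Otherwise remember when the window should fire.
        timers.add(maxTimestamp(windowEnd));
        return Result.CONTINUE;
    }

    Result onEventTime(long time, long windowEnd) {
        return time == maxTimestamp(windowEnd) ? Result.FIRE : Result.CONTINUE;
    }

    void setWatermark(long watermark) {
        currentWatermark = watermark;
    }

    boolean hasTimer(long timestamp) {
        return timers.contains(timestamp);
    }
}
```

For a window ending at 1000, the first element registers a timer at 999 and returns CONTINUE; once the watermark reaches 999, a later element for the same window returns FIRE directly.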
Evictor
An Evictor removes elements from the window's iterator before or after the window function is evaluated.
CountEvictor
public class CountEvictor<W extends Window> implements Evictor<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;
    private final boolean doEvictAfter;

    private CountEvictor(long count, boolean doEvictAfter) {
        this.maxCount = count;
        this.doEvictAfter = doEvictAfter;
    }

    private CountEvictor(long count) {
        this.maxCount = count;
        this.doEvictAfter = false;
    }

    @Override
    public void evictBefore(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (size <= maxCount) {
            return;
        } else {
            int evictedCount = 0;
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                    iterator.hasNext(); ) {
                iterator.next();
                evictedCount++;
                if (evictedCount > size - maxCount) {
                    break;
                } else {
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Creates a {@code CountEvictor} that keeps the given number of elements. Eviction is done
     * before the window function.
     *
     * @param maxCount The number of elements to keep in the pane.
     */
    public static <W extends Window> CountEvictor<W> of(long maxCount) {
        return new CountEvictor<>(maxCount);
    }

    /**
     * Creates a {@code CountEvictor} that keeps the given number of elements in the pane Eviction
     * is done before/after the window function based on the value of doEvictAfter.
     *
     * @param maxCount The number of elements to keep in the pane.
     * @param doEvictAfter Whether to do eviction after the window function.
     */
    public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter) {
        return new CountEvictor<>(maxCount, doEvictAfter);
    }
}
Flink's CountEvictor, for example, keeps only the last N elements of a window. evictBefore evicts before the window function runs; evictAfter evicts after it.
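The eviction loop is easy to try on a plain list. The following sketch reuses the loop from CountEvictor#evict on an ordinary Iterable; the class name CountEvictorSketch and the generic signature are assumptions for illustration, and the TimestampedValue wrapper and EvictorContext are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class CountEvictorSketch {

    /** Removes elements from the front until at most maxCount remain. */
    static <T> void evict(Iterable<T> elements, int size, long maxCount) {
        if (size <= maxCount) {
            return;
        }
        int evictedCount = 0;
        for (Iterator<T> iterator = elements.iterator(); iterator.hasNext(); ) {
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                // Everything from here on belongs to the last maxCount elements.
                break;
            }
            iterator.remove();
        }
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
        evict(window, window.size(), 2);
        System.out.println(window); // [4, 5]
    }
}
```

With five elements and maxCount = 2, the first three are removed and the last two survive.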
WindowedStream
WindowedStream
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
    this.input = input;
    this.builder =
            new WindowOperatorBuilder<>(
                    windowAssigner,
                    windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),
                    input.getExecutionConfig(),
                    input.getType(),
                    input.getKeySelector(),
                    input.getKeyType());
}

public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
    builder.trigger(trigger);
    return this;
}

public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
    outputTag = input.getExecutionEnvironment().clean(outputTag);
    builder.sideOutputLateData(outputTag);
    return this;
}

public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
    builder.evictor(evictor);
    return this;
}

public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<T, ACC, V> aggregateFunction,
        ProcessWindowFunction<V, R, K, W> windowFunction,
        TypeInformation<ACC> accumulatorType,
        TypeInformation<V> aggregateResultType,
        TypeInformation<R> resultType) {

    checkNotNull(aggregateFunction, "aggregateFunction");
    checkNotNull(windowFunction, "windowFunction");
    checkNotNull(accumulatorType, "accumulatorType");
    checkNotNull(aggregateResultType, "aggregateResultType");
    checkNotNull(resultType, "resultType");

    if (aggregateFunction instanceof RichFunction) {
        throw new UnsupportedOperationException(
                "This aggregate function cannot be a RichFunction.");
    }

    // clean the closures
    windowFunction = input.getExecutionEnvironment().clean(windowFunction);
    aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);

    final String opName = builder.generateOperatorName(aggregateFunction, windowFunction);

    OneInputStreamOperator<T, R> operator =
            builder.aggregate(aggregateFunction, windowFunction, accumulatorType);

    return input.transform(opName, resultType, operator);
}

public <R> SingleOutputStreamOperator<R> process(
        ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
    function = input.getExecutionEnvironment().clean(function);

    final String opName = builder.generateOperatorName(function, null);

    OneInputStreamOperator<T, R> operator = builder.process(function);

    return input.transform(opName, resultType, operator);
}
WindowedStream creates a WindowOperatorBuilder and hands it the assigner, the trigger, the late-data side output tag, and the evictor.
When aggregate or process is called afterwards, the builder constructs a WindowOperator; if an evictor is set, it constructs an EvictingWindowOperator instead.
Building the WindowOperator
Windows with aggregate
WindowOperatorBuilder#aggregate(AggregateFunction<T,ACC,V>, ProcessWindowFunction<V,R,K,W>, TypeInformation)
public <ACC, V, R> WindowOperator<K, T, ?, R, W> aggregate(
        AggregateFunction<T, ACC, V> aggregateFunction,
        ProcessWindowFunction<V, R, K, W> windowFunction,
        TypeInformation<ACC> accumulatorType) {

    Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
    Preconditions.checkNotNull(windowFunction, "ProcessWindowFunction cannot be null");

    if (aggregateFunction instanceof RichFunction) {
        throw new UnsupportedOperationException(
                "This aggregate function cannot be a RichFunction.");
    }

    if (evictor != null) {
        return buildEvictingWindowOperator(
                new InternalAggregateProcessWindowFunction<>(
                        aggregateFunction, windowFunction));
    } else {
        AggregatingStateDescriptor<T, ACC, V> stateDesc =
                new AggregatingStateDescriptor<>(
                        WINDOW_STATE_NAME,
                        aggregateFunction,
                        accumulatorType.createSerializer(config));

        return buildWindowOperator(
                stateDesc, new InternalSingleValueProcessWindowFunction<>(windowFunction));
    }
}

private <R> WindowOperator<K, T, Iterable<T>, R, W> buildEvictingWindowOperator(
        InternalWindowFunction<Iterable<T>, R, K, W> function) {
    @SuppressWarnings({"unchecked", "rawtypes"})
    TypeSerializer<StreamRecord<T>> streamRecordSerializer =
            (TypeSerializer<StreamRecord<T>>)
                    new StreamElementSerializer(inputType.createSerializer(config));

    ListStateDescriptor<StreamRecord<T>> stateDesc =
            new ListStateDescriptor<>(WINDOW_STATE_NAME, streamRecordSerializer);

    return new EvictingWindowOperator<>(
            windowAssigner,
            windowAssigner.getWindowSerializer(config),
            keySelector,
            keyType.createSerializer(config),
            stateDesc,
            function,
            trigger,
            evictor,
            allowedLateness,
            lateDataOutputTag);
}

private <ACC, R> WindowOperator<K, T, ACC, R, W> buildWindowOperator(
        StateDescriptor<? extends AppendingState<T, ACC>, ?> stateDesc,
        InternalWindowFunction<ACC, R, K, W> function) {

    return new WindowOperator<>(
            windowAssigner,
            windowAssigner.getWindowSerializer(config),
            keySelector,
            keyType.createSerializer(config),
            stateDesc,
            function,
            trigger,
            allowedLateness,
            lateDataOutputTag);
}
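- aggregate can produce either an EvictingWindowOperator or a WindowOperator. Both take a StateDescriptor, which describes the state that holds the window's data.
- A WindowOperator uses an AggregatingStateDescriptor, while an EvictingWindowOperator uses a ListStateDescriptor. The reason is simple: an EvictingWindowOperator has to inspect every element in the window to decide which ones to evict, and once elements have been folded into an aggregate under an AggregatingStateDescriptor, individual elements can no longer be removed.
Windows with process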
public <R> WindowOperator<K, T, ?, R, W> process(ProcessWindowFunction<T, R, K, W> function) {
    Preconditions.checkNotNull(function, "ProcessWindowFunction cannot be null");
    return apply(new InternalIterableProcessWindowFunction<>(function));
}

private <R> WindowOperator<K, T, ?, R, W> apply(
        InternalWindowFunction<Iterable<T>, R, K, W> function) {
    if (evictor != null) {
        return buildEvictingWindowOperator(function);
    } else {
        ListStateDescriptor<T> stateDesc =
                new ListStateDescriptor<>(
                        WINDOW_STATE_NAME, inputType.createSerializer(config));

        return buildWindowOperator(stateDesc, function);
    }
}
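- process can also produce either an EvictingWindowOperator or a WindowOperator. In both cases the state is described by a ListStateDescriptor and holds the window's data.
- In terms of performance, AggregatingState is generally better than ListState, because it keeps a single accumulator per window instead of every element.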
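The difference between the two kinds of state can be shown with two tiny stand-ins. These are not Flink's ListState or AggregatingState; the class names and the choice of a sum as the aggregate are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowStateSketch {

    /** Stand-in for ListState: keeps every element, so single elements can still be evicted. */
    static final class ListWindowState {
        final List<Long> elements = new ArrayList<>();

        void add(long value) {
            elements.add(value);
        }

        long sum() {
            long total = 0;
            for (long value : elements) {
                total += value;
            }
            return total;
        }
    }

    /** Stand-in for AggregatingState: folds each element into one accumulator on add. */
    static final class SumWindowState {
        private long accumulator;

        void add(long value) {
            accumulator += value;
        }

        long get() {
            return accumulator;
        }
    }

    public static void main(String[] args) {
        ListWindowState list = new ListWindowState();
        SumWindowState agg = new SumWindowState();
        for (long i = 1; i <= 1000; i++) {
            list.add(i);
            agg.add(i);
        }
        System.out.println(list.elements.size() + " stored elements vs. 1 accumulator");
        System.out.println(list.sum() == agg.get());
    }
}
```

Both give the same result, but the list grows with the window while the accumulator stays constant in size. The price is that the accumulator no longer knows which elements went into it, which is why an evictor forces the list form.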
How windows process data
How WindowOperator processes window data
Driven by incoming elements
WindowOperator
public void processElement(StreamRecord<IN> element) throws Exception {
    // assign the element to its windows using the windowAssigner
    final Collection<W> elementWindows =
            windowAssigner.assignWindows(
                    element.getValue(), element.getTimestamp(), windowAssignerContext);

    // if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) {
        // merging windows are not covered here
        ...
    } else {
        for (W window : elementWindows) {

            // skip the window if the watermark has already passed
            // (window max timestamp + allowed lateness)
            // drop if the window is already late
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;

            // windowState is the ListState or AggregatingState defined earlier.
            // State is already scoped by key; the window is set as the namespace so that
            // each window's data stays isolated (the namespace acts like an extra key).
            windowState.setCurrentNamespace(window);
            // add the element to the window state
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = window;

            // pass the element to the trigger, which decides whether the window fires
            TriggerResult triggerResult = triggerContext.onElement(element);

            // on FIRE, evaluate the contents held in state by calling
            // the user-defined ProcessWindowFunction
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(window, contents);
            }

            // on PURGE, clear the window state
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            // register the timer that cleans up the window state later
            registerCleanupTimer(window);
        }
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than
    // element timestamp
    // send late elements to the side output
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null) {
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}

// invokes the user-defined ProcessWindowFunction
private void emitWindowContents(W window, ACC contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    processContext.window = window;
    userFunction.process(
            triggerContext.key, window, processContext, contents, timestampedCollector);
}
- The code above is driven by incoming elements: on this path a window can fire only when a record arrives.
Driven by watermarks
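The isWindowLate check in processElement decides whether a record is added to a window at all. A minimal sketch of that check follows, assuming an event-time assigner and assuming the cleanup time is the window's maximum timestamp plus the allowed lateness. LatenessSketch and its method signatures are illustrative, not the operator's actual private methods.

```java
final class LatenessSketch {

    /** Time after which the window's state may be dropped (assumed definition). */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long time = windowMaxTimestamp + allowedLateness;
        // Guard against long overflow for very large windows or lateness values.
        return time >= windowMaxTimestamp ? time : Long.MAX_VALUE;
    }

    /** A window is late once the watermark has reached its cleanup time. */
    static boolean isWindowLate(
            long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }

    public static void main(String[] args) {
        long maxTimestamp = 999L; // window [0, 1000)
        System.out.println(isWindowLate(maxTimestamp, 0L, 998L));
        System.out.println(isWindowLate(maxTimestamp, 0L, 999L));
        System.out.println(isWindowLate(maxTimestamp, 500L, 1200L));
    }
}
```

Note that the comparison is against the watermark, not against the record's own timestamp: a record with an old timestamp is still accepted as long as the watermark has not yet passed the window's cleanup time.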
InternalTimerServiceImpl#advanceWatermark
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;

    InternalTimer<K, N> timer;

    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }
}
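eventTimeTimersQueue holds the registered event-time timers, such as each window's maximum timestamp. Every timer whose timestamp is less than or equal to the new watermark is removed from the queue and its registered callback is invoked.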
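The polling loop can be reproduced with a java.util.PriorityQueue. This is a simplified model: TimerQueueSketch is a hypothetical class, timers are bare timestamps without key or namespace, and instead of calling a trigger target the method returns the list of timers that fired.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class TimerQueueSketch {

    // Smallest timestamp first, like an event-time timer queue.
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        if (!eventTimeTimers.contains(timestamp)) {
            eventTimeTimers.add(timestamp);
        }
    }

    /** Advances the watermark and returns the timers that fired, in timestamp order. */
    List<Long> advanceWatermark(long time) {
        currentWatermark = time;
        List<Long> fired = new ArrayList<>();
        Long timer;
        while ((timer = eventTimeTimers.peek()) != null && timer <= time) {
            eventTimeTimers.poll();
            fired.add(timer);
        }
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

With timers at 999, 1999 and 2999, a watermark of 1500 fires only the first one, and a later watermark of 2999 fires the other two. This is why a window can fire without any new record arriving for it: a watermark alone is enough.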
WindowOperator#onEventTime
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
    triggerContext.key = timer.getKey();
    triggerContext.window = timer.getNamespace();

    MergingWindowSet<W> mergingWindows;

    if (windowAssigner instanceof MergingWindowAssigner) {
        mergingWindows = getMergingWindowSet();
        W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
        if (stateWindow == null) {
            // Timer firing for non-existent window, this can only happen if a
            // trigger did not clean up timers. We have already cleared the merging
            // window and therefore the Trigger state, however, so nothing to do.
            return;
        } else {
            windowState.setCurrentNamespace(stateWindow);
        }
    } else {
        windowState.setCurrentNamespace(triggerContext.window);
        mergingWindows = null;
    }

    TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());

    if (triggerResult.isFire()) {
        ACC contents = windowState.get();
        if (contents != null) {
            emitWindowContents(triggerContext.window, contents);
        }
    }

    if (triggerResult.isPurge()) {
        windowState.clear();
    }

    if (windowAssigner.isEventTime()
            && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
        clearAllState(triggerContext.window, windowState, mergingWindows);
    }

    if (mergingWindows != null) {
        // need to make sure to update the merging state in state
        mergingWindows.persist();
    }
}
This is where the window is triggered and evaluated.
How EvictingWindowOperator processes window data
EvictingWindowOperator#emitWindowContents
private void emitWindowContents(
        W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState)
        throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    FluentIterable<TimestampedValue<IN>> recordsWithTimestamp =
            FluentIterable.from(contents)
                    .transform(
                            new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
                                @Override
                                public TimestampedValue<IN> apply(StreamRecord<IN> input) {
                                    return TimestampedValue.from(input);
                                }
                            });
    evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

    FluentIterable<IN> projectedContents =
            recordsWithTimestamp.transform(
                    new Function<TimestampedValue<IN>, IN>() {
                        @Override
                        public IN apply(TimestampedValue<IN> input) {
                            return input.getValue();
                        }
                    });

    processContext.window = triggerContext.window;
    userFunction.process(
            triggerContext.key,
            triggerContext.window,
            processContext,
            projectedContents,
            timestampedCollector);
    evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

    // work around to fix FLINK-4369, remove the evicted elements from the windowState.
    // this is inefficient, but there is no other way to remove elements from ListState, which
    // is an AppendingState.
    windowState.clear();
    for (TimestampedValue<IN> record : recordsWithTimestamp) {
        windowState.add(record.getStreamRecord());
    }
}
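- After reading WindowOperator, adding an evictor is straightforward: the operator takes the window data from state and evicts elements before and after the window function is evaluated.
- One more detail to note: after eviction the window state is cleared and only the surviving elements are written back, so the evicted elements are no longer present the next time the window fires.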
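The write-back behaviour is easiest to see across two firings. The sketch below is a hypothetical miniature of that flow, not the operator itself: EvictingFireSketch keeps the window contents in a plain list, evicts before the computation so that only the last maxCount elements remain, uses a sum as the window function, and then replaces the state with the survivors.

```java
import java.util.ArrayList;
import java.util.List;

final class EvictingFireSketch {

    private final List<Integer> windowState = new ArrayList<>();
    private final int maxCount;

    EvictingFireSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    void add(int value) {
        windowState.add(value);
    }

    /** Fires the window: evict, compute a sum, then write the survivors back to state. */
    int fire() {
        List<Integer> contents = new ArrayList<>(windowState);

        // Evict before the computation: drop the oldest elements beyond maxCount.
        int excess = contents.size() - maxCount;
        if (excess > 0) {
            contents.subList(0, excess).clear();
        }

        int sum = 0;
        for (int value : contents) {
            sum += value;
        }

        // Clear the state and re-add only what survived eviction.
        windowState.clear();
        windowState.addAll(contents);
        return sum;
    }

    List<Integer> state() {
        return new ArrayList<>(windowState);
    }
}
```

With maxCount = 2 and elements 1, 2, 3, the first firing sums 2 + 3 and leaves [2, 3] in state. After adding 4, the second firing sees only 2, 3, 4, evicts 2, and sums 3 + 4; element 1 never reappears.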