Flink SQL upsert-kafka: understanding the ChangelogNormalize state
2021/7/21 2:05:59
Original: https://www.jianshu.com/p/5ffe5aa0dc59
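One point worth noting: deduplication for Flink SQL upsert-kafka is not implemented in the Kafka connector. It is done by keyed-state deduplication (keyBy) on the ValueState held in the parent class DeduplicateFunctionBase, which is why upsert-kafka requires every Kafka message to carry a key.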
    /**
     * Base class for deduplicate function.
     *
     * @param <T> Type of the value in the state.
     * @param <K> Type of the key.
     * @param <IN> Type of the input elements.
     * @param <OUT> Type of the returned elements.
     */
    abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> {

        private static final long serialVersionUID = 1L;

        // the TypeInformation of the values in the state.
        protected final TypeInformation<T> typeInfo;
        protected final long stateRetentionTime;
        protected final TypeSerializer<OUT> serializer;
        // state stores previous message under the key.
        protected ValueState<T> state;

        public DeduplicateFunctionBase(
                TypeInformation<T> typeInfo,
                TypeSerializer<OUT> serializer,
                long stateRetentionTime) {
            this.typeInfo = typeInfo;
            this.stateRetentionTime = stateRetentionTime;
            this.serializer = serializer;
        }

        @Override
        public void open(Configuration configure) throws Exception {
            super.open(configure);
            ValueStateDescriptor<T> stateDesc =
                    new ValueStateDescriptor<>("deduplicate-state", typeInfo);
            StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
            if (ttlConfig.isEnabled()) {
                stateDesc.enableTimeToLive(ttlConfig);
            }
            state = getRuntimeContext().getState(stateDesc);
        }
    }
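To make the role of the key concrete, here is a minimal plain-Java sketch of a per-key value slot with an optional retention time. It is not Flink code: `KeyedTtlStateSketch` and its methods are hypothetical names, the clock is passed in explicitly to keep the example deterministic, and two behaviours are assumptions rather than facts taken from the class above: a non-positive retention means "never expire", and expiry is measured from the last write.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a keyed ValueState with TTL; no Flink dependency.
class KeyedTtlStateSketch {

    // One stored value plus the time it was last written.
    private static final class Entry {
        final String value;
        final long writtenAt;

        Entry(String value, long writtenAt) {
            this.value = value;
            this.writtenAt = writtenAt;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long retentionMillis; // assumption: <= 0 disables expiry

    KeyedTtlStateSketch(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Returns the value stored under this key, or null if absent or expired.
    String value(String key, long nowMillis) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (retentionMillis > 0 && nowMillis - entry.writtenAt >= retentionMillis) {
            entries.remove(key); // expired: behave as if the key was never seen
            return null;
        }
        return entry.value;
    }

    // Stores the value under this key and restarts its retention period.
    void update(String key, String value, long nowMillis) {
        entries.put(key, new Entry(value, nowMillis));
    }

    public static void main(String[] args) {
        KeyedTtlStateSketch state = new KeyedTtlStateSketch(1000L);
        state.update("user-1", "a", 0L);
        System.out.println(state.value("user-1", 500L));  // a
        System.out.println(state.value("user-2", 500L));  // null (different key)
        System.out.println(state.value("user-1", 1000L)); // null (expired)
    }
}
```

Without a key on the message there is nothing to select the slot with, so no "previous row" could be looked up.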
The deduplication against this state is implemented in org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper:
    /**
     * Processes element to deduplicate on keys with process time semantic, sends current element as
     * last row, retracts previous element if needed.
     *
     * @param currentRow latest row received by deduplicate function
     * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
     * @param state state of function, null if generateUpdateBefore is false
     * @param out underlying collector
     */
    static void processLastRowOnProcTime(
            RowData currentRow,
            boolean generateUpdateBefore,
            boolean generateInsert,
            ValueState<RowData> state,
            Collector<RowData> out)
            throws Exception {

        checkInsertOnly(currentRow);
        if (generateUpdateBefore || generateInsert) {
            // use state to keep the previous row content if we need to generate UPDATE_BEFORE
            // or use to distinguish the first row, if we need to generate INSERT
            RowData preRow = state.value();
            state.update(currentRow);
            if (preRow == null) {
                // the first row, send INSERT message
                currentRow.setRowKind(RowKind.INSERT);
                out.collect(currentRow);
            } else {
                if (generateUpdateBefore) {
                    preRow.setRowKind(RowKind.UPDATE_BEFORE);
                    out.collect(preRow);
                }
                currentRow.setRowKind(RowKind.UPDATE_AFTER);
                out.collect(currentRow);
            }
        } else {
            // always send UPDATE_AFTER if INSERT is not needed
            currentRow.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(currentRow);
        }
    }
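The branching above is easier to follow with concrete input. The sketch below replays the same decision logic in plain Java, under stated assumptions: `LastRowSketch` is a hypothetical class, a `HashMap` stands in for the keyed `ValueState<RowData>`, rows are reduced to a key and a string value, and the `+I` / `-U` / `+U` prefixes are just labels for INSERT, UPDATE_BEFORE and UPDATE_AFTER. It is an illustration, not the Flink implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java replay of the last-row logic; no Flink dependency.
class LastRowSketch {

    // Stands in for the keyed ValueState: the previous row value per key.
    private final Map<String, String> state = new HashMap<>();
    private final boolean generateUpdateBefore;
    private final boolean generateInsert;

    LastRowSketch(boolean generateUpdateBefore, boolean generateInsert) {
        this.generateUpdateBefore = generateUpdateBefore;
        this.generateInsert = generateInsert;
    }

    // Returns the changelog messages emitted for one incoming row.
    List<String> process(String key, String value) {
        List<String> out = new ArrayList<>();
        if (generateUpdateBefore || generateInsert) {
            // Read the previous row for this key and store the new one.
            String previous = state.put(key, value);
            if (previous == null) {
                // First row seen for this key.
                out.add("+I[" + key + ", " + value + "]");
            } else {
                if (generateUpdateBefore) {
                    // Retract the previously emitted row.
                    out.add("-U[" + key + ", " + previous + "]");
                }
                out.add("+U[" + key + ", " + value + "]");
            }
        } else {
            // State is not consulted at all in this mode.
            out.add("+U[" + key + ", " + value + "]");
        }
        return out;
    }

    public static void main(String[] args) {
        LastRowSketch op = new LastRowSketch(true, true);
        System.out.println(op.process("user-1", "a")); // [+I[user-1, a]]
        System.out.println(op.process("user-1", "b")); // [-U[user-1, a], +U[user-1, b]]
        System.out.println(op.process("user-2", "c")); // [+I[user-2, c]]
    }
}
```

Note that when both flags are false the function never touches the state, so every row is forwarded as UPDATE_AFTER regardless of what came before.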