flink 代码编写建议

2021/12/2 23:13:09

本文主要是介绍flink 代码编写建议,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

目录
  • 概述
  • 注意明细项
    • Doris建表
    • 自定义SINK
    • 自定义各种函数
    • 流式任务配置化
    • hive on hbase
    • 一些不错的工具类

概述

本人主要分享日常工作过程中值得关切的 flink代码编写技巧。

注意明细项

Doris建表

以下字段必备
created_time datetime
modified_time datetime
is_deleted smallint(6) NULL COMMENT "是否删除,1是0否",
doris_delete tinyint(4) NULL DEFAULT "0" COMMENT "删除标记" 此字段是业务逻辑删除字段,当发现数据被物理删除时,使用此字段标记为1

alter table uc_student_bak enable feature "BATCH_DELETE"; 此处很关键标记表可以批量删除,这样当 merge标记为 delete时, doris_delete =1 的数据会被删除掉

自定义SINK

实现 RichSinkFunction 接口 , 各种不同的 sink 存储支持 最好都单独搞一个class 继承 RichSinkFunction 接口
附属贴上 sink doris代码

public class DorisSinkFunction extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(DorisSinkFunction.class);

    //累加器对象
    private final LongCounter counter = new LongCounter();

    private HttpClientBuilder builder;
    private String loadUrl;
    private String authorization;
    private String username;
    private String password;

    private String profile;
    private String mergeType;
    private String dbName;
    private String tableName;


    public static DorisSinkFunction of(String profile, String mergeType, String dbName, String tableName) {
        return new DorisSinkFunction(profile, mergeType, dbName, tableName);
    }

    private DorisSinkFunction(String profile, String mergeType, String dbName, String tableName) {
        this.profile = profile;
        this.mergeType = mergeType;
        this.dbName = dbName;
        this.tableName = tableName;
    }

    private String basicAuthHeader() {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }

    private HttpClientBuilder httpClientBuilder() {
        return HttpClients.custom()
                //添加重试策略
                .setRetryHandler(new HttpRequestRetryHandler() {
                    @Override
                    public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
                        //异常超过3次停止重试
                        if (executionCount > 3) {
                            return false;
                        }
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            return false;
                        }
                        return true;
                    }
                })
                //支持重定向
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //获取配置
        final Properties props = PropertyUtils.getDorisProps(profile);
        this.username = props.getProperty("username");
        this.password = props.getProperty("password");

        //初始化authorization
        this.authorization = basicAuthHeader();

        //初始化http client builder
        this.builder = httpClientBuilder();

        //构建doris stream load url
        this.loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                props.getProperty("feHost"),
                props.getProperty("httpPort"),
                dbName, tableName);

        //注册累加器
        getRuntimeContext().addAccumulator("counter", counter);
    }

    @Override
    public void close() throws Exception {
        log.warn(" write doris http client execute close, dt: {}", DateTimeUtils.getCurrentDt());
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        try (CloseableHttpClient client = this.builder.build()) {
            //创建put对象
            final HttpPut put = new HttpPut(loadUrl);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, this.authorization);
            put.setHeader("strip_outer_array", "true");//批量插入传json数组必须要有此配置
            put.setHeader("format", "json");
            put.setHeader("label", UUID.randomUUID().toString().replace("-", ""));
            //添加doris batch delete header
            if ("MERGE".equalsIgnoreCase(mergeType)) {
                put.setHeader("merge_type", "MERGE");
                put.setHeader("delete", "doris_delete=1");
            }
            put.setEntity(new StringEntity(value, "UTF-8"));

            //execute
            try (final CloseableHttpResponse response = client.execute(put)) {
                String loadResult = null;
                String loadStatus = null;
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                    loadStatus = JSONObject.parseObject(loadResult).getString("Status");
                }
                final int statusCode = response.getStatusLine().getStatusCode();

                if (statusCode != 200 || !"Success".equalsIgnoreCase(loadStatus)) {
                    String msg = String.format(" stream_load_failed, statusCode=%s loadResult=%s", statusCode, loadResult);
                    log.error(msg);
                    throw new RuntimeException(msg);
                }
            } catch (Exception e) {
                log.error(" stream_load_to_doris execute error: {}", e);
                throw new RuntimeException(" stream_load_to_doris execute error. ", e);
            }
        } catch (Exception e) {
            log.error(" stream_load_to_doris invoke error: {}", e);
            throw new RuntimeException(" stream_load_to_doris invoke error. ", e);
        }

        //累加器自加
        counter.add(1);
    }
}

自定义各种函数

  1. 简单的 map转换 实现 MapFunction 或者 FlatMapFunction 接口 , 建议只要有业务含义,那怕这是就基本的字符串转换也要搞成单独的 class
    单独的class 中的构造函数 可以传入需要的参数 。 最常用场景 JSONObject 中设置必要的值
    MapFunction 主要方法如下 O map(T value) throws Exception; 可以发现该方法只支持最基本的一个元素到一个元素的转换
    FlatMapFunction 主要方法如下 void flatMap(T value, Collector out) throws Exception; 可以发现该方法支持一个参数 转换为 多个参数的转换。
    ProcessWindowFunction 应用于 keyed 的窗口,核心方法 process(KEY key, Context context, Iterable elements, Collector out) , 通过该方法就能知道他是基于 KEYBy之后的窗口函数。
    ProcessAllWindowFunction 应用于 non-keyed 的窗口函数 process(Context context, Iterable elements, Collector out) throws Exception 通过该方法知道是基于非KEYBY 的全局窗口。

流式任务配置化

专注于配置化解决任务运行问题
可以每个实时任务 一个 properties文件记录关键元数据,每个任务基于这些元数据进行任务运行;常用的properties格式如下:
job-config.properties 一般和运行脚本在一个目录

小节: 以下从指定的CK启动,则不会管 kafka source 的 StartFrom设置的是啥,只会从CK state中存储的 offset开始消费;
要设置env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);不然cancel后cp就清除了

# 运行环境 prod 生产  test 测试
profile=prod  
# 任务名称
jobName=dwd-rt-xx
# 消费组
groupId=flink_dwd_rt_xx
# 消费topic
sourceTopic=topic1
# sink topic
sinkTopic=topic2
# 待存储的表, 可能是 TIDB CK DORIS  HBASE ORACLE  MYSQL 等各种数据存储类型
sinkTable=table1
# 待存储的topic 
sinkTopic= topic1
# sink 支持的数据库类型
sink=doris,tidb
# 以下主要定义维表 join是 比如根据 b1 从维度表中获取对应的字段值,然后执行 然后执行 map merge 就是维表join的结果,最终map中key变更为 b2
xxx_mapping=a, b1->b2

以上任务配置完之后,需要运行flink 任务在没有实时计算平台的情况下, 最简单的办法就是搞个提交堡垒机 ,在该堡垒机上需要设置如下信息:

  1. 设置专门提交flink 任务的 租户
  2. 指定目录存储各种脚本
  3. 编写脚本运行
    脚本模板参考下面
    run-job.sh
    cp 代表从指定的 checkpoint开始启动
    sp 代表从指定的 savepoint 开始启动
    默认 start 适合第一次启动

以下脚本运行时可以把公共变量抽象到单独的脚本中去,比如 common_env.sh
然后在运行脚本中 source ../../common_env.sh 引入这个公共脚本。

#!/bin/bash

# flink 启动客户端地址
flink_bin=/xxx/flink-1.2.1-yarn/flink-1.12.1/bin/flink
# ck 地址 
cp_root_path=hdfs://xxxcluster/flink_realedw/cluster/checkpoints
# 提交yarn任务实时队列
queue=xxxqueue

# 任务名称
app_name="ods-gk-xxx"
# 实时任务 main-class
class_ref=cn.xxx.xxx.OdsGroupSummary
# 当前实时任务jar包地址
jar_dir=../xxx-1.0-SNAPSHOT-shaded.jar
# 当前作业依赖配置文件相对路径,真实任务中会解析该配置文件
config_path=job-config.properties
# 实时任务默认并行度
parallelism=1

#  grep JobID log.log | cut -d" " -f7 从flink启动日志中解析出  JobID
#  grep 'yarn application -kill' log.log |cut -d" " -f5  从启动日志中解析出  yarn-job-ID
#  --config-path $config_path 此参数对应业务配置文件的地址

case $1 in
"cancel"){
    echo "================  cancel flilk job  ================"
    yarn_app_id=`grep 'yarn application -kill' log.log |cut -d" " -f5`
    job_id=`grep JobID log.log | cut -d" " -f7`
    echo "LastAppId: $yarn_app_id   LastJobID: $job_id"

    $flink_bin cancel -m yarn-cluster -yid $yarn_app_id $job_id
};;
"cp"){
    echo "================  start from checkpoint  ================"
    job_id=`grep JobID log.log | cut -d" " -f7`
    cp_path=`hadoop fs -ls ${cp_root_path}/${job_id}/ | grep chk- | awk -F" " '{print$8}' |sort -nr |head -1`
    echo "LastJobID: $job_id   CheckpointPath: $cp_path"

    nohup $flink_bin run -d -t yarn-per-job \
    -Dyarn.application.queue=$queue \
    -Dyarn.application.name=$app_name \
    -p $parallelism \
    -c $class_ref \
    -s $cp_path \
    $jar_dir \
    --config-path $config_path >./log.log 2>&1 &
};;
"sp"){
    echo "================  start from savepoint  ================"
    echo "SavepointPath: $2"

    nohup $flink_bin run -d -t yarn-per-job \
    -Dyarn.application.queue=$queue \
    -Dyarn.application.name=$app_name \
    -p $parallelism \
    -c $class_ref \
    -s $2 \
    $jar_dir \
    --config-path $config_path >./log.log 2>&1 &
};;
*){
    echo "================  start  ================"
    nohup $flink_bin run -d -t yarn-per-job \
    -Dyarn.application.queue=$queue \
    -Dyarn.application.name=$app_name \
    -p $parallelism \
    -c $class_ref \
    $jar_dir \
    --config-path $config_path >./log.log 2>&1 &
};;
esac
# 将shell脚本进程ID写入  pid.pid 文件
echo $! > ./pid.pid

hive on hbase

对于HBASE的操作,可以使用建立外表基于 HBASE创建,通过对外表 hive的操作达到操作 HBASE的目的。

## hbase DDL
create 'rtdw:dwd_rt_dim_ecproductdb_product_xxx', 'cf'

## hive on hbase DDL
CREATE EXTERNAL TABLE dwd.dwd_rt_dim_ecproductdb_product_xxx_hb (
     key                         String,
     id                          String,
     product_id                  String
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" =
        ":key,cf:id,cf:product_id")
TBLPROPERTIES ("hbase.table.name" = "rtdw:dwd_rt_dim_ecproductdb_product_xxx");

## hive sql
insert overwrite table dwd.dwd_rt_dim_ecproductdb_product_course_hb values('111','222','xxx1'),('222','333','xxx2')

## 以上通过hive插入之后,会发现最近效果显现在HBASE中,HBASE也有结果了

一些不错的工具类

TypeUtils.castToString TypeUtils 类中包含了很多强制将 Object 类型 转换为其他类型的方法。
拼装SQL中的各种类型信息可以直接参考 java.sql.Types 类下面定义了所有的类型信息。



这篇关于flink 代码编写建议的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程