Flink Stream Processing - Task: BaseTask
2021/11/8 6:12:28
This article walks through the BaseTask class used in Flink stream processing tasks. It should be a useful reference for anyone solving similar problems, so let's work through it together.
BaseTask
package pers.aishuang.flink.streaming.task;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.io.IOException;
import java.util.Properties;

import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;

/**
 * Flink reads data from Kafka and writes it to HDFS.
 * Development steps:
 * 1. Read the local configuration file (key-value)
 * 2. Extract a helper that builds the stream execution environment
 * 3. Extract a helper that reads the Kafka data stream
 */
public abstract class BaseTask {
    // 1. Read the local configuration file (key-value)
    // -- Load parameters from the conf.properties file on the classpath
    static ParameterTool parameterTool = null;

    static {
        try {
            parameterTool = ParameterTool.fromPropertiesFile(
                    BaseTask.class.getClassLoader().getResourceAsStream("conf.properties")
            );
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Build the stream execution environment shared by all tasks.
     * @param taskName task name, used in the checkpoint path
     * @return the configured StreamExecutionEnvironment
     */
    protected static StreamExecutionEnvironment getEnv(String taskName) {
        // 1. Impersonate the root (or hdfs) user when accessing the HDFS cluster
        System.setProperty("HADOOP_USER_NAME", "root");
        // 2. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // -- Make the parameters globally visible to this job
        env.getConfig().setGlobalJobParameters(parameterTool);
        // -- Set the parallelism
        env.setParallelism(1);
        // -- Use event time as the stream time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 3. Enable and configure checkpointing (every 30 seconds)
        env.enableCheckpointing(30 * 1000L);
        // -- Checkpointing mode: exactly once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // -- Number of tolerable checkpoint failures
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
        // -- At most one checkpoint in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // -- Minimum pause between two consecutive checkpoints: 5000 ms
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        // -- Checkpoint timeout: 60 seconds
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // -- Retain checkpoints when the job is cancelled (deleted by default)
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        // -- Keep the job running when a checkpoint fails (left commented out)
        // env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        // -- State backend: RocksDBStateBackend, an embedded database that keeps state locally
        //    and asynchronously flushes it to HDFS
        try {
            env.setStateBackend(new RocksDBStateBackend(
                    parameterTool.get("hdfsUri") + "/flink-checkpoints/" + taskName,
                    true
            ));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 4. Restart strategy: at most 3 restarts, 10 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,
                Time.seconds(10)
        ));
        // 5. Return the configured execution environment
        return env;
    }

    /**
     * Read data from Kafka as a data stream.
     * Builds a DataStreamSource from the execution environment, the consumer group
     * and the deserialization schema class.
     */
    protected static <T> DataStreamSource<T> getKafkaStream(StreamExecutionEnvironment env,
                                                            String groupid,
                                                            Class<? extends DeserializationSchema> clazz) {
        // 1. Create the consumer configuration
        Properties props = new Properties();
        // -- Kafka bootstrap servers
        props.setProperty(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                parameterTool.getRequired("bootstrap.servers")
        );
        // -- Consumer group id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        // -- Enable automatic partition discovery
        props.setProperty(
                KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                parameterTool.getRequired("key.partition.discovery.interval.millis")
        );
        // 2. Create a FlinkKafkaConsumer that reads from Kafka
        FlinkKafkaConsumer<T> consumer = null;
        try {
            consumer = new FlinkKafkaConsumer<T>(
                    parameterTool.getRequired("kafka.topic"),
                    clazz.newInstance(),
                    props
            );
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        // -- Let Flink commit the Kafka offsets as part of its checkpoints
        consumer.setCommitOffsetsOnCheckpoints(true);
        // -- Partition consumption strategy: start from the earliest offset
        consumer.setStartFromEarliest();
        // 3. Register the source with the environment
        DataStreamSource<T> source = env.addSource(consumer);
        // 4. Return the data stream
        return source;
    }

    /**
     * Build a StreamingFileSink that writes the data stream to HDFS.
     */
    public static StreamingFileSink<String> getSink(String prefix,
                                                    String suffix,
                                                    String path,
                                                    String bucketAssignerFormat) {
        // 1. Output file configuration (part file prefix and suffix)
        OutputFileConfig fileConfig = OutputFileConfig.builder()
                .withPartPrefix(prefix)
                .withPartSuffix(suffix)
                .build();
        // 2. Create the streaming file sink
        StreamingFileSink<String> fileSink = StreamingFileSink
                .forRowFormat(
                        new Path(parameterTool.getRequired("hdfsUri") + "/apps/hive/warehouse/ods.db/" + path),
                        new SimpleStringEncoder<String>("utf-8")
                ).withBucketAssigner(
                        // Bucket (sub-directory) per time unit, e.g. one bucket per day
                        new DateTimeBucketAssigner<>(bucketAssignerFormat)
                ).withRollingPolicy(
                        // Roll part files every 10 s, after 3 s of inactivity, or at 64 MB
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(10000L)
                                .withInactivityInterval(3000L)
                                .withMaxPartSize(64 * 1024 * 1024)
                                .build()
                ).withOutputFileConfig(fileConfig)
                .build();
        // 3. Return the file sink
        return fileSink;
    }
}
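To show how the three helpers are meant to be wired together, here is a minimal, hypothetical subclass sketch. The class name, consumer group id, sink prefix/suffix, target sub-directory and date format below are made-up example values and not part of the original project; only the BaseTask methods and the conf.properties keys come from the code above.

package pers.aishuang.flink.streaming.task;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

/**
 * Hypothetical example task: reads raw strings from the Kafka topic configured
 * in conf.properties and writes them unchanged to an ODS directory on HDFS.
 */
public class KafkaToHdfsDemoTask extends BaseTask {
    public static void main(String[] args) throws Exception {
        // Environment pre-configured by BaseTask (checkpointing, RocksDB backend, restart strategy)
        StreamExecutionEnvironment env = getEnv(KafkaToHdfsDemoTask.class.getSimpleName());

        // Read each Kafka record as a UTF-8 string
        DataStreamSource<String> source = getKafkaStream(
                env,
                "demo-consumer-group",     // hypothetical consumer group id
                SimpleStringSchema.class   // built-in string deserialization schema
        );

        // Part files like part-demo-*.txt, bucketed by day under .../ods.db/demo_src
        StreamingFileSink<String> sink = getSink("part-demo", ".txt", "demo_src", "yyyyMMdd");
        source.addSink(sink);

        env.execute(KafkaToHdfsDemoTask.class.getSimpleName());
    }
}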
That wraps up this article on BaseTask for Flink stream processing. We hope the articles we recommend are helpful, and we appreciate your continued support of 为之网!