spark streaming整合kafka中聚合类运算如何和kafka保持exactly once一致性语义(mysql方式,利用事务)
2022/4/6 2:19:24
本文主要是介绍spark streaming整合kafka中聚合类运算如何和kafka保持exactly once一致性语义(mysql方式,利用事务),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
/** * 从Kafka读取数据,实现ExactlyOnce,偏移量保存到MySQL中 * 1.将聚合好的数据,收集到Driver端, * 2.然后建计算好的数据和偏移量在一个事物中同时保存到MySQL中 * 3.成功了提交事物 * 4.失败了让这个任务重启 * * MySQL数据库中有两张表:保存计算好的结果、保存偏移量 */ object ExactlyOnceWordCountOffsetStoreInMySQL { def main(args: Array[String]): Unit = { //true a1 g1 ta,tb val Array(isLocal, appName, groupId, allTopics) = args val conf = new SparkConf() .setAppName(appName) if (isLocal.toBoolean) { conf.setMaster("local[*]") } //创建StreamingContext,并指定批次生成的时间 val ssc = new StreamingContext(conf, Milliseconds(5000)) //设置日志级别 ssc.sparkContext.setLogLevel("WARN") //SparkStreaming 跟kafka进行整合 //1.导入跟Kafka整合的依赖 //2.跟kafka整合,创建直连的DStream【使用底层的消费API,效率更高】 val topics = allTopics.split(",") //SparkSteaming跟kafka整合的参数 //kafka的消费者默认的参数就是每5秒钟自动提交偏移量到Kafka特殊的topic中: __consumer_offsets val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "group.id" -> groupId, "auto.offset.reset" -> "earliest" //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读 , "enable.auto.commit" -> (false: java.lang.Boolean) //消费者不自动提交偏移量 ) //在创建KafkaDStream之前要先读取MySQL数据库,查询历史偏移量,没有就从头读,有就接着读 //offsets: collection.Map[TopicPartition, Long] val offsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromMySQL(appName, groupId) //跟Kafka进行整合,需要引入跟Kafka整合的依赖 //createDirectStream更加高效,使用的是Kafka底层的消费API,消费者直接连接到Kafka的Leader分区进行消费 //直连方式,RDD的分区数量和Kafka的分区数量是一一对应的【数目一样】 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, //调度task到Kafka所在的节点 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets) //指定订阅Topic的规则 ) kafkaDStream.foreachRDD(rdd => { //判断当前批次的RDD是否有数据 if (!rdd.isEmpty()) { //获取RDD所有分区的偏移量 val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //实现WordCount业务逻辑 val words: RDD[String] = rdd.flatMap(_.value().split(" ")) val wordsAndOne: RDD[(String, Int)] = words.map((_, 1)) val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey(_ + _) //将计算好的结果收集到Driver端再写入到MySQL中【保证数据和偏移量写入在一个事物中】 //触发Action,将数据收集到Driver段 val res: Array[(String, Int)] = reduced.collect() //创建一个MySQL的连接【在Driver端创建】 //默认MySQL自动提交事物 var connection: Connection = null var ps1: PreparedStatement = null var ps2: PreparedStatement = null try { connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456") //不要自动提交事物 connection.setAutoCommit(false) ps1 = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = counts + ?") //将计算好的WordCount结果写入数据库表中,但是没有提交事物 for (tp <- res) { ps1.setString(1, tp._1) ps1.setLong(2, tp._2) ps1.setLong(3, tp._2) ps1.executeUpdate() //没有提交事物,不会讲数据真正写入到MySQL } //(app1_g001, wc_0) -> 1000 ps2 = connection.prepareStatement("INSERT INTO t_kafka_offset (app_gid, topic_partition, offset) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset = ?") //将偏移量写入到MySQL的另外一个表中,也没有提交事物 for (offsetRange <- offsetRanges) { //topic名称 val topic = offsetRange.topic //topic分区编号 val partition = offsetRange.partition //获取结束偏移量 val untilOffset = offsetRange.untilOffset //将结果写入MySQL ps2.setString(1, appName + "_" + groupId) ps2.setString(2, topic + "_" + partition) ps2.setLong(3, untilOffset) ps2.setLong(4, untilOffset) ps2.executeUpdate() } //提交事物 connection.commit() } catch { case e: Exception => { //回滚事物 connection.rollback() //让任务停掉 ssc.stop() } } finally { if(ps2 != null) { ps2.close() } if(ps1 != null) { ps1.close() } if(connection != null) { connection.close() } } } }) ssc.start() ssc.awaitTermination() } }
这篇关于spark streaming整合kafka中聚合类运算如何和kafka保持exactly once一致性语义(mysql方式,利用事务)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-06-25MySQL报错Duplicate entry '0' for key 'PRIMARY'
- 2024-05-29阿里 Canal 实时同步 MySQL 增量数据至 ClickHouse 数据库
- 2024-05-24在Linux下管理MySQL的大小写敏感性
- 2024-04-26MySQL查出时间比实际晚8小时的解决方案
- 2024-04-01JPA不识别MySQL的枚举类型
- 2024-03-30mysql数据库表卡死解决方法
- 2024-03-15MySQL多数据源笔记5-ShardingJDBC实战
- 2024-03-11natural join mysql
- 2024-03-11关于VS2017,VS2015 中利用 EF使用Mysql 不显示数据源问题解决方案
- 2024-02-26mysql 阿里云xb后缀备份文件恢复-icode9专业技术文章分享