spark-streaming
2021/6/27 23:24:29
This article introduces spark-streaming. It should be a useful reference for solving related programming problems; if that is what you need, follow along and work through it.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

// A second run reads the topic from the beginning again, because the offsets are only read, never committed
object DemoOffset01 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00003",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] = Array("test02")

    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    kafkaDstream.foreachRDD(rdd => {
      // KafkaRDD implements the HasOffsetRanges trait; only the KafkaRDD carries the offsets
      val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (range <- offsetRange) {
        println(s"topic:${range.topic},partition:${range.partition},fromOffset:${range.fromOffset},untilOffset:${range.untilOffset}")
      }
      val res: RDD[String] = rdd.map(_.value())
      res.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
    // Programming Spark Streaming is essentially programming against RDDs
  }
}
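Because enable.auto.commit is false and nothing is committed manually, the group g00003 never records an offset, which is why a restart falls back to auto.offset.reset = earliest. Below is a minimal sketch to verify that with a plain Kafka consumer; it is an illustration only, reusing the brokers, topic and group from the demo above and relying on the single-partition committed() call available in the kafka-clients versions bundled with spark-streaming-kafka-0-10:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object CheckCommittedOffsets {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
    props.put("group.id", "g00003")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    // committed() returns null when the group has never committed an offset for this partition
    val committed = consumer.committed(new TopicPartition("test02", 0))
    println(if (committed == null) "no committed offset for test02-0" else s"committed offset: ${committed.offset()}")
    consumer.close()
  }
}

If committed() returns null for every partition, the next run of the streaming job starts from the earliest available offset again.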
Upgraded version 2.0:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

// Unlike the first version, this one commits the offsets after each batch, so a restart resumes from the committed position instead of re-reading from the beginning
object DemoOffset01 {
  def main(args: Array[String]): Unit = {
    //val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val conf: SparkConf = new SparkConf().setAppName("AggregateOperator").setMaster("local").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00003",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] = Array("helloword")

    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // The function passed to foreachRDD runs on the driver, once per batch interval
    kafkaDstream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // KafkaRDD implements the HasOffsetRanges trait; only the KafkaRDD carries the offsets
        val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (range <- offsetRange) {
          println(s"topic:${range.topic},partition:${range.partition},fromOffset:${range.fromOffset},untilOffset:${range.untilOffset}")
        }
        // Process the data
        val res: RDD[String] = rdd.map(_.value())
        res.foreach(println)
        // Commit the offsets to Kafka's internal topic __consumer_offsets
        kafkaDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRange)
      }
    })

    ssc.start()
    ssc.awaitTermination()
    // Programming Spark Streaming is essentially programming against RDDs
  }
}
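commitAsync is fire-and-forget, so a failed commit goes unnoticed unless a callback is supplied. A minimal sketch of the callback overload, reusing the kafkaDstream and offsetRange names from inside the foreachRDD above, which could replace the plain commitAsync call:

import java.util
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.CanCommitOffsets

kafkaDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRange, new OffsetCommitCallback {
  override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
    if (exception != null) {
      // The batch itself already succeeded; a failed commit only means some data may be re-read after a restart
      println(s"offset commit failed: ${exception.getMessage}")
    } else {
      println(s"offset commit succeeded: $offsets")
    }
  }
})

Either way the commit is queued and only sent on a later batch, so the output still needs to be idempotent: the guarantee here is at-least-once, not exactly-once.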
Supplement:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

object CommitOffsetDemo02 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("AggregateOperator").setMaster("local").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00004",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] = Array("helloword")

    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Transforming kafkaDstream yields a MapPartitionsRDD, so the offsets can only be obtained from the original DirectKafkaInputDStream.
    // The cast below therefore fails with a ClassCastException at runtime; the point of this demo is that offsets must be read before any transformation.
    val lines: DStream[String] = kafkaDstream.map(_.value())
    lines.foreachRDD(rdd => {
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (range <- offsetRanges) {
        println(s"topic:${range.topic},partition:${range.partition},fromOffset:${range.fromOffset},untilOffset:${range.untilOffset}")
      }
      rdd.foreach(x => println(x))
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
Supplement:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

object CommitOffsetDemo02 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("AggregateOperator").setMaster("local").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00004",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // with auto-commit enabled, the offsets would be committed on the executor side
    )
    val topics: Array[String] = Array("helloword")

    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Calling a transformation on a DStream produces a new DStream; calling an action does not.
    // Transforming kafkaDstream yields a MapPartitionsRDD, so the offsets can only be obtained from the original DirectKafkaInputDStream.
    // The Kafka client on the driver fetches the offsets for each batch (which determines how much data the batch reads);
    // the generated tasks are serialized to the executors' thread pools, so the actual records are read from the Kafka cluster
    // on the executors, and offsets are committed to the special topic __consumer_offsets.
    val lines: DStream[String] = kafkaDstream.map(_.value())
    lines.foreachRDD(rdd => {
      // As above, this cast fails because the mapped RDD no longer implements HasOffsetRanges
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (range <- offsetRanges) {
        println(s"topic:${range.topic},partition:${range.partition},fromOffset:${range.fromOffset},untilOffset:${range.untilOffset}")
      }
      rdd.foreach(x => println(x))
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
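Since a transformed RDD no longer implements HasOffsetRanges, a common pattern (the one shown in the Spark Streaming + Kafka integration guide) is to capture the offset ranges inside a transform on the original stream before mapping it. A minimal sketch, reusing the kafkaDstream from the demo above:

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

// Captured on the driver while the RDD is still the KafkaRDD
var offsetRanges = Array.empty[OffsetRange]

kafkaDstream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // succeeds: rdd is still the KafkaRDD here
  rdd
}.map(_.value()).foreachRDD { rdd =>
  rdd.foreach(println)
  for (range <- offsetRanges) {
    println(s"topic:${range.topic},partition:${range.partition},fromOffset:${range.fromOffset},untilOffset:${range.untilOffset}")
  }
}

The final demo below goes one step further: it keeps the offsets next to the computed results in MySQL, so both are written in a single transaction.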
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

// Read from Kafka, run an aggregation, and write the offsets and the aggregated result to MySQL together.
// MySQL supports transactions, which guarantees that the aggregated result and the offsets are written (or rolled back) as one unit.
object CommitOffsetDemo03 {
  def main(args: Array[String]): Unit = {
    val appName: String = args(0)
    val groupid: String = args(1)
    //val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val conf: SparkConf = new SparkConf().setAppName(appName).setMaster("local[*]").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] = Array("helloword")

    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // The function passed to foreachRDD runs on the driver, once per batch interval
    kafkaDstream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // KafkaRDD implements the HasOffsetRanges trait; only the KafkaRDD carries the offsets
        val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Process the data: RDD transformations and actions are invoked on the driver, while the functions inside them run on the executors
        val res: RDD[(String, Int)] = rdd.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        // Collect the aggregated result to the driver
        val result: Array[(String, Int)] = res.collect()

        var connection: Connection = null
        var pstm1: PreparedStatement = null
        var pstm2: PreparedStatement = null
        try {
          // Create a JDBC connection (requires the MySQL JDBC driver dependency)
          connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mq01?characterEncoding=utf-8", "root", "123456")
          // Open a transaction
          connection.setAutoCommit(false)
          // The offsets (already on the driver) and the aggregated result (collected to the driver) live in the same process,
          // so one connection and one transaction can write both.
          pstm1 = connection.prepareStatement("INSERT INTO t_wordcount(word,count) VALUES (?,?) ON DUPLICATE KEY UPDATE COUNT=COUNT+?;")
          for (tp <- result) {
            pstm1.setString(1, tp._1)
            pstm1.setInt(2, tp._2)
            pstm1.setInt(3, tp._2)
            // Data that has not been committed yet is dirty data
            pstm1.executeUpdate()
          }
          pstm2 = connection.prepareStatement("INSERT INTO t_kafuka_offset VALUES(?,?,?) ON DUPLICATE KEY UPDATE OFFSET=?;")
          for (range <- offsetRange) {
            val topic: String = range.topic
            val partition: Int = range.partition
            // fromOffset is not needed, only untilOffset
            val offset: Long = range.untilOffset
            pstm2.setString(1, appName + "_" + groupid)
            pstm2.setString(2, topic + "_" + partition)
            pstm2.setLong(3, offset)
            pstm2.setLong(4, offset)
            pstm2.executeUpdate()
          }
          // Commit the transaction
          connection.commit()
        } catch {
          case e: Exception =>
            // If anything fails before the commit, roll back the transaction, stop the application, and rethrow
            if (connection != null) {
              connection.rollback()
            }
            ssc.stop(true)
            throw e
        } finally {
          if (pstm1 != null) {
            pstm1.close()
          }
          if (pstm2 != null) {
            pstm2.close()
          }
          if (connection != null) {
            connection.close()
          }
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
    // Programming Spark Streaming is essentially programming against RDDs
  }
}
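The counterpart at startup is to load the stored offsets back out of MySQL and hand them to the consumer strategy, so a restarted job resumes exactly where the last committed transaction left off. Below is a minimal sketch of that idea, reusing ssc, topics, kafkaParams, appName and groupid from CommitOffsetDemo03; the column names app_gid and topic_partition are assumptions, since the table definition is not shown above:

import java.sql.DriverManager
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import scala.collection.mutable

// Load the offsets written by the last successful transaction
def loadOffsets(appName: String, groupid: String): Map[TopicPartition, Long] = {
  val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mq01?characterEncoding=utf-8", "root", "123456")
  val offsets = mutable.Map[TopicPartition, Long]()
  try {
    // Assumed column names: app_gid, topic_partition, offset; adjust to the real t_kafuka_offset schema
    val pstm = connection.prepareStatement("SELECT topic_partition, offset FROM t_kafuka_offset WHERE app_gid = ?")
    pstm.setString(1, appName + "_" + groupid)
    val rs = pstm.executeQuery()
    while (rs.next()) {
      val key = rs.getString(1)            // e.g. "helloword_0"
      val idx = key.lastIndexOf("_")
      val topic = key.substring(0, idx)
      val partition = key.substring(idx + 1).toInt
      offsets += new TopicPartition(topic, partition) -> rs.getLong(2)
    }
  } finally {
    connection.close()
  }
  offsets.toMap
}

// Pass the stored offsets to the consumer strategy; partitions without an entry fall back to auto.offset.reset
val kafkaDstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, loadOffsets(appName, groupid))
)

Passing the offsets map is what closes the loop: without it, a restarted job would still start from auto.offset.reset even though the results and offsets were written transactionally.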
That wraps up this article on spark-streaming. We hope the articles we recommend are helpful, and we appreciate your continued support of 为之网!