Spark—算子—spark缓存策略
2022/7/13 6:22:28
本文主要是介绍Spark—算子—spark缓存策略,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Spark—算子—spark缓存策略
转换算子和操作算子
转换算子
转换算子:将一个RDD转换成另一个RDD,转换算子是懒执行,需要action算子来触发执行
操作算子
触发任务执行,一个action算子会触发一次任务执行,同时每一个action算子都会触发前面的代码执行
package com.core.day2 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo16Action { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local") conf.setAppName("Demo16Action") val sc = new SparkContext(conf) val linesRDD: RDD[String] = sc.textFile("data/students.txt") /** * 转换算子:将一个RDD转换成另一个RDD,转换算子是懒执行,需要action算子来触发执行 * * 操作算子:触发任务执行,一个action算子会触发一次任务执行,同时每一个action算子都会 * 触发前面的代码执行 * * */ val studentRDD: RDD[(String, String, Int, String, String)] = linesRDD .map(_.split(",")) .map{ case Array(id:String,name:String,age:String,gender:String,clazz:String) => println("============================") (id,name,age.toInt,gender,clazz) } studentRDD.foreach(println) println("=================================") studentRDD.foreach(println) /** * action算子:action算子的返回值不一定是rdd,每一个action算子都会触发一个job任务执行 * foreach:循环rdd * saveAsTextFile:保存数据 * count:统计行数 * collect:将rdd转换成集合 * take:取top * reduce:全局聚合 * sum:求和,rdd必须可以求和 * */ //保存数据 studentRDD.saveAsTextFile("data/temp") /** * 将rdd转换成数组 * * 处理的数据量很大时,会导致内存益处 * */ val array: Array[(String, String, Int, String, String)] = studentRDD.collect() //取出top val top: Array[(String, String, Int, String, String)] = studentRDD.take(10) val stuRDD: RDD[Int] = studentRDD.map(s => 1) val reduce: Int = stuRDD.reduce((x, y) => x + y) val sum: Double = stuRDD.sum() println(reduce) println(sum) while(true){ } } }
Spark缓存策略
package com.core.day2 import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} object Demo17Cache { def main(args: Array[String]): Unit = { /** * 缓存 * */ val conf = new SparkConf() conf.setMaster("local") conf.setAppName("Demo16Action") val sc = new SparkContext(conf) //设置checkpoint保存路径 //sc.setCheckpointDir("data/checkpoint") //读取学生表数据 val linesRDD: RDD[String] = sc.textFile("data/students.txt") //整理取出字段 val mapRDD: RDD[Array[String]] = linesRDD.map(_.split(",")) val studentRDD: RDD[(String, String, Int, String, String)] = mapRDD.map { case Array(id: String, name: String, age: String, gender: String, clazz: String) => println("=======map============") (id, name, age.toInt, gender, clazz) } /** * 对多次使用的RDD进行缓存 */ //缓存在内存中 //studentRDD.cache() //studentRDD.persist(StorageLevel.MEMORY_AND_DISK_SER) //studentRDD.persist(StorageLevel.MEMORY_AND_DISK) studentRDD.persist(StorageLevel.MEMORY_AND_DISK_SER) /** * checkpoint:将RDD的数据缓存到活hdfs中,任务失败了,数据也不会丢失 * checkpoint: 主要是再spark streaming中使用,用来保证任务的高可用 * cache:将数据缓存,在spark执行的服务器的内存或者磁盘上,如果任务失败,数据也就没来 * */ //studentRDD.persist() // studentRDD.checkpoint() //1、统计班级人数 studentRDD .map { case (_, _, _, _, clazz: String) => (clazz, 1) } .reduceByKey(_ + _) .saveAsTextFile("data/clazz_num") println("=" * 100) //统计性别的人数 studentRDD .map { case (_, _, _, gender: String, _) => (gender, 1) } .reduceByKey(_ + _) .saveAsTextFile("data/gender_num") //统计年龄的人数 studentRDD .map { case (_, _, age: Int, _, _) => (age, 1) } .reduceByKey(_ + _) .saveAsTextFile("data/age_num") while (true) { } } }
这篇关于Spark—算子—spark缓存策略的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南