寒假学习进度6

2021/12/31 23:07:27

本文主要是介绍寒假学习进度6,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

今天继续学习sparkRDD的算子

(1)flatMap

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
  val sparkContext = new SparkContext(sparkConf)

  val rdd: RDD[List[Int]]= sparkContext.makeRDD(List(List(1, 2), List(3, 4)))
  //flatmap,讲List变成Int
  //使用flatmap进行扁平化处理,将List集合里数据进行拆分
  val flatrdd: RDD[Int] = rdd.flatMap(
    list => {
      list //讲拆分的数据进行封装成一个LIst
    }
  )
  flatrdd.collect().foreach(println)
  sparkContext.stop()
}
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
  val sparkContext = new SparkContext(sparkConf)

  val rdd: RDD[String]= sparkContext.makeRDD(List("hello word","hello spark"))
  //flatmap
  //使用flatmap进行扁平化处理,将List集合里数据进行拆分,用空格做分隔符
  val flatrdd: RDD[String] = rdd.flatMap(
    s => {
      s.split(" ")
    }
  )
  flatrdd.collect().foreach(println)
  sparkContext.stop()
}
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
  val sparkContext = new SparkContext(sparkConf)

  val rdd= sparkContext.makeRDD(List(List(1, 2), 3,List(4, 5)))
  //flatmap
  //因为list集合里类型不一致,所以使用模式匹配的方式,讲不是集合的封装成一个集合
  val flatrdd: RDD[Any] = rdd.flatMap(
    data => {
      data match {
        case list: List[_] => list
        case data => List(data)

      }
    }
  )
  flatrdd.collect().foreach(println)
  sparkContext.stop()
}
 

(2)glom

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)
  //讲Int变成Array
  val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)

  val glomrdd: RDD[Array[Int]] = rdd.glom()

  glomrdd.collect().foreach(data=>println(data.mkString(",")))
  sparkContext.stop()
}
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)
  //将Int变成Array
  val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)
  val glomrdd: RDD[Array[Int]] = rdd.glom()

  //将2个分区数组数据(Array)用map中的max求每个分区中最大值
  val maxRdd: RDD[Int] = glomrdd.map(
    array => {
      array.max
    }
  )

  //将maxRdd 2个分区数组采集求和
  println(maxRdd.collect().sum)
  sparkContext.stop()
}

(3)groupBy

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)
  val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)
  def groupFunction(num:Int)={
    num%2
  }

  val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
  groupRDD.collect().foreach(println)
  sparkContext.stop()
}

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)
  val rdd: RDD[String] = sparkContext.makeRDD(List("hello","spark","hi","sss"), 2)

  val grouprdd: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))
  grouprdd.collect().foreach(println)

  sparkContext.stop()
}
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)

  //读取apache.log文件
  val rdd=sparkContext.textFile("data/apache.log")

  //取数据中每小时的点击量
  val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
    line => {
      //将每行数据以空格为分割,分成多个字符串
      val data = line.split(" ")
      //取第4个字符串
      val time = data(3)

      //转换格式
      val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
      //解析time
      val datas= sdf.parse(time)
      //取“小时”字符
      val sdf1 = new SimpleDateFormat("HH")
      //格式化字符
      val hour = sdf1.format(datas)
      (hour, 1)//比如08小时出现一次计1个
    }
  ).groupBy(_._1)

  timeRDD.map{
        //模式匹配
    case (hour,iter)=>{
      (hour,iter.size)
    }
  }.collect().foreach(println)

  sparkContext.stop()
}

(4)filter

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)

  //filter,根据符合规则的数据筛选
  val rdd= sc.makeRDD(List(1,2,3,4), 2)
  val fliterrdd: RDD[Int] = rdd.filter(
    num => num % 2 != 0
  )
  fliterrdd.collect().foreach(println)
  sc.stop()
}
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)

  //filter,根据符合规则的数据筛选
  val rdd=sc.textFile("data/apache.log")
 rdd.filter(
   line=>{
     //将每行数据以空格为分割,分成多个字符串
     val data = line.split(" ")
     //取第4个字符串
     val time = data(3)
     time.startsWith("17/05/2015")
   }
 ).collect().foreach(println)

  sc.stop()
}

 



这篇关于寒假学习进度6的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程