算子map

2021/7/16 23:40:03

本文主要是介绍算子map,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

package sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo02Map {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("map").setMaster("local")

    val sc = new SparkContext(conf)

    /**
      * map:一行一行处理rdd的数据
      */

    /**
      * 构建RDD的方法
      * 1、读取文件
      * 2、基于Scala集合构建RDD
      */

    //基于Scal集合构建rdd
    val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

    println("listRDD分区数据" + listRDD.getNumPartitions)

    val mapRDD: RDD[Int] = listRDD.map(i => i * 2)

    //打印RDD中的数据
    mapRDD.foreach(println)

    /**
      * mapValues;处理kv格式rdd的value
      */
    //转换成kv格式
    val kvRDD: RDD[(Int, Int)] = listRDD.map(i => (i, i))
    //对v进行*2
    val mapValuesRDD: RDD[(Int, Int)] = kvRDD.mapValues(i => i * 2)

    mapValuesRDD.foreach(println)

    /**
      * mapPartitions:一次处理一个分区的数据,返回值需要是一个迭代器
      * mapPartitionsWithIndex:uole一个下标
      *
      */
    val mapPartitionsRDD: RDD[Int] =listRDD.mapPartitions((iter:Iterator[Int])=>{
      val list: List[Int] = iter.toList
      list.map(i=>i*2).toIterator
    })
    mapPartitionsRDD.foreach(println)

    val mapPartitionsWithIndexRDD: RDD[Int] =listRDD.mapPartitionsWithIndex{
      case (index:Int,iter:Iterator[Int])=>
        println("当前分区编号:"+index)
        iter
    }
    mapPartitionsWithIndexRDD.foreach(println)

  }

}


这篇关于算子map的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程