SparkSQL 创建空 DataFrame

2021/12/11 2:19:26

本文主要介绍 SparkSQL 创建空 DataFrame 的方法，对大家解决编程问题具有一定的参考价值，需要的程序员们请随小编一起学习吧！

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.types.DateType

/**
 * Loads a comma-separated user/app activity file into a DataFrame,
 * adds a surrogate id, converts the statistics date to DateType,
 * and demonstrates creating an empty DataFrame with the same schema.
 */
object APPUser {
  def main(args: Array[String]): Unit = {
    // Silence noisy framework logging so the application output stays readable.
    Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val spark = SparkSession
                .builder()
                .appName("SparkSessionAppUser")
                .master("local[2]")
                .getOrCreate()

    // Guard with try/finally so the SparkSession is stopped even if the
    // job throws (the original leaked the session on any failure).
    try {
      // repartition(1) keeps a single partition so monotonically_increasing_id
      // later yields a contiguous 0,1,2,... sequence rather than partition-offset ids.
      val linesRDD = spark
                      .sparkContext.textFile("./data/user_app.txt")
                      .repartition(1)

      // Parse each CSV line into a Row matching `schema` below.
      // NOTE(review): assumes every line has >= 5 comma-separated fields and that
      // cols(2)/cols(3) are numeric — malformed lines will throw at action time.
      val rowsRDD = linesRDD
                    .map{row => row.split(",")}
                    .map{cols =>
                      Row(cols(0), cols(1), cols(2).trim.toInt, cols(3).trim.toDouble, cols(4))}

      // Schema for the raw file: all columns declared non-nullable.
      val schema = StructType(List(
                    StructField("serv_number", StringType, false),
                    StructField("unf_app_code", StringType, false),
                    StructField("click_cnt", IntegerType, false),
                    StructField("access_time", DoubleType, false),
                    StructField("statis_date", StringType, false)))

      val user_appDF = spark.createDataFrame(rowsRDD, schema)

      // Add a 1-based surrogate key column.
      val user_app_indexDF = user_appDF
                              .withColumn("id", monotonically_increasing_id + 1)
      user_app_indexDF.createOrReplaceTempView("user_app")

      // Convert statis_date (a yyyyMMdd string) to DateType and re-register the view.
      // FIX: the alias was "static_date" (typo), which silently renamed the column;
      // it now keeps its original name "statis_date", consistent with the schema.
      val user_app_index_dateDF = spark.sql("select id,serv_number,unf_app_code,click_cnt,access_time," +
                            "to_date(statis_date,'yyyyMMdd') as statis_date from user_app")
      user_app_index_dateDF.createOrReplaceTempView("user_app")

      // The point of the article: an empty DataFrame sharing the same schema.
      val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

      //spark.sql("select current_timestamp,date_format('2016-04-08', 'y'), date_add('2016-07-30', 1),date_sub('2016-07-30', 1),datediff('2009-07-31', '2009-07-30')").show()
    } finally {
      spark.stop()
    }
  }
}


这篇关于 SparkSQL 创建空 DataFrame 的文章就介绍到这儿，希望我们推荐的文章对大家有所帮助，也希望大家多多支持为之网！


扫一扫关注最新编程教程