Spark SQL中将 DataFrame 转为 json 格式
2021/6/21 19:27:58
本文主要是介绍Spark SQL中将 DataFrame 转为 json 格式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
今天主要介绍一下如何将 Spark dataframe 的数据转成 json 数据。用到的是 scala 提供的 json 处理的 api。
用过 Spark SQL 应该知道,Spark dataframe 本身有提供一个 api 可以供我们将数据转成一个 JsonArray,我们可以在 spark-shell 里头举个栗子来看一下。
def main(args: Array[String]): Unit = { import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local[*]").appName("test") .getOrCreate(); //提供隐式转换功能,比如将 Rdd 转为 dataframe import spark.implicits._ val df:DataFrame = spark.sparkContext.parallelize(Array(("abc",2),("efg",4))).toDF() df.show() /*-------------show ----------- +---+---+ | _1| _2| +---+---+ |abc| 2| |efg| 4| +---+---+ */ //这里使用 dataframe Api 转换成 jsonArray val jsonStr:String = df.toJSON.collectAsList.toString println(jsonStr) /*--------------- json String------------- [{"_1":"abc","_2":2}, {"_1":"efg","_2":4}] */ }
可以发现,我们可以使用 dataframe 提供的 api 直接将 dataframe 转换成 jsonArray 的形式,但这样子却有些冗余。以上面的例子来说,很多时候我要的不是这样的形式。
[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
而是下面这种形式。
[{"abc":2}, {"efg":4}]
这才是我们通常会使用到的 json 格式。以 dataframe 的 api 转换而成的 json 明显太过冗余。为此,我们需要借助一些 json 处理的包,本着能懒则懒的原则,直接使用 scala 提供的 json 处理包。
import org.apache.spark.sql.DataFrame import scala.util.parsing.json.{JSONArray, JSONObject} object DFTest { def main(args: Array[String]): Unit = { import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local[*]").appName("test") .getOrCreate(); //提供隐式转换功能,比如将 Rdd 转为 dataframe import spark.implicits._ val df:DataFrame = spark.sparkContext.parallelize(Array(("abc",2),("efg",4))).toDF() df.show() /*-------------show ----------- +---+---+ | _1| _2| +---+---+ |abc| 2| |efg| 4| +---+---+ */ //接下来不一样了 val df2Array:Array[Tuple2[String,Int]] = df.collect().map{ case org.apache.spark.sql.Row(x:String,y:Int) => (x,y)} val jsonData:Array[JSONObject] = df2Array.map{ i => new JSONObject(Map(i._1 -> i._2)) } val jsonArray:JSONArray = new JSONArray(jsonData.toList) println(jsonArray) /*-----------jsonArray------------ [{"abc" : 2}, {"efg" : 4}] */ } }
大概说明一下上述的代码,首先我们要先将 df 变量进行 collect 操作,将它转换成 Array ,但是要生成 jsonObject 得是 Array[Tuple2[T,T]] 的格式,所以我们需要再进一步转换成对应格式。这里的 map 是函数式编程里面的 map 。
然后也是用 map 操作生成 Array[JSONObject],最后再转换成 JSONArray 就可以。
将数据转换成 json 的格式通常不能太大,一般用在 spark 跑出数据结果后写入到其他数据库的时候会用到,比如 Mysql 。
这篇关于Spark SQL中将 DataFrame 转为 json 格式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-21Vue3教程:新手入门到实践应用
- 2024-12-21VueRouter4教程:从入门到实践
- 2024-12-20Vue3项目实战:从入门到上手
- 2024-12-20Vue3项目实战:新手入门教程
- 2024-12-20VueRouter4项目实战:新手入门教程
- 2024-12-20如何实现JDBC和jsp的关系?-icode9专业技术文章分享
- 2024-12-20Vue项目中实现TagsView标签栏导航的简单教程
- 2024-12-20Vue3入门教程:从零开始搭建你的第一个Vue3项目
- 2024-12-20从零开始学习vueRouter4:基础教程
- 2024-12-20Vuex4课程:新手入门到上手实战全攻略