meituan交互式系统浅析(3) sparkSQL数据倾斜解决
2021/6/16 19:24:38
本文主要是介绍meituan交互式系统浅析(3) sparkSQL数据倾斜解决,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
对于在开发过程中可能出现的数据倾斜问题,可提供一种利用双重group by的方法来解决。
分析:
可以使用类似于SparkCore中解决数据倾斜时提到的两阶段聚合(局部+全局):局部——随机打散+加前缀,通过groupBy完成局部统计;全局——去掉前缀,通过groupBy完成全局统计。
/**
 * Demonstrates the two-phase ("salted") aggregation trick in SparkSQL to
 * mitigate data skew in a word count:
 *
 *  1. local phase  — append a random prefix to each key (salting) so that a
 *     hot key is spread over several groups, then `group by` the salted key;
 *  2. global phase — strip the prefix and `group by` again to obtain the
 *     final per-word totals.
 */
object _05SparkSQLOptimizationOps {

  def main(args: Array[String]): Unit = {
    // Silence noisy framework logging; keep only warnings and above.
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(s"${_05SparkSQLOptimizationOps.getClass.getSimpleName}")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Register the salting / un-salting UDFs so they can be called from SQL text.
    sqlContext.udf.register[String, String, Int]("addRandomPrefix", (field, num) => addRandomPrefix(field, num))
    sqlContext.udf.register[String, String]("removePrefix", field => removePrefix(field))

    val df = sqlContext.read.text("E:/data/hello.log").toDF("line")
    // df.show()
    // Expose the DataFrame as a temp table so the steps below can use plain SQL.
    df.registerTempTable("test")
    // groupByOps1(sqlContext)

    // Step 1: salting — explode each line into words and prepend a random
    // prefix in [0, 2) to every word.
    sqlContext.sql("select " +
      "addRandomPrefix(w.word, 2) as p_word " +
      "from (" +
      "select " +
      "explode(split(line, ' ')) as word " +
      "from test" +
      ") w").show()

    // Step 2: local aggregation — count occurrences per *salted* key, so a
    // skewed word is counted in up to `num` independent groups.
    sqlContext.sql("select " +
      "p.p_word," +
      "count(p.p_word) as p_count " +
      "from (" +
      "select " +
      "addRandomPrefix(w.word, 2) as p_word " +
      "from (" +
      "select " +
      "explode(split(line, ' ')) as word " +
      "from test" +
      ") w" +
      ") p " +
      "group by p.p_word").show()

    // Step 3: strip the random prefix from the locally aggregated keys.
    sqlContext.sql("select " +
      "removePrefix(p.p_word) as r_word," +
      "count(p.p_word) as r_count " +
      "from (" +
      "select " +
      "addRandomPrefix(w.word, 2) as p_word " +
      "from (" +
      "select " +
      "explode(split(line, ' ')) as word " +
      "from test" +
      ") w" +
      ") p " +
      "group by p.p_word").show()

    // Step 4: global aggregation — sum the partial counts per original word.
    sqlContext.sql("select " +
      "r.r_word as field, " +
      "sum(r.r_count) as sum " +
      "from (" +
      "select " +
      "removePrefix(p.p_word) as r_word," +
      "count(p.p_word) as r_count " +
      "from (" +
      "select " +
      "addRandomPrefix(w.word, 2) as p_word " +
      "from (" +
      "select " +
      "explode(split(line, ' ')) as word " +
      "from test" +
      ") w" +
      ") p " +
      "group by p.p_word" +
      ") r " +
      "group by r.r_word").show()

    sc.stop()
  }

  /**
   * Same two-phase aggregation as in [[main]], but each step is materialized
   * as an intermediate temp table instead of one deeply nested query.
   */
  private def groupByOps1(sqlContext: SQLContext) = {
    // Split lines into individual words.
    sqlContext.sql("select explode(split(line, ' ')) as word from test")
      .registerTempTable("word_tmp")
    // Salting: add a random prefix in [0, 2).
    sqlContext.sql("select addRandomPrefix(word, 2) as p_word from word_tmp")
      .registerTempTable("prefix_word_tmp")
    // Local aggregation over the salted keys.
    sqlContext.sql("select p_word, count(p_word) as p_count from prefix_word_tmp group by p_word")
      .registerTempTable("prefix_count_word_tmp")
    // Remove the prefix, keeping the partial counts.
    sqlContext.sql("select removePrefix(p_word) as r_word, p_count as r_count from prefix_count_word_tmp")
      .registerTempTable("r_prefix_count_word_tmp")
    // Global aggregation: sum partial counts per original word.
    sqlContext.sql("select r_word, sum(r_count) r_sum from r_prefix_count_word_tmp group by r_word").show()
  }

  /**
   * Prepends a random integer prefix, drawn uniformly from [0, num),
   * to `field`, separated by an underscore.
   *
   * @param field the original key
   * @param num   exclusive upper bound of the random prefix, i.e. [0, num)
   * @return "prefix_field"
   */
  def addRandomPrefix(field: String, num: Int): String = {
    val random = new Random()
    val prefix = random.nextInt(num)
    prefix + "_" + field
  }

  /**
   * Removes the random prefix added by [[addRandomPrefix]].
   *
   * Fix: the original `field.split("_")(1)` kept only the second underscore-
   * separated segment, silently truncating any word that itself contains an
   * underscore (e.g. "1_a_b" became "a" instead of "a_b") and throwing
   * `ArrayIndexOutOfBoundsException` when no underscore was present. We now
   * strip only the text up to and including the FIRST underscore, and return
   * the input unchanged if there is none.
   *
   * @param field a value of the form "prefix_originalField"
   * @return the original field with the salt removed
   */
  def removePrefix(field: String): String = {
    val idx = field.indexOf('_')
    if (idx < 0) field else field.substring(idx + 1)
  }
}
这篇关于meituan交互式系统浅析(3) sparkSQL数据倾斜解决的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-26Mybatis官方生成器资料详解与应用教程
- 2024-11-26Mybatis一级缓存资料详解与实战教程
- 2024-11-26Mybatis一级缓存资料详解:新手快速入门
- 2024-11-26SpringBoot3+JDK17搭建后端资料详尽教程
- 2024-11-26Springboot单体架构搭建资料:新手入门教程
- 2024-11-26Springboot单体架构搭建资料详解与实战教程
- 2024-11-26Springboot框架资料:新手入门教程
- 2024-11-26Springboot企业级开发资料入门教程
- 2024-11-26SpringBoot企业级开发资料详解与实战教程
- 2024-11-26Springboot微服务资料:新手入门全攻略