Spark解决SQL和RDDjoin结果不一致问题(工作实录)
2021/10/21 19:12:56
本文主要是介绍Spark解决SQL和RDDjoin结果不一致问题(工作实录),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
问题描述:DataFrame的join结果不正确,dataframeA(6000无重复条数据) join dataframeB(220条无重复数据,由dataframeA转化而来,key值均源于dataframeA) 只有200条数据,丢了20条
问题验证:
1,查询丢的20条数据,均无异常,不存在Null,数据不存在空格
2,重新运行算法,丢18条数据,证明丢数据存在一定随机性
3,简化问题到最简模式,代码如下:
val xxx1= phySiteEvaluationPhySiteKey.select("physitekey").distinct() val xxx2= physitefinal.select("physitekey").distinct() val xxx3 = xxx1.join(xxx2, Seq("physitekey")) val rdd1=xxx1.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r)) val rdd2 =xxx2.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r)) val rdd3=rdd1.join(rdd2) log.info(s"rdd3=${rdd3.count()}") log.info(s"xxx3==${xxx3.count()}")
xxx3和rdd3的结果居然不相等!!违背了spark常识
问题分析:
1,据spark原理可知,DataFrame的底层实现就是RDD,具体实现在Catalyst包类,需要DataFrame=>未解析的逻辑执行计划=>解析逻辑计划=>优化逻辑执行计划=>物理执行计划=>RDD执行
也就是说xxx3的执行计划生成出的RDD执行方案与RDD3结果不一致,因此在这里我打印了xxx3的执行计划,期望有所发现
xxx1.join(xxx2, Seq("physitekey")).explain()
执行计划长达1000多行,涉及内部实现因项目保密需要无法展示。
2,执行计划超长是因为phySiteEvaluationPhySiteKey、physitefinal均为迭代计算结果,不是直接来源于输入表
3,依据执行计划,我猜测Spark在逻辑计划优化的时候出错,导致结果不符合预期
4,验证方案:为xxx1、xxx2的取值加上checkpoint,斩断血缘依赖,重新查看执行计划是否符合预期
val xxx1= phySiteEvaluationPhySiteKey.select("physitekey").distinct().checkpoint() val xxx2= physitefinal.select("physitekey").distinct().checkpoint() xxx1.join(xxx2, Seq("physitekey")).explain() val xxx3 = xxx1.join(xxx2, Seq("physitekey")) val rdd1=xxx1.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r)) val rdd2 =xxx2.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r)) val rdd3=rdd1.join(rdd2) log.info(s"rdd3=${rdd3.count()}") log.info(s"xxx3==${xxx3.count()}")
结果执行计划如下:
== Physical Plan == *Project [physitekey#1648] +- *SortMergeJoin [physitekey#1648], [physitekey#43875], Inner :- *Sort [physitekey#1648 ASC NULLS FIRST], false, 0 : +- Exchange(coordinator id: 1135069612) hashpartitioning(physitekey#1648, 200), coordinator[target post-shuffle partition size: 67108864] : +- *Filter isnotnull(physitekey#1648) : +- Scan ExistingRDD[physitekey#1648] +- *Sort [physitekey#43875 ASC NULLS FIRST], false, 0 +- Exchange(coordinator id: 1135069612) hashpartitioning(physitekey#43875, 200), coordinator[target post-shuffle partition size: 67108864] +- *Filter isnotnull(physitekey#43875) +- Scan ExistingRDD[physitekey#43875]
没有问题,RDD3与XXX3结果相等,正确了。
确认问题出在Spark中DataFrame在持有超长血缘关系时转化为RDD执行出错,具体错误有机会下次分析,应当是仅在一定特殊情况下才会暴露的BUG
5、问题反思
开源组件也是可能存在BUG的,应当在使用时尽量使用其最常见的用法,列如在本问题中,如果迭代计算之后及时斩断血缘依赖,就不会出现问题
这篇关于Spark解决SQL和RDDjoin结果不一致问题(工作实录)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南