Spark学习总结-Spark-Core
2021/6/6 18:52:39
本文主要介绍Spark学习总结-Spark-Core,对大家解决编程问题具有一定的参考价值,有需要的读者可以跟随小编一起学习。
Spark-Core
一 Driver和Executor通信
Driver相当于Client,Executor相当于Server
- Driver代码
package com.zxy.Socket

import java.io.OutputStream
import java.net.Socket

/**
 * Driver acts as the client side: it connects to the Executor server
 * on localhost:9999 and sends a single byte of data.
 */
object Driver {
  def main(args: Array[String]): Unit = {
    // Connect to the Executor listening on localhost:9999
    val client: Socket = new Socket("localhost", 9999)
    try {
      val out: OutputStream = client.getOutputStream
      try {
        // OutputStream.write(Int) sends only the low-order byte of the value
        out.write(2)
        out.flush()
      } finally {
        out.close()
      }
    } finally {
      // Ensure the socket is released even if the write above fails
      client.close()
    }
  }
}
- Executor代码
package com.zxy.Socket

import java.io.InputStream
import java.net.{ServerSocket, Socket}

/**
 * Executor acts as the server side: it waits for one client connection,
 * reads a single byte from it and prints the value.
 */
object Executor {
  def main(args: Array[String]): Unit = {
    // Start the server and block until a client connects
    val server: ServerSocket = new ServerSocket(9999)
    try {
      println("服务器启动,等待数据")
      val client: Socket = server.accept()
      try {
        val in: InputStream = client.getInputStream
        // read() returns one byte (0-255), or -1 on end of stream
        val i: Int = in.read()
        // Fixed message: the original interpolation left a stray "+" inside the string
        println(s"接收到客户端数据: ${i}")
      } finally {
        client.close()
      }
    } finally {
      // Release the listening socket even if accept/read fails
      server.close()
    }
  }
}
先启动服务端Executor,等待数据
启动客户端Driver,建立连接发送数据
二 案例引入Spark三大数据结构
1 案例
修改以上案例:使用两个服务端Executor接收数据,将Task中的数据拆分到两端分别计算。
-
Executor1
package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

/**
 * Executor1: server on port 8888. Receives a serialized SubTask from the
 * Driver, runs its computation locally and prints the result.
 */
object Executor1 {
  def main(args: Array[String]): Unit = {
    // Start the server and block until the Driver connects
    val server: ServerSocket = new ServerSocket(8888)
    try {
      println("服务器启动,等待数据")
      val client: Socket = server.accept()
      try {
        val in: InputStream = client.getInputStream
        val taskIn: ObjectInputStream = new ObjectInputStream(in)
        try {
          // Deserialize the SubTask shipped by the Driver and execute it here
          val task: SubTask = taskIn.readObject().asInstanceOf[SubTask]
          val ints: List[Int] = task.computer()
          println(s"计算[8888]后的结果是: ${ints}")
        } finally {
          taskIn.close()
        }
      } finally {
        client.close()
      }
    } finally {
      // Release the listening socket even if accept/read fails
      server.close()
    }
  }
}
-
Executor2
package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

/**
 * Executor2: server on port 9999. Receives a serialized SubTask from the
 * Driver, runs its computation locally and prints the result.
 */
object Executor2 {
  def main(args: Array[String]): Unit = {
    // Start the server and block until the Driver connects
    val server: ServerSocket = new ServerSocket(9999)
    try {
      println("服务器启动,等待数据")
      val client: Socket = server.accept()
      try {
        val in: InputStream = client.getInputStream
        val taskIn: ObjectInputStream = new ObjectInputStream(in)
        try {
          // Deserialize the SubTask shipped by the Driver and execute it here
          val task: SubTask = taskIn.readObject().asInstanceOf[SubTask]
          val ints: List[Int] = task.computer()
          println(s"计算[9999]后的结果是: ${ints}")
        } finally {
          taskIn.close()
        }
      } finally {
        client.close()
      }
    } finally {
      // Release the listening socket even if accept/read fails
      server.close()
    }
  }
}
-
Driver
package com.zxy.Socket

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

/**
 * Driver: splits a Task's data into two halves and ships each half,
 * together with the computation logic, to a different Executor.
 */
object Driver {
  def main(args: Array[String]): Unit = {
    val task: Task = new Task()
    // First half of the data goes to the Executor on 8888,
    // the second half to the Executor on 9999.
    sendSubTask("localhost", 8888, task.datas.take(2), task.logic)
    sendSubTask("localhost", 9999, task.datas.takeRight(2), task.logic)
    println("数据发送完毕")
  }

  /** Connects to host:port and sends one SubTask carrying the given data slice and logic. */
  private def sendSubTask(host: String, port: Int, datas: List[Int], logic: Int => Int): Unit = {
    val client: Socket = new Socket(host, port)
    try {
      val out: OutputStream = client.getOutputStream
      val objOut: ObjectOutputStream = new ObjectOutputStream(out)
      try {
        // Package the slice and the function into a serializable SubTask
        val subTask = new SubTask()
        subTask.logic = logic
        subTask.datas = datas
        objOut.writeObject(subTask)
        objOut.flush()
      } finally {
        objOut.close()
      }
    } finally {
      // Ensure the socket is released even if serialization fails
      client.close()
    }
  }
}
-
Task
package com.zxy.Socket

/** Holds the full input data set together with the computation to apply to it. */
class Task extends Serializable {
  // The complete input data before it is split across executors.
  val datas = List(1, 2, 3, 4)

  // The transformation shipped to executors: doubles each element.
  val logic: Int => Int = (n: Int) => n * 2
}
-
SubTask
package com.zxy.Socket

/** One executor's share of a Task: a slice of the data plus the logic to apply to it. */
class SubTask extends Serializable {
  // Data slice assigned to this sub-task; populated by the Driver before sending.
  var datas: List[Int] = _

  // Computation to apply to the slice; populated by the Driver before sending.
  var logic: Int => Int = _

  /** Applies the logic to every element of the data slice and returns the results. */
  def computer(): List[Int] = {
    for (d <- datas) yield logic(d)
  }
}
-
执行效果
先启动Executor1,Executor2; 再启动Driver Executor1: 服务器启动,等待数据 计算[8888]后的结果是: List(2, 4) Executor2: 服务器启动,等待数据 计算[9999]后的结果是: List(6, 8) Driver: 数据发送完毕
2 Spark三大数据结构
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构, 用于处理不同的应用场景。三大数据结构分别是: > RDD: 弹性分布式数据集 > 累加器:分布式共享只写变量 > 广播变量:分布式共享只读变量
这篇关于Spark学习总结-Spark-Core的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-04el-table 开启定时器下,表格的选中状态会消失是什么原因-icode9专业技术文章分享
- 2024-10-03如何安装和初始化飞牛私有云 fnOS?-icode9专业技术文章分享
- 2024-10-03如何安装 App 并连接到飞牛 NAS?-icode9专业技术文章分享
- 2024-10-03如何安装飞牛 TV 并连接到影视服务器?-icode9专业技术文章分享
- 2024-10-03如何在PVE和ESXI上安装飞牛私有云 fnOS?-icode9专业技术文章分享
- 2024-10-03fnOS国产最强NAS安装系统异常情况处理-icode9专业技术文章分享
- 2024-10-03飞牛NAS如何创建存储空间?-icode9专业技术文章分享
- 2024-10-03fnOS国产最强NAS硬盘会自动休眠吗?-icode9专业技术文章分享
- 2024-10-03fnOS国产最强NAS如何安装飞牛影视和创建媒体库?-icode9专业技术文章分享
- 2024-10-03fnOS国产最强NAS如何为家人朋友开通影视账号?-icode9专业技术文章分享