并发编程之Master-Worker模式
2020/2/22 17:03:07
本文主要是介绍并发编程之Master-Worker模式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
我们知道,单个线程计算是串行的,只有等上一个任务结束之后,才能执行下一个任务,所以执行效率是比较低的。
那么,如果用多线程执行任务,就可以在单位时间内执行更多的任务,而Master-Worker就是多线程并行计算的一种实现方式。
它的思想是,启动两个进程协同工作:Master和Worker进程。
Master负责任务的接收和分配,Worker负责具体的子任务执行。每个Worker执行完任务之后把结果返回给Master,最后由Master汇总结果。(其实也是一种分而治之的思想,和forkjoin计算框架有相似之处,参看:并行任务计算框架forkjoin)
Master-Worker工作示意图如下:
下面用Master-Worker实现计算1-100的平方和,思路如下:
- 定义一个Task类用于存储每个任务的数据。
- Master生产固定个数的Worker,把所有worker存放在workers变量(map)中,Master需要存储所有任务的队列workqueue(ConcurrentLinkedQueue)和所有子任务返回的结果集resultMap(ConcurrentHashMap)。
- 每个Worker执行自己的子任务,然后把结果存放在resultMap中。
- Master汇总resultMap中的数据,然后返回给Client客户端。
- 为了扩展Worker的功能,用一个MyWorker继承Worker重写任务处理的具体方法。
Task类:
package com.thread.masterworker; public class Task { private int id; private String name; private int num; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } }复制代码
Master实现:
package com.thread.masterworker; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { //所有任务的队列 private ConcurrentLinkedQueue<Task> workerQueue = new ConcurrentLinkedQueue<Task>(); //所有worker private HashMap<String,Thread> workers = new HashMap<String,Thread>(); //共享变量,worker返回的结果 private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>(); //构造方法,初始化所有worker public Master(Worker worker,int workerCount){ worker.setWorkerQueue(this.workerQueue); worker.setResultMap(this.resultMap); for (int i = 0; i < workerCount; i++) { Thread t = new Thread(worker); this.workers.put("worker-"+i,t); } } //任务的提交 public void submit(Task task){ this.workerQueue.add(task); } //执行任务 public int execute(){ for (Map.Entry<String, Thread> entry : workers.entrySet()) { entry.getValue().start(); } //一直循环,直到结果返回 while (true){ if(isComplete()){ return getResult(); } } } //判断是否所有线程都已经执行完毕 public boolean isComplete(){ for (Map.Entry<String, Thread> entry : workers.entrySet()) { //只要有任意一个线程没有结束,就返回false if(entry.getValue().getState() != Thread.State.TERMINATED){ return false; } } return true; } //处理结果集返回最终结果 public int getResult(){ int res = 0; for (Map.Entry<String,Object> entry : resultMap.entrySet()) { res += (Integer) entry.getValue(); } return res; } }复制代码
父类Worker:
package com.thread.masterworker; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> workerQueue; private ConcurrentHashMap<String,Object> resultMap; public void setWorkerQueue(ConcurrentLinkedQueue<Task> workerQueue) { this.workerQueue = workerQueue; } public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { this.resultMap = resultMap; } @Override public void run() { while(true){ //从任务队列中取出一个任务 Task task = workerQueue.poll(); if(task == null) break; //处理具体的任务 Object res = doTask(task); //把每次处理的结果放到结果集里面,此处直接把num值作为结果 resultMap.put(String.valueOf(task.getId()),res); } } public Object doTask(Task task) { return null; } } 复制代码
子类MyWorker继承父类Worker,重写doTask方法实现具体的逻辑:
package com.thread.masterworker; public class MyWorker extends Worker { @Override public Object doTask(Task task) { //暂停0.5秒,模拟任务处理 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } //计算数字的平方 int num = task.getNum(); return num * num; } } 复制代码
客户端Client:
package com.thread.masterworker; import java.util.Random; public class Client { public static void main(String[] args) { Master master = new Master(new MyWorker(), 10); //提交n个任务到任务队列里 for (int i = 0; i < 100; i++) { Task task = new Task(); task.setId(i); task.setName("任务"+i); task.setNum(i+1); master.submit(task); } //执行任务 long start = System.currentTimeMillis(); int res = master.execute(); long time = System.currentTimeMillis() - start; System.out.println("结果:"+res+",耗时:"+time); } } 复制代码
以上,我们用10个线程去执行子任务,最终由Master做计算求和(1-100的平方和)。每个线程暂停500ms,计算数字的平方值。
总共100个任务,分10个线程并行计算,相当于每个线程均分10个任务,一个任务的时间大概为500ms,故10个任务为5000ms,再加上计算平方值的时间,故稍大于5000ms。结果如下,
结果:338350,耗时:5084复制代码
这篇关于并发编程之Master-Worker模式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-07-04TiDB 资源管控的对撞测试以及最佳实践架构
- 2024-07-03万字长文聊聊Web3的组成架构
- 2024-07-02springboot项目无法注册到nacos-icode9专业技术文章分享
- 2024-06-26结对编程到底难不难?答案在这里
- 2024-06-19《2023版Java工程师》课程升级公告
- 2024-06-15matplotlib作图不显示3D图,怎么办?
- 2024-06-1503-Loki 日志监控
- 2024-06-1504-让LLM理解知识 -Prompt
- 2024-06-05做软件测试需要懂代码吗?
- 2024-06-0514-ShardingSphere的分布式主键实现