网站域名商,上海哪家网站建设好,wordpress 点赞代码,中国专利技术开发公司官网Master-Worker
Master-Worker模式是常用的并行计算模式。它的核心思想是系统由两类进程协作工作#xff1a;Master进程和Worker进程Master负责接收和分配任务#xff0c;Worker负责处理子任务当各个Worker子进程处理完成后#xff0c;会将结果返回给Master#xff0c;由Ma…Master-Worker
Master-Worker模式是常用的并行计算模式。它的核心思想是系统由两类进程协作工作Master进程和Worker进程Master负责接收和分配任务Worker负责处理子任务当各个Worker子进程处理完成后会将结果返回给Master由Master做归纳和总结。 其好处是能将一个大任务分解成若干个小任务并行执行从而提高系统的吞吐量master接收来自client的任务请求将任务分发给不同的worker任务节点去执行任务再将最终的任务结果返回给客户端模拟如下客户端、Master和Workermaster里面用ConcurrentLinkedQueue盛放待处理的任务和HashMapstring,Thread盛放每个线程以及将每一个worker的执行结果存放在ConcurrentHashMap 中worker需要对任务队列和线程处理进行映射并且实现Runnable接口设立一个集合存放任务处理完的结果等处理完之后将结果集合返还到master的ConcurrentHashMap中再由Master将结果返回到客户端具体代码如下
Task.java
package com.example.core.masterworker;public class Task {private int id;private int count;public Task(){}public Task(int id,int count){this.id id;this.count count;}public int getId() {return id;}public void setId(int id) {this.id id;}public int getCount() {return count;}public void setCount(int count) {this.count count;}
}Master.java
package com.example.core.masterworker;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;public class Master {//1 承装任务的一个容器private ConcurrentLinkedQueueTask taskQueue new ConcurrentLinkedQueue();//2 承装worker执行器private HashMapString,Threadworkers new HashMap();//3 接受worker处理成功的结果集合private ConcurrentHashMapString,ObjectresultMap new ConcurrentHashMap();//4 构造方法里面要对worker进行一个初始化操作public Master(Worker worker,int workerCount) {//4.1 每一个worker 应该有master任务队列容器对引用worker.setTaskQueue(this.taskQueue);//4.2 每一个worker 应该有master结果集容器对的引用worker.setResultMap(this.resultMap);//4.3 将所有的worker进行初始化放入workers容器中for(int i0;iworkerCount;i){this.workers.put(Integer.toString(i),new Thread(worker));}}//5 需要一个提交任务的方法public void submit(Task task){this.taskQueue.add(task);}//6 需要一个真正Master所有worker进行工作的方法public void execute(){for(Map.EntryString,Threadme:this.workers.entrySet()){me.getValue().start();}}//7 需要一个统计的方法用于合并结果结合public int getResult(){int sum0;for(Map.EntryString,Objectme : resultMap.entrySet()){sum (Integer)me.getValue();}return sum;}//8判断是否所有的worker都完成了工作如果全部完成就返truepublic boolean isComplete(){for(Map.EntryString,Thread me : this.workers.entrySet()){if(me.getValue().getState() ! Thread.State.TERMINATED){return false;}}return true;}
}worker.java
package com.example.core.masterworker;import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;public class Worker implements Runnable{private ConcurrentLinkedQueueTask taskQueue;private ConcurrentHashMapString,Object resultMap;//设置任务集合public void setTaskQueue(ConcurrentLinkedQueueTasktaskQueue){this.taskQueue taskQueue;}//设置结果集合public void setResultMap(ConcurrentHashMapString,ObjectresultMap){this.resultMap resultMap;}Overridepublic void run(){while(true){Task task this.taskQueue.poll();if(task null){break;}try{Object result handle(task);this.resultMap.put(Integer.toString(task.getId()),result);}catch(Exception e){e.printStackTrace();}}}private Random r new Random();//实际做每一个工作private Object handle(Task task)throws Exception{//每一个任务的处理时间Thread.sleep(200);int ret task.getCount();return ret;}
}Main.java
package com.example.core.masterworker;import java.util.Random;public class Main {public static void main(String[] args) {System.out.println(线程数Runtime.getRuntime().availableProcessors());Master master new Master(new Worker(),Runtime.getRuntime().availableProcessors());Random r new Random();for(int i0;i100;i){Task t new Task(i,r.nextInt(1000));master.submit(t);}master.execute();long start System.currentTimeMillis();while(true){if(master.isComplete()){long end System.currentTimeMillis();int result master.getResult();System.out.println(最终结果为result,总耗时:(end-start));break;}}}
}
/*
output:
线程数12
最终结果为48834,总耗时:1819*/