动手用 Java 实现一个 Master - Worker
Java 中提供了几种可以选择的线程池使用, 都是将线程放到线程里处理就行了,但是对于数据放回就显得有些无力。
Master-Worker 设计的核心思想为,Master进程负责接受任务和分配任务
Master-Worker 目的在于将一个大的任务分解成若干个小任务,并行执行,提供对系统的利用率。
Master
package com.fzb.worker;import java.util.HashMap;import java.util.Map;import java.util.Map.Entry;import java.util.Queue;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;public class Master {private Queue<Object> workQueue=new ConcurrentLinkedQueue<Object>();private Map<String,Thread> threadMap=new HashMap<String, Thread>();private Map<String,Object> resultMap=new ConcurrentHashMap<String, Object>();public Master(Worker worker,int countWorker){worker.setWorkQueue(workQueue);worker.setResultMap(getResultMap());for (int i = 0; i < countWorker; i++) {threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i)));}}public void submit(Object job){workQueue.add(job);}public boolean isComplete(){for (Entry<String, Thread> entry : threadMap.entrySet()) {if(entry.getValue().getState()!=Thread.State.TERMINATED){return false;}}return true;}public Map<String,Object> getResultMap() {return resultMap;}public void execute(){for (Map.Entry<String, Thread> entry: threadMap.entrySet()) {entry.getValue().start();}}}
Worker
package com.fzb.worker;import java.util.Map;import java.util.Queue;public class Worker implements Runnable{private Queue<Object> workQueue;private Map<String,Object> resultMap;public void run() {while(true){Object obj=workQueue.poll();if(obj==null) break;Object re=handle(obj);resultMap.put(obj.toString(), re);}}public Object handle(Object obj){return obj;}public Queue<Object> getWorkQueue() {return workQueue;}public void setWorkQueue(Queue<Object> workQueue) {this.workQueue = workQueue;}public Map<String,Object> getResultMap() {return resultMap;}public void setResultMap(Map<String,Object> resultMap) {this.resultMap = resultMap;}}
上面2段代码完整展示了一个 Master-Worker 的全貌,在应用的使用只需要重载 Worker.handle() 进行了。
比如从 1-100 的3次方的相加的和
package com.fzb.worker;import java.util.Map;import java.util.Set;import com.fzb.worker.Master;import com.fzb.worker.Worker;public class PlusWorker extends Worker{@Overridepublic Object handle(Object obj) {Integer i = (Integer)obj;return i*i*i;}public static void main(String[] args) {long start=System.currentTimeMillis();Master master=new Master(new PlusWorker(), 5);for (int i = 1; i <= 100; i++) {master.submit(i);}master.execute();Map<String,Object> resultMap=master.getResultMap();int re = 0;while(resultMap.size()>0 || !master.isComplete()){Set<String> keys=resultMap.keySet();String key=null;for (String string : keys) {key=string;break;}Integer i=null;if(key!=null){i=(Integer) resultMap.get(key);}if(i!=null){re+=i;}if(key!=null){resultMap.remove(key);}}System.out.println(re);}}
在主函数里面创建了5个Worker工作线程。完成100个任务添加后,通过submit让线程跑起来,整个过程中无须等待所有的计算完成后就能进行结果的计算。
Master-Worker 通过在不停的监控Worker的状态来确定Master是否完成。
转载请注明作者和出处,并添加本页链接。
原文链接: //xiaochun.zrlog.com/241.html