动手用 Java 实现一个 Master - Worker
Java 中提供了几种可以选择的线程池使用, 都是将线程放到线程里处理就行了,但是对于数据放回就显得有些无力。
Master-Worker 设计的核心思想为,Master进程负责接受任务和分配任务
Master-Worker 目的在于将一个大的任务分解成若干个小任务,并行执行,提供对系统的利用率。
Master
package com.fzb.worker;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Master {
private Queue<Object> workQueue=new ConcurrentLinkedQueue<Object>();
private Map<String,Thread> threadMap=new HashMap<String, Thread>();
private Map<String,Object> resultMap=new ConcurrentHashMap<String, Object>();
public Master(Worker worker,int countWorker){
worker.setWorkQueue(workQueue);
worker.setResultMap(getResultMap());
for (int i = 0; i < countWorker; i++) {
threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i)));
}
}
public void submit(Object job){
workQueue.add(job);
}
public boolean isComplete(){
for (Entry<String, Thread> entry : threadMap.entrySet()) {
if(entry.getValue().getState()!=Thread.State.TERMINATED){
return false;
}
}
return true;
}
public Map<String,Object> getResultMap() {
return resultMap;
}
public void execute(){
for (Map.Entry<String, Thread> entry: threadMap.entrySet()) {
entry.getValue().start();
}
}
}
Worker
package com.fzb.worker;
import java.util.Map;
import java.util.Queue;
public class Worker implements Runnable{
private Queue<Object> workQueue;
private Map<String,Object> resultMap;
public void run() {
while(true){
Object obj=workQueue.poll();
if(obj==null) break;
Object re=handle(obj);
resultMap.put(obj.toString(), re);
}
}
public Object handle(Object obj){
return obj;
}
public Queue<Object> getWorkQueue() {
return workQueue;
}
public void setWorkQueue(Queue<Object> workQueue) {
this.workQueue = workQueue;
}
public Map<String,Object> getResultMap() {
return resultMap;
}
public void setResultMap(Map<String,Object> resultMap) {
this.resultMap = resultMap;
}
}
上面2段代码完整展示了一个 Master-Worker 的全貌,在应用的使用只需要重载 Worker.handle() 进行了。
比如从 1-100 的3次方的相加的和
package com.fzb.worker;
import java.util.Map;
import java.util.Set;
import com.fzb.worker.Master;
import com.fzb.worker.Worker;
public class PlusWorker extends Worker{
@Override
public Object handle(Object obj) {
Integer i = (Integer)obj;
return i*i*i;
}
public static void main(String[] args) {
long start=System.currentTimeMillis();
Master master=new Master(new PlusWorker(), 5);
for (int i = 1; i <= 100; i++) {
master.submit(i);
}
master.execute();
Map<String,Object> resultMap=master.getResultMap();
int re = 0;
while(resultMap.size()>0 || !master.isComplete()){
Set<String> keys=resultMap.keySet();
String key=null;
for (String string : keys) {
key=string;
break;
}
Integer i=null;
if(key!=null){
i=(Integer) resultMap.get(key);
}
if(i!=null){
re+=i;
}
if(key!=null){
resultMap.remove(key);
}
}
System.out.println(re);
}
}
在主函数里面创建了5个Worker工作线程。完成100个任务添加后,通过submit
让线程跑起来,整个过程中无须等待所有的计算完成后就能进行结果的计算。
Master-Worker 通过在不停的监控Worker的状态来确定Master是否完成。
转载请注明作者和出处,并添加本页链接。
原文链接:
//xiaochun.zrlog.com/241.html