动手用 Java 实现一个 Master - Worker

/

Java 中提供了几种可以选择的线程池使用, 都是将线程放到线程里处理就行了,但是对于数据放回就显得有些无力。

Master-Worker 设计的核心思想为,Master进程负责接受任务和分配任务

Master-Worker 目的在于将一个大的任务分解成若干个小任务,并行执行,提供对系统的利用率。

Master

  1. package com.fzb.worker;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.Map.Entry;
  5. import java.util.Queue;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. import java.util.concurrent.ConcurrentLinkedQueue;
  8. public class Master {
  9. private Queue<Object> workQueue=new ConcurrentLinkedQueue<Object>();
  10. private Map<String,Thread> threadMap=new HashMap<String, Thread>();
  11. private Map<String,Object> resultMap=new ConcurrentHashMap<String, Object>();
  12. public Master(Worker worker,int countWorker){
  13. worker.setWorkQueue(workQueue);
  14. worker.setResultMap(getResultMap());
  15. for (int i = 0; i < countWorker; i++) {
  16. threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i)));
  17. }
  18. }
  19. public void submit(Object job){
  20. workQueue.add(job);
  21. }
  22. public boolean isComplete(){
  23. for (Entry<String, Thread> entry : threadMap.entrySet()) {
  24. if(entry.getValue().getState()!=Thread.State.TERMINATED){
  25. return false;
  26. }
  27. }
  28. return true;
  29. }
  30. public Map<String,Object> getResultMap() {
  31. return resultMap;
  32. }
  33. public void execute(){
  34. for (Map.Entry<String, Thread> entry: threadMap.entrySet()) {
  35. entry.getValue().start();
  36. }
  37. }
  38. }

Worker

  1. package com.fzb.worker;
  2. import java.util.Map;
  3. import java.util.Queue;
  4. public class Worker implements Runnable{
  5. private Queue<Object> workQueue;
  6. private Map<String,Object> resultMap;
  7. public void run() {
  8. while(true){
  9. Object obj=workQueue.poll();
  10. if(obj==null) break;
  11. Object re=handle(obj);
  12. resultMap.put(obj.toString(), re);
  13. }
  14. }
  15. public Object handle(Object obj){
  16. return obj;
  17. }
  18. public Queue<Object> getWorkQueue() {
  19. return workQueue;
  20. }
  21. public void setWorkQueue(Queue<Object> workQueue) {
  22. this.workQueue = workQueue;
  23. }
  24. public Map<String,Object> getResultMap() {
  25. return resultMap;
  26. }
  27. public void setResultMap(Map<String,Object> resultMap) {
  28. this.resultMap = resultMap;
  29. }
  30. }

上面2段代码完整展示了一个 Master-Worker 的全貌,在应用的使用只需要重载 Worker.handle() 进行了。

比如从 1-100 的3次方的相加的和

  1. package com.fzb.worker;
  2. import java.util.Map;
  3. import java.util.Set;
  4. import com.fzb.worker.Master;
  5. import com.fzb.worker.Worker;
  6. public class PlusWorker extends Worker{
  7. @Override
  8. public Object handle(Object obj) {
  9. Integer i = (Integer)obj;
  10. return i*i*i;
  11. }
  12. public static void main(String[] args) {
  13. long start=System.currentTimeMillis();
  14. Master master=new Master(new PlusWorker(), 5);
  15. for (int i = 1; i <= 100; i++) {
  16. master.submit(i);
  17. }
  18. master.execute();
  19. Map<String,Object> resultMap=master.getResultMap();
  20. int re = 0;
  21. while(resultMap.size()>0 || !master.isComplete()){
  22. Set<String> keys=resultMap.keySet();
  23. String key=null;
  24. for (String string : keys) {
  25. key=string;
  26. break;
  27. }
  28. Integer i=null;
  29. if(key!=null){
  30. i=(Integer) resultMap.get(key);
  31. }
  32. if(i!=null){
  33. re+=i;
  34. }
  35. if(key!=null){
  36. resultMap.remove(key);
  37. }
  38. }
  39. System.out.println(re);
  40. }
  41. }

在主函数里面创建了5个Worker工作线程。完成100个任务添加后,通过submit让线程跑起来,整个过程中无须等待所有的计算完成后就能进行结果的计算。


Master-Worker 通过在不停的监控Worker的状态来确定Master是否完成。

转载请注明作者和出处,并添加本页链接。
原文链接: //xiaochun.zrlog.com/241.html