Fork-Join #
简介 #
从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。
这种思想和MapReduce很像(input –> split –> map –> reduce –> output)
- 主要有两步:
- 第一、任务切分;
- 第二、结果合并
它的模型大致是这样的:线程池中的每个线程都有自己的工作队列(PS:这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程公用一个工作队列, 所有线程都从这个工作队列中取任务),当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。
工作窃取(work-stealing) #
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:
那么为什么需要使用工作窃取算法呢?
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
API介绍 #
ForkJoinPool #
ForkJoinPool与其它的ExecutorService区别主要在于它使用“工作窃取”:线程池中的所有线程都企图找到并执行提交给线程池的任务。当大量的任务产生子任务的时候,或者同时当有许多小任务被提交到线程池中的时候,这种处理是非常高效的。特别的,当在构造方法中设置asyncMode为true的时候这种处理更加高效。
ForkJoinTask #
ForkJoinTask代表运行在ForkJoinPool中的任务。
主要方法:
- fork() 在当前线程运行的线程池中安排一个异步执行。简单的理解就是再创建一个子任务。
- join() 当任务完成的时候返回计算结果。
- invoke() 开始执行任务,如果必要,等待计算完成。
子类:
- RecursiveAction 一个递归无结果的ForkJoinTask(没有返回值)
- RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)
ForkJoinWorkerThread #
ForkJoinWorkerThread代表ForkJoinPool线程池中的一个执行任务的线程。
- 类图
代码分析 #
WorkQueue是一个ForkJoinPool中的内部类,它是线程池中线程的工作队列的一个封装,支持任务窃取。
什么叫线程的任务窃取呢?就是说你和你的一个伙伴一起吃水果,你的那份吃完了,他那份没吃完,那你就偷偷的拿了他的一些水果吃了。 存在执行2个任务的子线程,这里要讲成存在A,B两个个WorkQueue在执行任务,A的任务执行完了,B的任务没执行完, 那么A的WorkQueue就从B的WorkQueue的ForkJoinTask数组中拿走了一部分尾部的任务来执行,可以合理的提高运行和计算效率。
- submit()
可以看到:
- 1:同样是提交任务,submit会返回ForkJoinTask,而execute不会
- 2:任务提交给线程池以后,会将这个任务加入到当前提交者的任务队列中。
前面我们说过,每个线程都有一个WorkQueue,而WorkQueue中有执行任务的线程(ForkJoinWorkerThread owner),还有这个线程需要处理的任务(ForkJoinTask<?>[] array)。那么这个新提交的任务就是加到array中。
- ForkJoinWorkerThread
从代码中我们可以清楚地看到,ForkJoinWorkThread持有ForkJoinPool和ForkJoinPool.WorkQueue的引用,以表明该线程属于哪个线程池,它的工作队列是哪个
-
ForkJoinTask
-
fork()
可以看到,如果是ForkJoinWorkerThread运行过程中fork(),则直接加入到它的工作队列中,否则,重新提交任务。
可以看到它们都会等待计算完成
图形化处理过程 #
使用示例 #
- 仅仅打印
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
public class ForkJoinPoolDemo1 {
public static class SendMsgTask extends RecursiveAction {
private final int THRESHOLD = 10;
private int start;
private int end;
private List<String> list;
public SendMsgTask(int start, int end, List<String> list) {
this.start = start;
this.end = end;
this.list = list;
}
@Override
protected void compute() {
// 做什么
//什么都不做
if ((end - start) <= THRESHOLD) {
for (int i = start; i < end; i++) {
//仅仅打印
System.out.println(Thread.currentThread().getName() + ": " + list.get(i));
}
} else {
// 拆分
int middle = (start + end) / 2;
SendMsgTask left = new SendMsgTask(start, middle, list);
SendMsgTask right = new SendMsgTask(middle, end, list);
invokeAll(left, right);
}
}
}
public static void main(String[] args)throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 123; i++) {
list.add(String.valueOf(i + 1));
}
ForkJoinPool pool = new ForkJoinPool();
//都行 submit 和 execute 以及 invoke
// pool.submit(new SendMsgTask(0, list.size(), list));
// pool.execute(new SendMsgTask(0, list.size(), list));
pool.invoke(new SendMsgTask(0, list.size(), list)) ;
pool.awaitTermination(10, TimeUnit.SECONDS);
pool.shutdown();
}
}
- 求和
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
public class ForkJoinPoolDemo2 {
public static class SumTask extends RecursiveTask<Integer> {
private final int THRESHOLD = 10;
private int start;
private int end;
private List<Integer> list;
public SumTask(int start, int end, List<Integer> list) {
this.start = start;
this.end = end;
this.list = list;
}
@Override
protected Integer compute() {
// 做什么
//什么都不做
if ((end - start) <= THRESHOLD) {
// 直接求和
int sum = 0;
for (int i = start; i < end; i++) {
//仅仅打印
sum += this.list.get(i);
}
return sum;
} else {
// 拆分
int middle = (start + end) / 2;
SumTask left = new SumTask(start, middle, list);
SumTask right = new SumTask(middle, end, list);
invokeAll(left, right);
return left.join() + right.join();
}
}
}
public static void main(String[] args) throws Exception {
List<Integer> list = new ArrayList<>();
for (int i = 1; i <= 123; i++) {
list.add(i);
}
ForkJoinPool pool = new ForkJoinPool();
//都行 submit 和 execute 以及 invoke
SumTask sumTask = new SumTask(0, list.size(), list);
pool.submit(sumTask);
// pool.execute(sumTask);
// pool.invoke(sumTask);
System.out.println(sumTask.join());
pool.awaitTermination(10, TimeUnit.SECONDS);
pool.shutdown();
}
}
总结 #
- 执行方法
方法名 | 说明 例 |
---|---|
invoke(ForkJoinTask) | 提交任务并一直阻塞直到任务执行完成返回合并结果。 |
execute(ForkJoinTask) | 异步执行任务,无返回值 |
submit(ForkJoinTask) | 异步执行任务,返回task本身,可以通过task.get()方法获取合并之后的结果。 |
-
是否有返回值
-
1.如果有返回值就继承RecursiveTask,没有返回值就继承RecursiveAction
-
2.invoke同步调用,如果想要异步调用,可以使用pool.execute(…);替换invoke方法