引言
通常,当你实现一个简单的并发应用程序,你实现一些Runnable对象和相应的 Thread对象。在你的程序中,你控制这些线程的创建、执行和状态。Java 5引入了Executor和ExecutorService接口及其实现类进行了改进(比如:ThreadPoolExecutor类)。
Java 7更进一步,包括一个面向特定问题的ExecutorService
接口的额外实现,它就是Fork/Join
框架。
fork/join框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。
类似于ExecutorService接口的其他实现,fork/join框架会将任务分发给线程池中的工作线程。fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。
fork/join框架的核心是ForkJoinPool
类,它是对AbstractExecutorService
类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。
基本使用方法
使用fork/join框架的第一步是编写执行一部分工作的代码。你的代码结构看起来应该与下面所示的伪代码类似:
if (当前这个任务工作量足够小)
直接完成这个任务
else
将这个任务或这部分工作分解成两个部分
分别触发(invoke)这两个子任务的执行,并等待结果
你需要将这段代码包裹在一个ForkJoinTask的子类中。不过,通常情况下会使用一种更为具体的的类型,或者是RecursiveTask
(会返回一个结果),或者是RecursiveAction
。
当你的ForkJoinTask子类准备好了,创建一个代表所有需要完成工作的对象,然后将其作为参数传递给一个ForkJoinPool实例的invoke()方法即可。
demo演示:
package com.guonl.forkjoin;
import java.util.concurrent.RecursiveTask;
/**
* Created by guonl
* Date 2018/4/8 下午4:23
* Description: 启动一个计数线程
*/
public class CountTask extends RecursiveTask<Integer> {
private static int THRESHOLD = 3;
private int[] integers;
private int start;
private int end;
public CountTask(int[] integers, int start, int end) {
this.integers = integers;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int len = end - start + 1;
boolean isOverThreshold = len > THRESHOLD;
int sum = 0;
if (!isOverThreshold) {
for (int i = start; i <= end; i++) {
sum += integers[i];
}
} else {
int mid = (start + end) / 2;
CountTask leftTask = new CountTask(this.integers, start, mid);
CountTask rightTask = new CountTask(this.integers, mid + 1, end);
invokeAll(leftTask, rightTask);
//fork的作用是将当前任务放到workerThread里面去做
//invokeAll是将其中一个放在本线程做,其他的调用fork
int leftResult = leftTask.join();
int rightResult = rightTask.join();
sum = leftResult + rightResult;
}
return sum;
}
}
package com.guonl.forkjoin;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
public class ForkjoinTest {
public static void main(String[] args) {
int[] integers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
ForkJoinTask<Integer> task = new CountTask(integers, 0, 9);
int sum = forkJoinPool.invoke(task);
System.out.println("计算的结果为:" + sum);
}
}
输出结果:
计算的结果为:55
参考链接: