并发编程之Fork/join

2018/04/08 线程Thread

引言

通常,当你实现一个简单的并发应用程序,你实现一些Runnable对象和相应的 Thread对象。在你的程序中,你控制这些线程的创建、执行和状态。Java 5引入了Executor和ExecutorService接口及其实现类进行了改进(比如:ThreadPoolExecutor类)。 Java 7更进一步,包括一个面向特定问题的ExecutorService接口的额外实现,它就是Fork/Join框架。

fork/join框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。 类似于ExecutorService接口的其他实现,fork/join框架会将任务分发给线程池中的工作线程。fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。 fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。

基本使用方法

使用fork/join框架的第一步是编写执行一部分工作的代码。你的代码结构看起来应该与下面所示的伪代码类似:

if (当前这个任务工作量足够小)
   直接完成这个任务
else
   将这个任务或这部分工作分解成两个部分
   分别触发(invoke)这两个子任务的执行,并等待结果

你需要将这段代码包裹在一个ForkJoinTask的子类中。不过,通常情况下会使用一种更为具体的的类型,或者是RecursiveTask(会返回一个结果),或者是RecursiveAction

当你的ForkJoinTask子类准备好了,创建一个代表所有需要完成工作的对象,然后将其作为参数传递给一个ForkJoinPool实例的invoke()方法即可。

demo演示:

package com.guonl.forkjoin;

import java.util.concurrent.RecursiveTask;

/**
 * Created by guonl
 * Date 2018/4/8 下午4:23
 * Description: 启动一个计数线程
 */
public class CountTask extends RecursiveTask<Integer> {
    private static int THRESHOLD = 3;
    private int[] integers;
    private int start;
    private int end;

    public CountTask(int[] integers, int start, int end) {
        this.integers = integers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int len = end - start + 1;
        boolean isOverThreshold = len > THRESHOLD;
        int sum = 0;
        if (!isOverThreshold) {
            for (int i = start; i <= end; i++) {
                sum += integers[i];
            }
        } else {
            int mid = (start + end) / 2;
            CountTask leftTask = new CountTask(this.integers, start, mid);
            CountTask rightTask = new CountTask(this.integers, mid + 1, end);
            invokeAll(leftTask, rightTask);
            //fork的作用是将当前任务放到workerThread里面去做
            //invokeAll是将其中一个放在本线程做,其他的调用fork
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            sum = leftResult + rightResult;
        }
        return sum;
    }
}


package com.guonl.forkjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class ForkjoinTest {
    public static void main(String[] args) {
        int[] integers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        ForkJoinTask<Integer> task = new CountTask(integers, 0, 9);
        int sum = forkJoinPool.invoke(task);
        System.out.println("计算的结果为:" + sum);
    }
}


输出结果:

计算的结果为:55

参考链接:

Show Disqus Comments

Search

    Post Directory