JDK7的新特性1 ForkJoinPool

JDK7的一个新特性:ForkJoinPool 很值得一说。

ForkJoinPool,首先上代码:

//计算 π 的值有一个通过多项式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),而且多项式的项数越多,计算出的 π 的值越精确
	static class PiEstimateTask extends RecursiveTask<Double> {
		private static final long serialVersionUID = 1L;
		private final long begin;
    private final long end;
    private final long threshold; // 分割任务的临界值

    public PiEstimateTask(long begin, long end, long threshold) {
        this.begin = begin;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Double compute() {  // 实现 compute 方法
        if (end - begin <= threshold) {  // 临界值之下,不再分割,直接计算

            int sign; // 符号,多项式中偶数位取 1,奇数位取 -1(位置从 0 开始)
            double result = 0.0;
            for (long i = begin; i < end; i++) {
                sign = (i & 1) == 0 ? 1 : -1;
                result += sign / (i * 2.0 + 1);
            }
            System.out.println(Thread.currentThread()+ " From: "+ begin + " End: "+ end);
            return result * 4;
        }

        // 分割任务
        long middle = (begin + end) / 2;
        PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold);
        PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold);

        leftTask.fork();  // 异步执行 leftTask
        rightTask.fork(); // 异步执行 rightTask

        double leftResult = leftTask.join();   // 阻塞,直到 leftTask 执行完毕返回结果
        double rightResult = rightTask.join(); // 阻塞,直到 rightTask 执行完毕返回结果

        return leftResult + rightResult; // 合并结果
    }

}
	
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ForkJoinPool pool = new ForkJoinPool();
		pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
		PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_00);
		pool.execute(task);
		System.out.println(task.get());
	}

从上面的代码中,我们可以看到ForkJoinPool,适用的一类业务: 把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

ForkJoinPool支持的任务大类有两种,一类是:Runnable ,类似于ExecutorService。一类是ForkJoinTask。

其中的ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。

代码分析

根据上面的示例代码,可以看出 fork() 和 join() 是 Fork/Join Framework “魔法”的关键。我们可以根据函数名假设一下 fork() 和 join() 的作用:

fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。 join():等待该任务的处理线程处理完毕,获得返回值。

并不是每个 fork() 都会促成一个新线程被创建,而每个 join() 也不是一定会造成线程被阻塞。

具体的数据结构如下图:

数据结构

Fork/Join Framework 的实现算法并不是那么“显然”,而是一个更加复杂的算法——这个算法的名字就叫做 work stealing 算法。

ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)

每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。

每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。

在既没有自己的任务,也没有可以窃取的任务时,进入休眠。

fork() :

做的工作只有一件事,既是把任务推入当前工作线程的工作队列里。

join() :

工作则复杂得多,也是 join() 可以使得线程免于被阻塞的原因——不像同名的 Thread.join()。

  1. 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
  2. 查看任务的完成状态,如果已经完成,直接返回结果。
  3. 如果任务尚未完成,但处于自己的工作队列内,则完成它。
  4. 如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。
  5. 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。 递归地执行第5步。

所谓work-stealing模式,即每个工作线程都会有自己的任务队列。当工作线程完成了自己所有的工作后,就会去“偷”别的工作线程的任务。

假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

submit():

其实除了前面介绍过的每个工作线程自己拥有的工作队列以外,ForkJoinPool 自身也拥有工作队列,这些工作队列的作用是用来接收由外部线程(非 ForkJoinThread 线程)提交过来的任务,而这些工作队列被称为 submitting queue 。

submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操作)。submitting queue 和其他 work queue 一样,是工作线程”窃取“的对象,因此当其中的任务被一个工作线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。

更加详细的研究:http://gee.cs.oswego.edu/dl/papers/fj.pdf

ForkJoinPool与ThreadPoolExecutor区别:

1.ForkJoinPool中的每个线程都会有一个队列,而ThreadPoolExecutor只有一个队列,并根据queue类型不同,细分出各种线程池

2.ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,ThreadPoolExecutor中根本没有什么父子关系任务

3.ForkJoinPool在使用过程中,会创建大量的子任务,会进行大量的gc,但是ThreadPoolExecutor不需要,因此单线程(或者任务分配平均)

4.ForkJoinPool在多任务,且任务分配不均是有优势,但是在单线程或者任务分配均匀的情况下,效率没有ThreadPoolExecutor高,毕竟要进行大量gc子任务

ForkJoinPool在多线程情况下,能够实现工作窃取(Work Stealing),在该线程池的每个线程中会维护一个队列来存放需要被执行的任务。当线程自身队列中的任务都执行完毕后,它会从别的线程中拿到未被执行的任务并帮助它执行。

ThreadPoolExecutor因为它其中的线程并不会关注每个任务之间任务量的差异。当执行任务量最小的任务的线程执行完毕后,它就会处于空闲的状态(Idle),等待任务量最大的任务执行完毕。

因此多任务在多线程中分配不均时,ForkJoinPool效率高。

stream中应用ForkJoinPool

 Arrays.asList("a1", "a2", "b1", "c2", "c1")
                .parallelStream()
                .filter(s -> {
                    System.out.format("filter: %s [%s]\n",
                            s, Thread.currentThread().getName());
                    return true;
                })
                .map(s -> {
                    System.out.format("map: %s [%s]\n",
                            s, Thread.currentThread().getName());
                    return s.toUpperCase();
                })
                .sorted((s1, s2) -> {
                    System.out.format("sort: %s <> %s [%s]\n",
                            s1, s2, Thread.currentThread().getName());
                    return s1.compareTo(s2);
                })
                .forEach(s -> System.out.format("forEach: %s [%s]\n",
                        s, Thread.currentThread().getName()));

parallelStream让部分Java代码自动地以并行的方式执行

最后:

有一点要注意,就是手动设置ForkJoinPool的线程数量时,实际线程数为设置的线程数+1,因为还有一个main主线程

即使将ForkJoinPool的通用线程池的线程数量设置为1,实际上也会有2个工作线程。因此线程数为1的ForkJoinPool通用线程池和线程数为2的ThreadPoolExecutor是等价的。

与ForkJoinPool对应的是CompletableFuture

Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。 阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果

CompletableFuture就是利用观察者设计模式当计算结果完成及时通知监听者 在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

Powered by andiHappy and Theme by AndiHappy