原创

第十二篇 : Fork/Join框架-工作窃取


一、什么是Fork/Join

Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成 若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总。

二、work-stealing(工作窃取模式)

Fork/Join采用“工作窃取模式”,当执行新的任务时他可以将其拆分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随即线程中偷一个并把它加入自己的队列中。 就比如两个CPU上有不同的任务,这时候A已经执行完,B还有任务等待执行,这时候A就会将B队尾的任务偷过来,加入自己的队列中,对于传统的线程,ForkJoin更有效的利用的CPU资源!

三、常用方法

ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。

四、实例

package com.gf.demo;


import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class TestForkJoinPool {

    //自己写的拆分
    public static void main(String args[]){
        Instant start = Instant.now();

        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinSumCalculate( 0 , 1000000000L );
        Long sum = pool.invoke( task );

        Instant end = Instant.now();

        System.out.println(sum);

        System.out.println("耗时 : " + Duration.between( start , end ).toMillis());

    }

    //普通for循环
    @Test
    public void test1() {
        Instant start = Instant.now();

        long sum = 0;
        for (int i = 0 ; i <= 1000000000L ; i++) {
            sum += i;
        }

        Instant end = Instant.now();

        System.out.println(sum);

        System.out.println("耗时 : " + Duration.between( start , end ).toMillis());

    }

    //Java 8 的 ForkJoinPool
    @Test
    public void test2() {
        Instant start = Instant.now();
        Long sum = LongStream.rangeClosed( 0 , 1000000000L )
                             .parallel()
                             .reduce( 0L , (e1 , e2) -> {
                                 return e1 + e2;
                             } );
        Instant end = Instant.now();
        System.out.println(sum);
        System.out.println("耗时 : " + Duration.between( start , end ).toMillis());
    }

}

class ForkJoinSumCalculate extends RecursiveTask<Long> {

    private static final long serialVersionUID = -821634846883092881L;

    private long start;
    private long end;

    private static final long THURSHOLD = 10000000L;

    public ForkJoinSumCalculate(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long length = end - start;

        if (length <= THURSHOLD) {
            long sum = 0;
            for (long i = start ; i <= end ; i++) {
                sum += i;
            }
            return sum;
        } else {
            long middle = (start + end) / 2;

            ForkJoinSumCalculate left = new ForkJoinSumCalculate(start , middle);
            //进行拆分,同时压入线程队列
            left.fork();
            ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1 , end);
            right.fork();

            return left.join() + right.join();
        }
    }
}

在使用 ForkJoin 时需要注意,如果任务的划分层次很深,一直得不到返回,那么可能出现两种情况:

  1. 系统内的线程数量越积越多,导致性能严重下降。
  2. 函数的调用层次变得很深,最终导致溢出。
juc
  • 作者:程序员果果
  • 发表时间:2018-11-09 09:38
  • 版权声明:自由转载-非商用-非衍生-保持署名 (创意共享4.0许可证)
  • 公众号转载:请在文末添加作者公众号二维码
  • 评论