Skip to content
清晨的一缕阳光
返回

Fork/Join 框架详解

Fork/Join 框架详解

Fork/Join 是 Java 7 引入的并行计算框架,适用于可分解的大规模计算任务。

一、核心概念

1.1 什么是 Fork/Join

Fork/Join = 分治算法 + 工作窃取

Fork(分): 将大任务分解为小任务
Join(合): 合并小任务的结果

特点:
- 适合 CPU 密集型任务
- 自动负载均衡(工作窃取)
- 递归分解任务

1.2 适用场景

// ✅ 适合:CPU 密集型、可分解
- 大规模数组求和
- 矩阵乘法
- 排序算法(归并排序、快速排序)
- 斐波那契数列计算
- 图像处理

// ❌ 不适合:IO 密集型、不可分解
- 文件读写
- 网络请求
- 数据库操作

二、核心组件

2.1 ForkJoinPool

// 创建 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();

// 使用公共池(推荐)
ForkJoinPool.commonPool();

// 提交任务
ForkJoinTask<Integer> task = new SumTask(array, 0, array.length);
Integer result = pool.invoke(task);

// 异步执行
pool.submit(task);

2.2 ForkJoinTask

// 抽象基类
public abstract class ForkJoinTask<V> 
    implements Future<V>, Serializable {
    
    // 执行任务
    public abstract V get();
    
    // 强制完成
    public V join() {
        // ...
    }
    
    // 异步执行
    public void fork() {
        // ...
    }
}

2.3 RecursiveAction vs RecursiveTask

// RecursiveAction:无返回值
public class PrintTask extends RecursiveAction {
    @Override
    protected void compute() {
        // 执行任务,无返回值
    }
}

// RecursiveTask:有返回值
public class SumTask extends RecursiveTask<Integer> {
    @Override
    protected Integer compute() {
        // 执行任务,返回结果
        return result;
    }
}

三、工作窃取算法

3.1 算法原理

工作窃取(Work Stealing):

每个线程有自己的双端队列(Deque)

线程优先处理自己的任务(LIFO)

空闲线程从其他线程队列"窃取"任务(FIFO)

自动负载均衡

3.2 双端队列结构

// 每个线程的本地队列
class WorkQueue {
    ForkJoinTask[] array; // 任务数组
    int base;             // 队首(窃取端)
    int top;              // 队尾(推送端)
}

// 自己执行:从 top 取(LIFO)
task = array[--top];

// 窃取任务:从 base 取(FIFO)
task = array[base++];

3.3 性能优势

传统线程池:
- 任务队列是共享的
- 需要全局锁
- 竞争激烈

Fork/Join:
- 每个线程有自己的队列
- 无锁操作(自己的队列)
- 窃取时才竞争
- 性能提升 2-5 倍

四、实战案例

4.1 数组求和

public class SumTask extends RecursiveTask<Integer> {
    private final int[] array;
    private final int start;
    private final int end;
    private static final int THRESHOLD = 1000;
    
    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            // 直接计算
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            // 分解任务
            int mid = (start + end) / 2;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);
            
            left.fork();  // 异步执行
            int rightResult = right.compute(); // 同步执行
            int leftResult = left.join();      // 等待结果
            
            return leftResult + rightResult;
        }
    }
}

// 使用
int[] array = new int[1000000];
SumTask task = new SumTask(array, 0, array.length);
int result = ForkJoinPool.commonPool().invoke(task);

4.2 归并排序

public class MergeSortTask extends RecursiveAction {
    private final int[] array;
    private final int start;
    private final int end;
    private static final int THRESHOLD = 1000;
    
    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            Arrays.sort(array, start, end);
        } else {
            int mid = (start + end) / 2;
            MergeSortTask left = new MergeSortTask(array, start, mid);
            MergeSortTask right = new MergeSortTask(array, mid, end);
            
            left.fork();
            right.compute();
            left.join();
            
            merge(array, start, mid, end);
        }
    }
    
    private void merge(int[] array, int start, int mid, int end) {
        // 归并逻辑
    }
}

4.3 斐波那契数列

public class FibonacciTask extends RecursiveTask<Integer> {
    private final int n;
    
    public FibonacciTask(int n) {
        this.n = n;
    }
    
    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        } else if (n <= 20) {
            // 小问题直接计算
            return computeIterative(n);
        } else {
            FibonacciTask f1 = new FibonacciTask(n - 1);
            FibonacciTask f2 = new FibonacciTask(n - 2);
            
            f1.fork();
            int f2Result = f2.compute();
            int f1Result = f1.join();
            
            return f1Result + f2Result;
        }
    }
    
    private int computeIterative(int n) {
        if (n <= 1) return n;
        int a = 0, b = 1;
        for (int i = 2; i <= n; i++) {
            int temp = a + b;
            a = b;
            b = temp;
        }
        return b;
    }
}

五、并行流实现

5.1 并行流底层

// 并行流使用 Fork/Join 实现
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

// 串行流
list.stream().forEach(System.out::println);

// 并行流(使用 ForkJoinPool.commonPool())
list.parallelStream().forEach(System.out::println);

// 指定池
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> 
    list.parallelStream().forEach(System.out::println)
);

5.2 性能对比

int[] array = IntStream.range(0, 10000000).toArray();

// 串行求和
long start = System.currentTimeMillis();
int sum = Arrays.stream(array).sum();
// 耗时:80ms

// 并行求和
start = System.currentTimeMillis();
int parallelSum = Arrays.stream(array).parallel().sum();
// 耗时:25ms(3.2 倍提升)

5.3 并行流注意事项

// ❌ 错误:共享状态
List<Integer> results = new ArrayList<>();
list.parallelStream().forEach(results::add);
// 线程不安全

// ✅ 正确:使用收集器
List<Integer> results = list.parallelStream()
    .collect(Collectors.toList());

// ❌ 错误:阻塞操作
list.parallelStream().forEach(item -> {
    Thread.sleep(100); // IO 阻塞,浪费线程
});

// ✅ 正确:CPU 密集型操作
list.parallelStream().forEach(item -> {
    computeIntensive(item);
});

六、性能调优

6.1 阈值选择

// 阈值太小:任务分解过多,开销大
// 阈值太大:并行度不足

// 经验法则
// - 简单操作:1000-10000
// - 复杂操作:100-1000
// - 根据 CPU 核心数调整

int threshold = array.length / (Runtime.getRuntime().availableProcessors() * 4);

6.2 线程池大小

// 默认:CPU 核心数
ForkJoinPool.commonPool();

// 自定义
int parallelism = Runtime.getRuntime().availableProcessors();
ForkJoinPool pool = new ForkJoinPool(parallelism);

// CPU 密集型:parallelism = CPU 核心数
// IO 密集型:parallelism = CPU 核心数 * 2

6.3 性能监控

// 监控 ForkJoinPool
ForkJoinPool pool = ForkJoinPool.commonPool();

System.out.println("并行度:" + pool.getParallelism());
System.out.println("活跃线程:" + pool.getActiveThreadCount());
System.out.println("运行线程:" + pool.getRunningThreadCount());
System.out.println("队列大小:" + pool.getQueuedSubmissionCount());
System.out.println("窃取次数:" + pool.getStealCount());

七、总结

Fork/Join 核心要点:

组件作用特点
ForkJoinPool线程池工作窃取、自动负载均衡
ForkJoinTask任务基类可 fork/join
RecursiveAction无返回值任务compute() 返回 void
RecursiveTask有返回值任务compute() 返回 V
工作窃取负载均衡双端队列、LIFO/FIFO

Fork/Join 适合 CPU 密集型、可分解的大规模计算任务,并行流是其典型应用。


分享这篇文章到:

上一篇文章
微服务设计原则
下一篇文章
微服务核心概念