Fork/Join 框架详解
Fork/Join 是 Java 7 引入的并行计算框架,适用于可分解的大规模计算任务。
一、核心概念
1.1 什么是 Fork/Join
Fork/Join = 分治算法 + 工作窃取
Fork(分): 将大任务分解为小任务
Join(合): 合并小任务的结果
特点:
- 适合 CPU 密集型任务
- 自动负载均衡(工作窃取)
- 递归分解任务
1.2 适用场景
// ✅ 适合:CPU 密集型、可分解
- 大规模数组求和
- 矩阵乘法
- 排序算法(归并排序、快速排序)
- 斐波那契数列计算
- 图像处理
// ❌ 不适合:IO 密集型、不可分解
- 文件读写
- 网络请求
- 数据库操作
二、核心组件
2.1 ForkJoinPool
// 创建 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
// 使用公共池(推荐)
ForkJoinPool.commonPool();
// 提交任务
ForkJoinTask<Integer> task = new SumTask(array, 0, array.length);
Integer result = pool.invoke(task);
// 异步执行
pool.submit(task);
2.2 ForkJoinTask
// 抽象基类
public abstract class ForkJoinTask<V>
implements Future<V>, Serializable {
// 执行任务
public abstract V get();
// 强制完成
public V join() {
// ...
}
// 异步执行
public void fork() {
// ...
}
}
2.3 RecursiveAction vs RecursiveTask
// RecursiveAction:无返回值
public class PrintTask extends RecursiveAction {
@Override
protected void compute() {
// 执行任务,无返回值
}
}
// RecursiveTask:有返回值
public class SumTask extends RecursiveTask<Integer> {
@Override
protected Integer compute() {
// 执行任务,返回结果
return result;
}
}
三、工作窃取算法
3.1 算法原理
工作窃取(Work Stealing):
每个线程有自己的双端队列(Deque)
↓
线程优先处理自己的任务(LIFO)
↓
空闲线程从其他线程队列"窃取"任务(FIFO)
↓
自动负载均衡
3.2 双端队列结构
// 每个线程的本地队列
class WorkQueue {
ForkJoinTask[] array; // 任务数组
int base; // 队首(窃取端)
int top; // 队尾(推送端)
}
// 自己执行:从 top 取(LIFO)
task = array[--top];
// 窃取任务:从 base 取(FIFO)
task = array[base++];
3.3 性能优势
传统线程池:
- 任务队列是共享的
- 需要全局锁
- 竞争激烈
Fork/Join:
- 每个线程有自己的队列
- 无锁操作(自己的队列)
- 窃取时才竞争
- 性能提升 2-5 倍
四、实战案例
4.1 数组求和
public class SumTask extends RecursiveTask<Integer> {
private final int[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 1000;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 直接计算
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 分解任务
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 异步执行
int rightResult = right.compute(); // 同步执行
int leftResult = left.join(); // 等待结果
return leftResult + rightResult;
}
}
}
// 使用
int[] array = new int[1000000];
SumTask task = new SumTask(array, 0, array.length);
int result = ForkJoinPool.commonPool().invoke(task);
4.2 归并排序
public class MergeSortTask extends RecursiveAction {
private final int[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 1000;
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
Arrays.sort(array, start, end);
} else {
int mid = (start + end) / 2;
MergeSortTask left = new MergeSortTask(array, start, mid);
MergeSortTask right = new MergeSortTask(array, mid, end);
left.fork();
right.compute();
left.join();
merge(array, start, mid, end);
}
}
private void merge(int[] array, int start, int mid, int end) {
// 归并逻辑
}
}
4.3 斐波那契数列
public class FibonacciTask extends RecursiveTask<Integer> {
private final int n;
public FibonacciTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
} else if (n <= 20) {
// 小问题直接计算
return computeIterative(n);
} else {
FibonacciTask f1 = new FibonacciTask(n - 1);
FibonacciTask f2 = new FibonacciTask(n - 2);
f1.fork();
int f2Result = f2.compute();
int f1Result = f1.join();
return f1Result + f2Result;
}
}
private int computeIterative(int n) {
if (n <= 1) return n;
int a = 0, b = 1;
for (int i = 2; i <= n; i++) {
int temp = a + b;
a = b;
b = temp;
}
return b;
}
}
五、并行流实现
5.1 并行流底层
// 并行流使用 Fork/Join 实现
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
// 串行流
list.stream().forEach(System.out::println);
// 并行流(使用 ForkJoinPool.commonPool())
list.parallelStream().forEach(System.out::println);
// 指定池
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() ->
list.parallelStream().forEach(System.out::println)
);
5.2 性能对比
int[] array = IntStream.range(0, 10000000).toArray();
// 串行求和
long start = System.currentTimeMillis();
int sum = Arrays.stream(array).sum();
// 耗时:80ms
// 并行求和
start = System.currentTimeMillis();
int parallelSum = Arrays.stream(array).parallel().sum();
// 耗时:25ms(3.2 倍提升)
5.3 并行流注意事项
// ❌ 错误:共享状态
List<Integer> results = new ArrayList<>();
list.parallelStream().forEach(results::add);
// 线程不安全
// ✅ 正确:使用收集器
List<Integer> results = list.parallelStream()
.collect(Collectors.toList());
// ❌ 错误:阻塞操作
list.parallelStream().forEach(item -> {
Thread.sleep(100); // IO 阻塞,浪费线程
});
// ✅ 正确:CPU 密集型操作
list.parallelStream().forEach(item -> {
computeIntensive(item);
});
六、性能调优
6.1 阈值选择
// 阈值太小:任务分解过多,开销大
// 阈值太大:并行度不足
// 经验法则
// - 简单操作:1000-10000
// - 复杂操作:100-1000
// - 根据 CPU 核心数调整
int threshold = array.length / (Runtime.getRuntime().availableProcessors() * 4);
6.2 线程池大小
// 默认:CPU 核心数
ForkJoinPool.commonPool();
// 自定义
int parallelism = Runtime.getRuntime().availableProcessors();
ForkJoinPool pool = new ForkJoinPool(parallelism);
// CPU 密集型:parallelism = CPU 核心数
// IO 密集型:parallelism = CPU 核心数 * 2
6.3 性能监控
// 监控 ForkJoinPool
ForkJoinPool pool = ForkJoinPool.commonPool();
System.out.println("并行度:" + pool.getParallelism());
System.out.println("活跃线程:" + pool.getActiveThreadCount());
System.out.println("运行线程:" + pool.getRunningThreadCount());
System.out.println("队列大小:" + pool.getQueuedSubmissionCount());
System.out.println("窃取次数:" + pool.getStealCount());
七、总结
Fork/Join 核心要点:
| 组件 | 作用 | 特点 |
|---|---|---|
| ForkJoinPool | 线程池 | 工作窃取、自动负载均衡 |
| ForkJoinTask | 任务基类 | 可 fork/join |
| RecursiveAction | 无返回值任务 | compute() 返回 void |
| RecursiveTask | 有返回值任务 | compute() 返回 V |
| 工作窃取 | 负载均衡 | 双端队列、LIFO/FIFO |
Fork/Join 适合 CPU 密集型、可分解的大规模计算任务,并行流是其典型应用。