并发工具类实战
Java 并发包提供了丰富的工具类,简化多线程协作,提高代码可维护性。
一、CountDownLatch(倒计时门闩)
1.1 核心概念
CountDownLatch = 计数器 + 等待
特点:
- 一个或多个线程等待其他 N 个线程完成
- 计数器只能使用一次
- 不可重置
1.2 基本使用
CountDownLatch latch = new CountDownLatch(3);
// 线程 1、2、3
new Thread(() -> {
doWork();
latch.countDown(); // 计数器 -1
}).start();
// 主线程等待
latch.await(); // 阻塞直到计数器为 0
System.out.println("All threads completed");
// 带超时等待
boolean success = latch.await(5, TimeUnit.SECONDS);
1.3 应用场景
(1) 启动准备
public class Service {
private final CountDownLatch readyLatch = new CountDownLatch(3);
public void start() {
// 启动 3 个组件
new Thread(this::initDatabase).start();
new Thread(this::initCache).start();
new Thread(this::initConfig).start();
// 等待所有组件就绪
readyLatch.await();
System.out.println("Service started");
}
private void initDatabase() {
// 初始化数据库
readyLatch.countDown();
}
private void initCache() {
// 初始化缓存
readyLatch.countDown();
}
private void initConfig() {
// 初始化配置
readyLatch.countDown();
}
}
(2) 并行任务汇总
public class DataAggregator {
public Result aggregate(List<Task> tasks) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(tasks.size());
List<Result> results = Collections.synchronizedList(new ArrayList<>());
for (Task task : tasks) {
new Thread(() -> {
try {
results.add(task.execute());
} finally {
latch.countDown();
}
}).start();
}
latch.await(); // 等待所有任务完成
return combine(results);
}
}
二、CyclicBarrier(循环屏障)
2.1 核心概念
CyclicBarrier = 屏障 + 可重用
特点:
- 一组线程互相等待,到达屏障点后一起执行
- 可重复使用(reset)
- 可设置屏障动作
2.2 基本使用
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All threads reached barrier");
});
// 线程 1、2、3
new Thread(() -> {
doWork1();
barrier.await(); // 等待其他线程
doWork2(); // 一起执行
}).start();
2.3 应用场景
(1) 并行计算
public class MatrixCalculator {
private final CyclicBarrier barrier;
private final int[][] matrix;
private final int[][] result;
public MatrixCalculator(int[][] matrix, int threadCount) {
this.matrix = matrix;
this.result = new int[matrix.length][matrix[0].length];
this.barrier = new CyclicBarrier(threadCount, this::mergeResults);
}
public void calculate() throws Exception {
int chunkSize = matrix.length / Runtime.getRuntime().availableProcessors();
for (int i = 0; i < threadCount; i++) {
final int start = i * chunkSize;
final int end = (i == threadCount - 1) ? matrix.length : start + chunkSize;
new Thread(() -> {
try {
// 计算分片
for (int row = start; row < end; row++) {
result[row] = process(matrix[row]);
}
barrier.await(); // 等待所有线程
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
private void mergeResults() {
// 合并结果(屏障动作)
}
}
(2) 多线程测试
public class PerformanceTest {
private final CyclicBarrier barrier;
public PerformanceTest(int threadCount) {
this.barrier = new CyclicBarrier(threadCount + 1);
}
public void test() throws Exception {
// 启动工作线程
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
barrier.await(); // 等待所有线程就绪
long start = System.currentTimeMillis();
doWork();
long end = System.currentTimeMillis();
System.out.println("Cost: " + (end - start) + "ms");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(1000); // 确保所有线程启动
barrier.await(); // 主线程发出开始信号
}
}
三、CountDownLatch vs CyclicBarrier
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 可重用 | ❌ 否 | ✅ 是 |
| 作用 | 一个线程等待 N 个线程 | N 个线程互相等待 |
| 计数器 | 递减到 0 | 递增到阈值 |
| 屏障动作 | ❌ 无 | ✅ 可设置 |
| 适用场景 | 启动准备、任务汇总 | 并行计算、分阶段任务 |
四、Semaphore(信号量)
4.1 核心概念
Semaphore = 许可证 + 获取/释放
特点:
- 控制同时访问的线程数
- 公平/非公平模式
- 可获取多个许可证
4.2 基本使用
Semaphore semaphore = new Semaphore(3); // 3 个许可证
// 获取许可证
semaphore.acquire();
try {
doWork();
} finally {
semaphore.release();
}
// 尝试获取(超时)
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
try {
doWork();
} finally {
semaphore.release();
}
}
4.3 应用场景
(1) 限流器
public class RateLimiter {
private final Semaphore semaphore;
public RateLimiter(int maxConcurrent) {
this.semaphore = new Semaphore(maxConcurrent);
}
public void execute(Runnable task) throws InterruptedException {
semaphore.acquire();
try {
task.run();
} finally {
semaphore.release();
}
}
}
// 使用
RateLimiter limiter = new RateLimiter(10); // 最多 10 个并发
limiter.execute(() -> doWork());
(2) 数据库连接池
public class ConnectionPool {
private final Semaphore semaphore;
private final BlockingQueue<Connection> pool;
public ConnectionPool(int maxSize) {
this.semaphore = new Semaphore(maxSize);
this.pool = new ArrayBlockingQueue<>(maxSize);
// 初始化连接
for (int i = 0; i < maxSize; i++) {
pool.add(createConnection());
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire();
return pool.poll();
}
public void returnConnection(Connection conn) {
pool.offer(conn);
semaphore.release();
}
}
(3) 文件读写控制
public class FileReader {
private final Semaphore readSemaphore = new Semaphore(5); // 最多 5 个读
private final Semaphore writeSemaphore = new Semaphore(1); // 1 个写
public void read() throws InterruptedException {
readSemaphore.acquire();
try {
doRead();
} finally {
readSemaphore.release();
}
}
public void write() throws InterruptedException {
writeSemaphore.acquire();
try {
doWrite();
} finally {
writeSemaphore.release();
}
}
}
五、其他工具类
5.1 CountDownTimer(倒计时器)
// Java 没有内置,可自定义实现
public class CountDownTimer {
private final long millisInFuture;
private final long countDownInterval;
public void start() {
new Timer().schedule(new TimerTask() {
long remaining = millisInFuture;
@Override
public void run() {
onTick(remaining);
remaining -= countDownInterval;
if (remaining <= 0) {
cancel();
onFinish();
}
}
}, 0, countDownInterval);
}
protected void onTick(long millisUntilFinished) {}
protected void onFinish() {}
}
5.2 Phaser(阶段器)
// JDK 7+,更灵活的屏障
Phaser phaser = new Phaser(3); // 3 个参与方
// 线程 1
phaser.arriveAndAwaitAdvance(); // 到达并等待
// 进入下一阶段
// 动态注册/注销
phaser.register(); // 注册
phaser.arriveAndDeregister(); // 到达并注销
5.3 Exchanger(交换器)
// 两个线程交换数据
Exchanger<String> exchanger = new Exchanger<>();
// 线程 A
String dataA = exchanger.exchange("Data from A");
// 线程 B
String dataB = exchanger.exchange("Data from B");
// 交换后:线程 A 拿到 dataB,线程 B 拿到 dataA
六、最佳实践
6.1 异常处理
// ❌ 不处理中断
latch.await();
// ✅ 处理中断
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted", e);
}
6.2 避免死锁
// ❌ 可能死锁
CyclicBarrier barrier = new CyclicBarrier(3);
// 如果某个线程异常退出,其他线程永远等待
// ✅ 设置超时
barrier.await(5, TimeUnit.SECONDS);
// ✅ 处理异常
try {
barrier.await();
} catch (BrokenBarrierException e) {
// 屏障被破坏
}
6.3 资源释放
// ✅ 总是 finally 中释放
semaphore.acquire();
try {
doWork();
} finally {
semaphore.release();
}
6.4 选择合适的工具
| 场景 | 推荐工具 |
|---|---|
| 等待 N 个任务完成 | CountDownLatch |
| 多线程并行计算 | CyclicBarrier |
| 控制并发访问数 | Semaphore |
| 分阶段协作 | Phaser |
| 线程间交换数据 | Exchanger |
七、总结
并发工具类核心要点:
| 工具 | 作用 | 可重用 | 典型场景 |
|---|---|---|---|
| CountDownLatch | 等待 N 个线程完成 | ❌ | 启动准备、任务汇总 |
| CyclicBarrier | 线程互相等待 | ✅ | 并行计算、分阶段 |
| Semaphore | 控制并发访问数 | ✅ | 限流、连接池 |
| Phaser | 灵活阶段控制 | ✅ | 复杂协作 |
| Exchanger | 线程间交换数据 | ✅ | 数据交换 |
选择合适的工具类可以简化并发编程,提高代码可读性和可维护性。