Skip to content
清晨的一缕阳光
返回

并发工具类实战

并发工具类实战

Java 并发包提供了丰富的工具类,简化多线程协作,提高代码可维护性。

一、CountDownLatch(倒计时门闩)

1.1 核心概念

CountDownLatch = 计数器 + 等待

特点:
- 一个或多个线程等待其他 N 个线程完成
- 计数器只能使用一次
- 不可重置

1.2 基本使用

CountDownLatch latch = new CountDownLatch(3);

// 线程 1、2、3
new Thread(() -> {
    doWork();
    latch.countDown(); // 计数器 -1
}).start();

// 主线程等待
latch.await(); // 阻塞直到计数器为 0
System.out.println("All threads completed");

// 带超时等待
boolean success = latch.await(5, TimeUnit.SECONDS);

1.3 应用场景

(1) 启动准备

public class Service {
    private final CountDownLatch readyLatch = new CountDownLatch(3);
    
    public void start() {
        // 启动 3 个组件
        new Thread(this::initDatabase).start();
        new Thread(this::initCache).start();
        new Thread(this::initConfig).start();
        
        // 等待所有组件就绪
        readyLatch.await();
        System.out.println("Service started");
    }
    
    private void initDatabase() {
        // 初始化数据库
        readyLatch.countDown();
    }
    
    private void initCache() {
        // 初始化缓存
        readyLatch.countDown();
    }
    
    private void initConfig() {
        // 初始化配置
        readyLatch.countDown();
    }
}

(2) 并行任务汇总

public class DataAggregator {
    public Result aggregate(List<Task> tasks) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(tasks.size());
        List<Result> results = Collections.synchronizedList(new ArrayList<>());
        
        for (Task task : tasks) {
            new Thread(() -> {
                try {
                    results.add(task.execute());
                } finally {
                    latch.countDown();
                }
            }).start();
        }
        
        latch.await(); // 等待所有任务完成
        return combine(results);
    }
}

二、CyclicBarrier(循环屏障)

2.1 核心概念

CyclicBarrier = 屏障 + 可重用

特点:
- 一组线程互相等待,到达屏障点后一起执行
- 可重复使用(reset)
- 可设置屏障动作

2.2 基本使用

CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("All threads reached barrier");
});

// 线程 1、2、3
new Thread(() -> {
    doWork1();
    barrier.await(); // 等待其他线程
    doWork2();       // 一起执行
}).start();

2.3 应用场景

(1) 并行计算

public class MatrixCalculator {
    private final CyclicBarrier barrier;
    private final int[][] matrix;
    private final int[][] result;
    
    public MatrixCalculator(int[][] matrix, int threadCount) {
        this.matrix = matrix;
        this.result = new int[matrix.length][matrix[0].length];
        this.barrier = new CyclicBarrier(threadCount, this::mergeResults);
    }
    
    public void calculate() throws Exception {
        int chunkSize = matrix.length / Runtime.getRuntime().availableProcessors();
        
        for (int i = 0; i < threadCount; i++) {
            final int start = i * chunkSize;
            final int end = (i == threadCount - 1) ? matrix.length : start + chunkSize;
            
            new Thread(() -> {
                try {
                    // 计算分片
                    for (int row = start; row < end; row++) {
                        result[row] = process(matrix[row]);
                    }
                    barrier.await(); // 等待所有线程
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    
    private void mergeResults() {
        // 合并结果(屏障动作)
    }
}

(2) 多线程测试

public class PerformanceTest {
    private final CyclicBarrier barrier;
    
    public PerformanceTest(int threadCount) {
        this.barrier = new CyclicBarrier(threadCount + 1);
    }
    
    public void test() throws Exception {
        // 启动工作线程
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    barrier.await(); // 等待所有线程就绪
                    long start = System.currentTimeMillis();
                    doWork();
                    long end = System.currentTimeMillis();
                    System.out.println("Cost: " + (end - start) + "ms");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
        
        Thread.sleep(1000); // 确保所有线程启动
        barrier.await(); // 主线程发出开始信号
    }
}

三、CountDownLatch vs CyclicBarrier

特性CountDownLatchCyclicBarrier
可重用❌ 否✅ 是
作用一个线程等待 N 个线程N 个线程互相等待
计数器递减到 0递增到阈值
屏障动作❌ 无✅ 可设置
适用场景启动准备、任务汇总并行计算、分阶段任务

四、Semaphore(信号量)

4.1 核心概念

Semaphore = 许可证 + 获取/释放

特点:
- 控制同时访问的线程数
- 公平/非公平模式
- 可获取多个许可证

4.2 基本使用

Semaphore semaphore = new Semaphore(3); // 3 个许可证

// 获取许可证
semaphore.acquire();
try {
    doWork();
} finally {
    semaphore.release();
}

// 尝试获取(超时)
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
    try {
        doWork();
    } finally {
        semaphore.release();
    }
}

4.3 应用场景

(1) 限流器

public class RateLimiter {
    private final Semaphore semaphore;
    
    public RateLimiter(int maxConcurrent) {
        this.semaphore = new Semaphore(maxConcurrent);
    }
    
    public void execute(Runnable task) throws InterruptedException {
        semaphore.acquire();
        try {
            task.run();
        } finally {
            semaphore.release();
        }
    }
}

// 使用
RateLimiter limiter = new RateLimiter(10); // 最多 10 个并发
limiter.execute(() -> doWork());

(2) 数据库连接池

public class ConnectionPool {
    private final Semaphore semaphore;
    private final BlockingQueue<Connection> pool;
    
    public ConnectionPool(int maxSize) {
        this.semaphore = new Semaphore(maxSize);
        this.pool = new ArrayBlockingQueue<>(maxSize);
        // 初始化连接
        for (int i = 0; i < maxSize; i++) {
            pool.add(createConnection());
        }
    }
    
    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();
        return pool.poll();
    }
    
    public void returnConnection(Connection conn) {
        pool.offer(conn);
        semaphore.release();
    }
}

(3) 文件读写控制

public class FileReader {
    private final Semaphore readSemaphore = new Semaphore(5); // 最多 5 个读
    private final Semaphore writeSemaphore = new Semaphore(1); // 1 个写
    
    public void read() throws InterruptedException {
        readSemaphore.acquire();
        try {
            doRead();
        } finally {
            readSemaphore.release();
        }
    }
    
    public void write() throws InterruptedException {
        writeSemaphore.acquire();
        try {
            doWrite();
        } finally {
            writeSemaphore.release();
        }
    }
}

五、其他工具类

5.1 CountDownTimer(倒计时器)

// Java 没有内置,可自定义实现
public class CountDownTimer {
    private final long millisInFuture;
    private final long countDownInterval;
    
    public void start() {
        new Timer().schedule(new TimerTask() {
            long remaining = millisInFuture;
            
            @Override
            public void run() {
                onTick(remaining);
                remaining -= countDownInterval;
                if (remaining <= 0) {
                    cancel();
                    onFinish();
                }
            }
        }, 0, countDownInterval);
    }
    
    protected void onTick(long millisUntilFinished) {}
    protected void onFinish() {}
}

5.2 Phaser(阶段器)

// JDK 7+,更灵活的屏障
Phaser phaser = new Phaser(3); // 3 个参与方

// 线程 1
phaser.arriveAndAwaitAdvance(); // 到达并等待
// 进入下一阶段

// 动态注册/注销
phaser.register();   // 注册
phaser.arriveAndDeregister(); // 到达并注销

5.3 Exchanger(交换器)

// 两个线程交换数据
Exchanger<String> exchanger = new Exchanger<>();

// 线程 A
String dataA = exchanger.exchange("Data from A");

// 线程 B
String dataB = exchanger.exchange("Data from B");

// 交换后:线程 A 拿到 dataB,线程 B 拿到 dataA

六、最佳实践

6.1 异常处理

// ❌ 不处理中断
latch.await();

// ✅ 处理中断
try {
    latch.await();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted", e);
}

6.2 避免死锁

// ❌ 可能死锁
CyclicBarrier barrier = new CyclicBarrier(3);
// 如果某个线程异常退出,其他线程永远等待

// ✅ 设置超时
barrier.await(5, TimeUnit.SECONDS);

// ✅ 处理异常
try {
    barrier.await();
} catch (BrokenBarrierException e) {
    // 屏障被破坏
}

6.3 资源释放

// ✅ 总是 finally 中释放
semaphore.acquire();
try {
    doWork();
} finally {
    semaphore.release();
}

6.4 选择合适的工具

场景推荐工具
等待 N 个任务完成CountDownLatch
多线程并行计算CyclicBarrier
控制并发访问数Semaphore
分阶段协作Phaser
线程间交换数据Exchanger

七、总结

并发工具类核心要点:

工具作用可重用典型场景
CountDownLatch等待 N 个线程完成启动准备、任务汇总
CyclicBarrier线程互相等待并行计算、分阶段
Semaphore控制并发访问数限流、连接池
Phaser灵活阶段控制复杂协作
Exchanger线程间交换数据数据交换

选择合适的工具类可以简化并发编程,提高代码可读性和可维护性。


分享这篇文章到:

上一篇文章
ThreadLocal 原理与实战
下一篇文章
Atomic 原子类详解