AQS 深度解析
AQS 是 Java 并发包的核心框架,通过状态变量和 FIFO 队列实现线程同步,为锁和同步器提供统一基础。
一、核心概念
1.1 什么是 AQS
AbstractQueuedSynchronizer(抽象队列同步器)是 java.util.concurrent.locks 中的核心抽象类,为构建锁和同步器提供模板化框架。
设计思想:
- 状态管理:通过
volatile int state表示同步状态 - 队列调度:基于 FIFO 双向链表管理等待线程
- 模板方法:子类重写
tryAcquire/tryRelease定义同步逻辑
1.2 应用场景
| 同步工具 | 模式 | state 含义 |
|---|---|---|
| ReentrantLock | 独占 | 锁重入次数 |
| Semaphore | 共享 | 可用许可数 |
| CountDownLatch | 共享 | 计数器 |
| ReentrantReadWriteLock | 独占 + 共享 | 读写锁状态 |
二、核心组件
2.1 同步状态(state)
// state 定义
private volatile int state;
// 操作方法
protected final int getState() { return state; }
protected final void setState(int newState) { state = newState; }
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
state 含义示例:
- ReentrantLock:
state=0锁空闲,state>0重入次数 - Semaphore:
state表示可用许可数 - CountDownLatch:
state初始化为计数值
2.2 同步队列(CLH 队列)
static final class Node {
// 节点状态
volatile int waitStatus;
// 前驱和后继
volatile Node prev;
volatile Node next;
// 绑定线程
volatile Thread thread;
// 条件队列或共享标记
Node nextWaiter;
// 状态常量
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
}
队列结构:
Head → Node1 → Node2 → Node3 → Tail
↓ ↓ ↓
Thread1 Thread2 Thread3
三、核心方法
3.1 独占模式
获取资源:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
流程:
tryAcquire:尝试直接获取(CAS 修改 state)- 失败则
addWaiter:加入队列尾部 acquireQueued:自旋/阻塞等待
释放资源:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
3.2 共享模式
获取资源:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
释放资源:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
四、关键机制
4.1 公平性
非公平锁:
// 直接尝试获取,可插队
protected boolean tryAcquire(int acquires) {
return compareAndSetState(0, 1);
}
公平锁:
// 检查队列中是否有等待线程
if (hasQueuedPredecessors())
return false;
return compareAndSetState(0, 1);
4.2 条件变量(ConditionObject)
// await() - 释放锁并加入条件队列
public final void await() throws InterruptedException {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
LockSupport.park(this);
// 被唤醒后重新竞争锁
}
// signal() - 转移到同步队列
public final void signal() {
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
4.3 CAS 与自旋
// CAS 操作保证原子性
protected final boolean compareAndSetState(int expect, int update)
// 自旋减少阻塞开销
while (!tryAcquire(arg)) {
if (shouldParkAfterFailedAcquire(p, node))
LockSupport.park(this);
}
五、实战示例
5.1 实现简单锁
class Mutex extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public void unlock() { release(1); }
}
5.2 实现信号量
class Semaphore extends AbstractQueuedSynchronizer {
public Semaphore(int permits) {
setState(permits);
}
@Override
protected int tryAcquireShared(int acquires) {
while (true) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
@Override
protected boolean tryReleaseShared(int releases) {
while (true) {
int p = getState();
if (compareAndSetState(p, p + releases))
return true;
}
}
}
六、总结
AQS 核心要点:
| 组件 | 作用 | 实现 |
|---|---|---|
| state | 同步状态 | volatile + CAS |
| CLH 队列 | 线程调度 | 双向链表 |
| 模板方法 | 自定义逻辑 | tryAcquire/tryRelease |
| Condition | 条件等待 | 条件队列 |
AQS 是并发包的基石,理解 AQS 有助于深入掌握 ReentrantLock、Semaphore 等同步工具。