AQS框架源码解析
约 1859 字大约 6 分钟
javaaqsconcurrent
2025-03-07
概述
AbstractQueuedSynchronizer(AQS)是 java.util.concurrent.locks 包中的核心抽象类,是构建锁和同步器的基础框架。ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 等并发工具都基于 AQS 实现。
AQS 的核心思想:用一个 volatile int state 表示同步状态,用一个 CLH变体队列 管理等待线程。
核心结构
Node节点
static final class Node {
// 等待状态
static final int CANCELLED = 1; // 线程已取消
static final int SIGNAL = -1; // 后继节点需要被唤醒
static final int CONDITION = -2; // 在条件队列中等待
static final int PROPAGATE = -3; // 共享模式下传播唤醒
volatile int waitStatus;
volatile Node prev; // 前驱
volatile Node next; // 后继
volatile Thread thread; // 关联的线程
Node nextWaiter; // Condition队列的下一个节点 / 模式标记
// 共享模式标记
static final Node SHARED = new Node();
// 独占模式标记
static final Node EXCLUSIVE = null;
}state变量
// 同步状态
private volatile int state;
protected final int getState() { return state; }
protected final void setState(int newState) { state = newState; }
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}不同同步器对 state 的含义不同:
| 同步器 | state 含义 |
|---|---|
| ReentrantLock | 0=未锁,>0=重入次数 |
| Semaphore | 可用许可数 |
| CountDownLatch | 剩余计数 |
| ReentrantReadWriteLock | 高16位=读锁持有数,低16位=写锁重入数 |
独占模式(Exclusive)
acquire — 获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 子类实现
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 入队 + 自旋/阻塞
selfInterrupt();
}
// 添加节点到队列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS设置尾节点
pred.next = node;
return node;
}
}
enq(node); // CAS失败或队列未初始化时,自旋入队
return node;
}
// 在队列中自旋或阻塞等待获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 前驱是head才有资格尝试获取
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 检查是否应该阻塞
parkAndCheckInterrupt()) // LockSupport.park
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}release — 释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) { // 子类实现
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒head的后继节点
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从tail向前遍历,找到最前面的有效节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒
}共享模式(Shared)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // 传播唤醒
p.next = null;
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed) cancelAcquire(node);
}
}传播机制:共享模式下,一个线程获取成功后,会检查后继节点是否也是共享模式,如果是,则继续唤醒,形成连锁唤醒效果。
ConditionObject — 条件队列
public class ConditionObject implements Condition {
private transient Node firstWaiter; // 条件队列头
private transient Node lastWaiter; // 条件队列尾
public final void await() throws InterruptedException {
Node node = addConditionWaiter(); // 加入条件队列
int savedState = fullyRelease(node); // 完全释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 不在同步队列中则阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被signal后,重新在同步队列中竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// ...
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 将条件队列头节点转移到同步队列
}
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // CAS改状态 + enq入同步队列
(first = firstWaiter) != null);
}
}基于AQS的实现示例
ReentrantLock
// 非公平锁实现(默认)
static final class NonfairSync extends Sync {
final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 非公平:直接CAS抢锁,不管队列中有无等待者
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 重入
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
// 公平锁:tryAcquire中增加hasQueuedPredecessors()检查
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平:先检查是否有前驱节点在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// ... 重入逻辑同上
return false;
}
}CountDownLatch
// state = count
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) { setState(count); }
// await调用:state==0时获取成功
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// countDown调用:state减1
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0; // 减到0时唤醒所有await的线程
}
}
}Semaphore
// state = permits(可用许可数)
abstract static class Sync extends AbstractQueuedSynchronizer {
// acquire: permits减1
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining; // <0表示获取失败
}
}
// release: permits加1
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}自定义同步器
// 简单的互斥锁实现
public class SimpleMutex {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0); // 无需CAS,持锁线程独占
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
}
}
public void lock() { sync.acquire(1); }
public void unlock() { sync.release(1); }
}AQS设计模式总结
子类只需实现"尝试"逻辑(tryAcquire/tryRelease),AQS 负责处理队列管理、线程阻塞和唤醒等复杂逻辑。
总结
AQS 是 JUC 并发包的基石,通过 state + CLH队列 + CAS 构建了一个通用的同步框架。理解 AQS 的 acquire/release 流程、Node 的 waitStatus 状态流转、以及 ConditionObject 的双队列机制,是深入理解 Java 并发编程的关键。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于