从ReentrantLock来学习AQS
众所周知,AQS是Java用来构建同步工具的基本组件,我们常用的ReentrantLock,Semaphore等同步器都是基于AQS来构建的,这里我们从ReentrantLock这个最简单的同步锁来入手,学习AQS的基本思想。JDK源码基于JDK17,对比一下与JDK1.8在细节上有一些不同。
这里假定各位对AQS有一些基本了解,不再对AQS的原理进行介绍。
ReentrantLock#lock()
@ReservedStackAccess
final void lock() {
if (!initialTryLock())
acquire(1);
}
initialTryLock
方法会尝试去获取锁,这里公平锁和非公平锁有一些不同,公平锁会先判断当前队列是否为空,为空才会尝试获取
// NonfairSync
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // first attempt is unguarded
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
//FairSync
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
由于ReentrantLock是可重入锁,在获取成功之后需要对state +1,同样在release时也要做反操作。
在第一次尝试获取锁失败后,acquire(1)
这个方法就正式进入AQS的流程了。
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
这里还是先尝试获取锁,tryAcquire
内做的事情和initialTryLock
差不多,也是有公平锁和非公平锁两个实现。
然后就是重头戏了
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time){
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {...}
}
这个方法比较长,里面各种条件也是眼花缭乱,这里将代码拆分逐一介绍,各位在看这部分源码时也可以参考JDK1.8的版本,相对来说更加易懂。
第一步:在入队之前,尝试获取锁
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
看这里的条件,如果是头结点或者前置节点为空,会尝试获取锁,因为我们当前还没有创建节点,也没有入队,所以pred == null
为true。
第二步:如果获取锁失败,创建节点并插到队尾
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
}
这里其实留了一个小trick,可以看到创建节点和入队并不是在同一次循环中完成的,也就是说在成功入队之前,每一次循环都会走到第一步中尝试获取锁,这里也是JDK17与JDK1.8的不同
第三步:
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
这里的if
看起来非常复杂,其实主要是判断以下几个条件:
- 当前节点不是头结点
- 前置节点不为空
- 前置节点不是头结点
在这过程中还顺带做了以下两件事:
- 对pre赋值
- 对first赋值
理完这个if,再看里面做了什么。
- 如果前置的状态为0(表示前置节点已经取消),需要调用
cleanQueue
对队列进行清理。在cleanQueue里会从队尾循环查找是否有cancelled的节点,然后唤醒下一个最有可能获取锁的节点。 - 如果前置的前置为空,表明很快就能够获取到锁,使用
Thread.onSpinWait()
进行自旋。这里也是AQS为什么要使用双向队列的一个原因。
第四步:设置状态
新建的节点status总是0
else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
}
第五步:将线程挂起
else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
通过LockSupport.park
或者LockSupport.parkNanos
方法挂起当前线程,直到:
- 别的线程调用了unpark方法
- 别的线程中断了该线程
- 等待时间到期(LockSupport.parkNanos)
- The call spuriously (that is, for no reason) returns没看懂是什么意思
当前线程被唤醒之后,会将状态重新置为0
在这一过程中,如果等待超时或者线程被中断,将跳出循环进入到cancelAcquire
流程,这里暂且不管。
第六步:再次自旋
如果前置节点为头节点时,first会在上一步置为true
else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
}
spins这个值是在第四步park时右移计算得到的
最后会在 3->1->4->5->6步骤之间循环,直至获取锁,然后将当前节点置为头结点。
ReentrantLock#unlock()
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
unlock方法就比较简单,先尝试释放资源,由于是可重入锁,这里需要判断重入次数是否为0,为0之后才能进行signalNext
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}