AQS源码解析---以ReentrantLock为例
概述
学习并发编程,就必须要了解到AQS(AbstractQueuedSynchronizer)。比如经常使用的ReentrantLock就是继承的AQS,而且大部分的线程操作都是由AQS完成,ReentrantLock自身仅仅负责实现当前线程对共享资源的获取和释放功能。而AQS负责实现了对没有获取到资源的线程放入同步队列、阻塞、唤醒、溢出阻塞队列的操作。
其实对多线程获取资源分为三个步骤:
(1)线程尝试获取资源;
(2)将没有获取到资源的线程放入同步队列当中,线程进入等待状态(在此过程中,没有获取到资源的线程也会多次尝试重新获取资源);
(3)在同步队列中陷入等待状态的线程被上一个节点中断等待,重新尝试获取资源。
架构
AQS的功能十分强大,既支持线程的共享资源抢占,也支持线程独占。这里先了解线程的资源独占方式。先明确两个概念:
(1)共享资源是否被抢占有一个信号标识,标识为0则资源未被抢占,可以在源码中看到大量的cas操作;
(2)线程在AQS中被封装成为一个node节点,node节点有前驱及后继,构成了等待队列。node节点有5中状态值:
-
CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
-
SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
-
CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
-
PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
-
0:新结点入队时的默认状态,此状态下的线程代表是就绪或执行状态。
我们这里先关注默认值0和SIGNAL(-1)两种状态即可。
1、ReentrantLock资源抢占
ReentrantLock默认是非公平锁,因此当前线程会什么都不管,先进行一次cas操作尝试获取锁,如下:
final void lock() {
//当前线程直接尝试获取资源
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//开始进行后续的资源抢占方法
acquire(1);
}
而如果是公平锁,则直接执行acquire(1)方法,这里是公平锁和非公平锁的区别之一,接下来还有一个地方可以展现两种锁的区别。
2、acquire()
acquire()方法是在AQS中定义的,是对线程操作的入口方法,这里先进行大致说明。tryAcquire(arg)是让当前线程尝试获取资源,addWaiter(Node.EXCLUSIVE)是将当前线程封装为Node加入到等待队列,acquireQueued()是将线程执行park()方法进入到等待状态。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2.1、tryAcquire()
可以看下AQS当中的方法源码,没有任何的功能逻辑,只是抛出了个异常。没错,这个方法需要我们自己去实现。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
那我们看下ReentrantLock的tryAcquire()方法,ReentrantLock实现了公平锁和非公平锁两种方式,我们先看非公平锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
好吧,那我们继续 nonfairTryAcquire(acquires)方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//执行cas操作抢占资源
if (compareAndSetState(0, acquires)) {
//如果抢占成功,则设置当前线程为拥有资源的线程
setExclusiveOwnerThread(current);
return true;
}
}
//如果资源已经被抢占了,则判断当前线程是否为拥有资源的线程
else if (current == getExclusiveOwnerThread()) {
//如果是,则把资源的占用数+1,注意,这里就是可重入锁的重要标志哦
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁的tryAcquire()方法差别不大,不过还是说一下吧,如下
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//和非公平锁的重要差别就是多了这一个hasQueuedPredecessors方法
//这个方法就是判断当同步队列中是否有正在等待的线程,如果没有则当前线程就可以直接尝试获取资源,如果有那就只能乖乖排队了,是不是很公平
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2.2、addWaiter
前面尝试让线程获取资源不成功,因此接下来就需要将线程封装进Node中,然后放入队列的尾部进行排队了。
private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
//将当前线程封装进node
AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//得到同步队列中的尾部结点
AbstractQueuedSynchronizer.Node pred = tail;
//如果尾部结点不为空
if (pred != null) {
//cas操作尝试把当前线程的节点插入值同步队列的末尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//同步队列中可能还没有插入任何节点,因此尾结点为空,则执行enq方法
enq(node);
return node;
}
2.2.1、enq()
通过cas自旋操作将node结点加入到同步队列当中。
private AbstractQueuedSynchronizer.Node enq(final AbstractQueuedSynchronizer.Node node) {
//开始自旋
for (;;) {
//得到同步队列尾部节点
AbstractQueuedSynchronizer.Node t = tail;
//判断尾部结点是否为null,是不是有些疑惑,上一步骤中已经判断尾结点为null了,为什么还要判断呢
//不要忘记我们这里是多线程操作哦,在这个线程执行过程中,可能其它线程已经插入尾部结点了,因此需要再次判断
if (t == null) { // Must initialize
//为null,说明同步队列还没初始化
//这里进行cas操作对同步队列进行初始化
if (compareAndSetHead(new AbstractQueuedSynchronizer.Node()))
tail = head;
} else {
//不为null,说明同步队列已经初始化了,那么进行cas操作将次node结点插入就可以了
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
//经过自旋cas操作后
return t;
}
}
}
}
2.3、acquireQueued()
将线程封装进Node节点之后,线程不能一直处于运行状态占用着CPU吧,那么接下来就需要让线程进入等待状态了。那么随之而来有一个问题,线程阻塞之后谁来唤醒自己呢,先说结论,是由当前node节点的前驱节点唤醒的,但是任意的前驱都会唤醒自己的后继节点吗,当然不是了。因此上一步只是将node节点加入到了同步队列的尾部,接下来就必须为自己找一个能唤醒自己的前驱了。
final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
//让线程进入等待状态是否失败的标志,确切说应该是是否抛出异常的标志
boolean failed = true;
try {
//线程是否打上了中断标志
boolean interrupted = false;
//又是自旋操作
for (;;) {
//得到当前线程节点的前驱
final AbstractQueuedSynchronizer.Node p = node.predecessor();
/**
* 如果当前线程节点的前驱是同步队列的头结点,那么也就是说自己已经成为同步队列的二把手了
* 那么自己可以执行tryAcquire方法再次尝试抢占下资源了
* 因为线程状态的切换也是消耗资源的,所以能不改变线程的状态就不要改变
*
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* shouldParkAfterFailedAcquire()方法用于将当前线程节点找到合适的前驱结点,并告知前驱结点执行完之后记得唤醒自己
* parkAndCheckInterrupt()方法用于将线程执行park()方法进入等待状态
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.3.1、shouldParkAfterFailedAcquire()
线程节点进入等待状态后,需要有其它线程唤醒自己,这里是通过当前结点的前驱唤醒的,但是不是所有的结点都可以唤醒自己的后继,因此找到一个靠谱的大哥是很重要的(找到一个能够唤醒自己的结点当做前驱)。
private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node
pred, AbstractQueuedSynchronizer.Node node) {
//得到自己前驱的线程状态
int ws = pred.waitStatus;
//如果处于SIGNAL状态,那没事了,这个前驱会唤醒自己的
if (ws == AbstractQueuedSynchronizer.Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//如果大于0,说明当前前驱的线程已经被取消了,根本不会再执行,那怎么也不会唤醒自己的
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
//开始一步步向前遍历,直到直到一个线程没被取消的结点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//找到之后,把前驱node结点的状态设置为SIGNAL,即告诉了自己的前驱执行完之后,记得唤醒自己这个等待线程
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, AbstractQueuedSynchronizer.Node.SIGNAL);
}
return false;
}
2.3.2、parkAndCheckInterrupt()
上一步骤当中终于为自己找到合适的前驱了,接下来就可以安心的让自己这个线程进入等待状态了。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
以上终于是完完全全的将线程封装为Node放入了同步队列中、并让线程进入了等待状态,那么接下来是不是就等着占用着资源的线程执行完毕,然后释放锁并唤醒占用资源线程的后继线程了。
2.4 、unlock()
ReentrantLock释放锁调用的是unlock()方法,里面方法很简单,调用了AQS的release()方法。
public void unlock() {
sync.release(1);
}
2.5、release()
释放锁要经过两个步骤:
(1)让出当前线程所占用的资源,即修改资源占用信号量为0,设置占据当前资源的线程为null;
(2)通知当前线程结点的后继结点开始抢占资源,即将后继结点的线程从阻塞等待状态就行唤醒转为就绪状态。
public final boolean release(int arg) {
//调用ReentrantLock自身的tryRelease()方法,将资源占用标识修改为0,资源占用线程修改为null
if (tryRelease(arg)) {
AbstractQueuedSynchronizer.Node h = head;
//waitStatus!=0才能唤醒后继线程,这里就是为什么要将等待线程的前驱节点的waitStatus设置为SIGNAL(-1)的原因
if (h != null && h.waitStatus != 0)
//唤醒当前线程的后继
unparkSuccessor(h);
return true;
}
return false;
}
2.5.1、tryRelease()
那么现在就执行让出资源的第一步,注意可能是可重入锁,因此仅释放一次锁不代表被占用的资源被释放,一定要资源占用信号量为0时,才代表资源被释放了。
protected final boolean tryRelease(int releases) {
/**
* 记得吗,ReentrantLock是可重入锁,因此不一定释放一次锁就释放了占用的资源
* 所以每unlock()一次,占用锁的信号量就减一
*/
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//直到资源占用信号量为0了,那么说明真的让出了资源
if (c == 0) {
free = true;
//此时把占用资源的线程设置为null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2.5.2、unparkSuccessor()
好了,现在资源被释放了,可以通知下一个结点从等待状态转为就绪状态,进而去占用资源继续线程的执行了。
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//将结点状态设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
AbstractQueuedSynchronizer.Node s = node.next;
//得到当前线程结点的后继,如果后续未null,或者后继结点的状态为取消状态
if (s == null || s.waitStatus > 0) {
s = null;
//从同步队列的尾部向前遍历,
for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
//得到第一个不是出于取消状态的结点,并将其插入到当前可执行结点的后继当中
//自己这里有些疑惑,为什么不是从前往后遍历,而是从后向前遍历呢
//感觉从后往前遍历会丢失一些处于等待状态的结点,这个后续我再进行思考,大家有了解的也请多多指教
if (t.waitStatus <= 0)
s = t;
}
//后继结点不为null,并且不是取消状态,那么唤醒后继结点,后继结点的线程可以去抢占资源了
if (s != null)
LockSupport.unpark(s.thread);
}
2.6、acquireQueued()
是不是很疑惑,为什么又要讲acquireQueued()方法,不是已经说过了吗?不过我想更疑惑的应该是,线程被唤醒之后为什么就可以重新尝试占用资源了呢?所以我们再来回顾下这个方法吧。
线程在这个位置进入了等待状态,那么被唤醒之后,自然要继续往下执行。然后我们就发现,此时线程处于一个自旋操作中,只有成功抢占到资源才能推出自旋操作,否则只能再次进入等待状态,是不是到这里终于将线程的上锁、等待、解锁、抢占行程了一个闭环操作。学习到这里真的是不得不佩服这个Doug Lea老爷子设计思想的精妙。
3、总结
好了,到这里AQS的介绍就完毕了,非常感谢这位博主博客提供的思路,Java并发之AQS详解 - waterystone - 博客园
AQS的功能是十分强大的,我只整理出了部分代码的逻辑就花费了很多的精力,但也确实是收获满满,不过理解还不是特别深刻,如果有理解错误的地方大家请多多指教,有问题也请多多交流一起学习啊。