从ReentrantLock来学习AQS

众所周知,AQS是Java用来构建同步工具的基本组件,我们常用的ReentrantLock,Semaphore等同步器都是基于AQS来构建的,这里我们从ReentrantLock这个最简单的同步锁来入手,学习AQS的基本思想。JDK源码基于JDK17,对比一下与JDK1.8在细节上有一些不同。

这里假定各位对AQS有一些基本了解,不再对AQS的原理进行介绍。

ReentrantLock#lock()

		@ReservedStackAccess
        final void lock() {
            if (!initialTryLock())
                acquire(1);
        }

initialTryLock方法会尝试去获取锁,这里公平锁和非公平锁有一些不同,公平锁会先判断当前队列是否为空,为空才会尝试获取

        // NonfairSync
		final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            if (compareAndSetState(0, 1)) { // first attempt is unguarded
                setExclusiveOwnerThread(current);
                return true;
            } else if (getExclusiveOwnerThread() == current) {
                int c = getState() + 1;
                if (c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            } else
                return false;
        }


        //FairSync
		final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (getExclusiveOwnerThread() == current) {
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            }
            return false;
        }

由于ReentrantLock是可重入锁,在获取成功之后需要对state +1,同样在release时也要做反操作。

在第一次尝试获取锁失败后,acquire(1)这个方法就正式进入AQS的流程了。

public final void acquire(int arg) {
        if (!tryAcquire(arg))
            acquire(null, arg, false, false, false, 0L);
    }

这里还是先尝试获取锁,tryAcquire内做的事情和initialTryLock差不多,也是有公平锁和非公平锁两个实现。

然后就是重头戏了

final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time){
		Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;                // predecessor of node when enqueued

        /*
         * Repeatedly:
         *  Check if node now first
         *    if so, ensure head stable, else ensure valid predecessor
         *  if node is first or not yet enqueued, try acquiring
         *  else if node not yet created, create it
         *  else if not yet enqueued, try once to enqueue
         *  else if woken from park, retry (up to postSpins times)
         *  else if WAITING status not set, set and retry
         *  else park and clear WAITING status, and check cancellation
         */

        for (;;) {...}
}

这个方法比较长,里面各种条件也是眼花缭乱,这里将代码拆分逐一介绍,各位在看这部分源码时也可以参考JDK1.8的版本,相对来说更加易懂。

第一步:在入队之前,尝试获取锁

			if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }

看这里的条件,如果是头结点或者前置节点为空,会尝试获取锁,因为我们当前还没有创建节点,也没有入队,所以pred == null为true。

第二步:如果获取锁失败,创建节点并插到队尾

			if (node == null) {                 // allocate; retry before enqueue
                if (shared)
                    node = new SharedNode();
                else
                    node = new ExclusiveNode();
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                Node t = tail;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (t == null)
                    tryInitializeHead();
                else if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            }

这里其实留了一个小trick,可以看到创建节点和入队并不是在同一次循环中完成的,也就是说在成功入队之前,每一次循环都会走到第一步中尝试获取锁,这里也是JDK17与JDK1.8的不同

第三步:

if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }

这里的if看起来非常复杂,其实主要是判断以下几个条件:

  • 当前节点不是头结点
  • 前置节点不为空
  • 前置节点不是头结点

在这过程中还顺带做了以下两件事:

  • 对pre赋值
  • 对first赋值

理完这个if,再看里面做了什么。

  1. 如果前置的状态为0(表示前置节点已经取消),需要调用cleanQueue对队列进行清理。在cleanQueue里会从队尾循环查找是否有cancelled的节点,然后唤醒下一个最有可能获取锁的节点。
  2. 如果前置的前置为空,表明很快就能够获取到锁,使用Thread.onSpinWait()进行自旋。这里也是AQS为什么要使用双向队列的一个原因。

第四步:设置状态

新建的节点status总是0

			else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            }

第五步:将线程挂起

			else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }

通过LockSupport.park或者LockSupport.parkNanos方法挂起当前线程,直到:

  • 别的线程调用了unpark方法
  • 别的线程中断了该线程
  • 等待时间到期(LockSupport.parkNanos)
  • The call spuriously (that is, for no reason) returns没看懂是什么意思

当前线程被唤醒之后,会将状态重新置为0
在这一过程中,如果等待超时或者线程被中断,将跳出循环进入到cancelAcquire流程,这里暂且不管。

第六步:再次自旋

如果前置节点为头节点时,first会在上一步置为true

else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            }

spins这个值是在第四步park时右移计算得到的

最后会在 3->1->4->5->6步骤之间循环,直至获取锁,然后将当前节点置为头结点。

ReentrantLock#unlock()

	public final boolean release(int arg) {
        if (tryRelease(arg)) {
            signalNext(head);
            return true;
        }
        return false;
    }

		protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            boolean free = (c == 0);
            if (free)
                setExclusiveOwnerThread(null);
            setState(c);
            return free;
        }

unlock方法就比较简单,先尝试释放资源,由于是可重入锁,这里需要判断重入次数是否为0,为0之后才能进行signalNext

	private static void signalNext(Node h) {
        Node s;
        if (h != null && (s = h.next) != null && s.status != 0) {
            s.getAndUnsetStatus(WAITING);
            LockSupport.unpark(s.waiter);
        }
    }