AQS源码解析---以ReentrantLock为例

概述

        学习并发编程,就必须要了解到AQS(AbstractQueuedSynchronizer)。比如经常使用的ReentrantLock就是继承的AQS,而且大部分的线程操作都是由AQS完成,ReentrantLock自身仅仅负责实现当前线程对共享资源的获取和释放功能。而AQS负责实现了对没有获取到资源的线程放入同步队列、阻塞、唤醒、溢出阻塞队列的操作。

        其实对多线程获取资源分为三个步骤:

(1)线程尝试获取资源;

(2)将没有获取到资源的线程放入同步队列当中,线程进入等待状态(在此过程中,没有获取到资源的线程也会多次尝试重新获取资源);

(3)在同步队列中陷入等待状态的线程被上一个节点中断等待,重新尝试获取资源。

架构

        AQS的功能十分强大,既支持线程的共享资源抢占,也支持线程独占。这里先了解线程的资源独占方式。先明确两个概念:

(1)共享资源是否被抢占有一个信号标识,标识为0则资源未被抢占,可以在源码中看到大量的cas操作;

(2)线程在AQS中被封装成为一个node节点,node节点有前驱及后继,构成了等待队列。node节点有5中状态值:

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。

  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。

  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。

  • 0:新结点入队时的默认状态,此状态下的线程代表是就绪或执行状态。

我们这里先关注默认值0和SIGNAL(-1)两种状态即可。

1、ReentrantLock资源抢占

        ReentrantLock默认是非公平锁,因此当前线程会什么都不管,先进行一次cas操作尝试获取锁,如下:

        final void lock() {
            //当前线程直接尝试获取资源
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
             //开始进行后续的资源抢占方法
                acquire(1);
        }

        而如果是公平锁,则直接执行acquire(1)方法,这里是公平锁和非公平锁的区别之一,接下来还有一个地方可以展现两种锁的区别。

2、acquire()

        acquire()方法是在AQS中定义的,是对线程操作的入口方法,这里先进行大致说明。tryAcquire(arg)是让当前线程尝试获取资源,addWaiter(Node.EXCLUSIVE)是将当前线程封装为Node加入到等待队列,acquireQueued()是将线程执行park()方法进入到等待状态。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

2.1、tryAcquire()

        可以看下AQS当中的方法源码,没有任何的功能逻辑,只是抛出了个异常。没错,这个方法需要我们自己去实现。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

        那我们看下ReentrantLock的tryAcquire()方法,ReentrantLock实现了公平锁和非公平锁两种方式,我们先看非公平锁

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

        好吧,那我们继续 nonfairTryAcquire(acquires)方法

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //执行cas操作抢占资源
                if (compareAndSetState(0, acquires)) {
                    //如果抢占成功,则设置当前线程为拥有资源的线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果资源已经被抢占了,则判断当前线程是否为拥有资源的线程
            else if (current == getExclusiveOwnerThread()) {
                //如果是,则把资源的占用数+1,注意,这里就是可重入锁的重要标志哦
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        公平锁的tryAcquire()方法差别不大,不过还是说一下吧,如下

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //和非公平锁的重要差别就是多了这一个hasQueuedPredecessors方法
                //这个方法就是判断当同步队列中是否有正在等待的线程,如果没有则当前线程就可以直接尝试获取资源,如果有那就只能乖乖排队了,是不是很公平
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

2.2、addWaiter

        前面尝试让线程获取资源不成功,因此接下来就需要将线程封装进Node中,然后放入队列的尾部进行排队了。

        private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
            //将当前线程封装进node
            AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            //得到同步队列中的尾部结点
            AbstractQueuedSynchronizer.Node pred = tail;
            //如果尾部结点不为空
            if (pred != null) {
                //cas操作尝试把当前线程的节点插入值同步队列的末尾
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //同步队列中可能还没有插入任何节点,因此尾结点为空,则执行enq方法
            enq(node);
            return node;
        }

2.2.1、enq()

        通过cas自旋操作将node结点加入到同步队列当中。

        private AbstractQueuedSynchronizer.Node enq(final AbstractQueuedSynchronizer.Node node) {
            //开始自旋
            for (;;) {
                //得到同步队列尾部节点
                AbstractQueuedSynchronizer.Node t = tail;
                //判断尾部结点是否为null,是不是有些疑惑,上一步骤中已经判断尾结点为null了,为什么还要判断呢
                //不要忘记我们这里是多线程操作哦,在这个线程执行过程中,可能其它线程已经插入尾部结点了,因此需要再次判断
                if (t == null) { // Must initialize
                    //为null,说明同步队列还没初始化
                    //这里进行cas操作对同步队列进行初始化
                    if (compareAndSetHead(new AbstractQueuedSynchronizer.Node()))
                        tail = head;
                } else {
                    //不为null,说明同步队列已经初始化了,那么进行cas操作将次node结点插入就可以了
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        //经过自旋cas操作后
                        return t;
                    }
                }
            }
        }

2.3、acquireQueued()

        将线程封装进Node节点之后,线程不能一直处于运行状态占用着CPU吧,那么接下来就需要让线程进入等待状态了。那么随之而来有一个问题,线程阻塞之后谁来唤醒自己呢,先说结论,是由当前node节点的前驱节点唤醒的,但是任意的前驱都会唤醒自己的后继节点吗,当然不是了。因此上一步只是将node节点加入到了同步队列的尾部,接下来就必须为自己找一个能唤醒自己的前驱了。

        final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
            //让线程进入等待状态是否失败的标志,确切说应该是是否抛出异常的标志
            boolean failed = true;
            try {
                //线程是否打上了中断标志
                boolean interrupted = false;
                //又是自旋操作
                for (;;) {
                    //得到当前线程节点的前驱
                    final AbstractQueuedSynchronizer.Node p = node.predecessor();
                    /**
                     * 如果当前线程节点的前驱是同步队列的头结点,那么也就是说自己已经成为同步队列的二把手了
                     * 那么自己可以执行tryAcquire方法再次尝试抢占下资源了
                     * 因为线程状态的切换也是消耗资源的,所以能不改变线程的状态就不要改变
                     *
                     */
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    /**
                     * shouldParkAfterFailedAcquire()方法用于将当前线程节点找到合适的前驱结点,并告知前驱结点执行完之后记得唤醒自己
                     * parkAndCheckInterrupt()方法用于将线程执行park()方法进入等待状态
                     */
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

2.3.1、shouldParkAfterFailedAcquire()

        线程节点进入等待状态后,需要有其它线程唤醒自己,这里是通过当前结点的前驱唤醒的,但是不是所有的结点都可以唤醒自己的后继,因此找到一个靠谱的大哥是很重要的(找到一个能够唤醒自己的结点当做前驱)。

        private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node
        pred, AbstractQueuedSynchronizer.Node node) {
            //得到自己前驱的线程状态
            int ws = pred.waitStatus;
            //如果处于SIGNAL状态,那没事了,这个前驱会唤醒自己的
            if (ws == AbstractQueuedSynchronizer.Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
             //如果大于0,说明当前前驱的线程已经被取消了,根本不会再执行,那怎么也不会唤醒自己的
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                //开始一步步向前遍历,直到直到一个线程没被取消的结点
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                //找到之后,把前驱node结点的状态设置为SIGNAL,即告诉了自己的前驱执行完之后,记得唤醒自己这个等待线程
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, AbstractQueuedSynchronizer.Node.SIGNAL);
            }
            return false;
        }

2.3.2、parkAndCheckInterrupt()

        上一步骤当中终于为自己找到合适的前驱了,接下来就可以安心的让自己这个线程进入等待状态了。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

        以上终于是完完全全的将线程封装为Node放入了同步队列中、并让线程进入了等待状态,那么接下来是不是就等着占用着资源的线程执行完毕,然后释放锁并唤醒占用资源线程的后继线程了。

2.4 、unlock()

        ReentrantLock释放锁调用的是unlock()方法,里面方法很简单,调用了AQS的release()方法。

    public void unlock() {
        sync.release(1);
    }

2.5、release()

        释放锁要经过两个步骤:

        (1)让出当前线程所占用的资源,即修改资源占用信号量为0,设置占据当前资源的线程为null;

        (2)通知当前线程结点的后继结点开始抢占资源,即将后继结点的线程从阻塞等待状态就行唤醒转为就绪状态。

        public final boolean release(int arg) {
            //调用ReentrantLock自身的tryRelease()方法,将资源占用标识修改为0,资源占用线程修改为null
            if (tryRelease(arg)) {
                AbstractQueuedSynchronizer.Node h = head;
                //waitStatus!=0才能唤醒后继线程,这里就是为什么要将等待线程的前驱节点的waitStatus设置为SIGNAL(-1)的原因
                if (h != null && h.waitStatus != 0)
                    //唤醒当前线程的后继
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

2.5.1、tryRelease()

        那么现在就执行让出资源的第一步,注意可能是可重入锁,因此仅释放一次锁不代表被占用的资源被释放,一定要资源占用信号量为0时,才代表资源被释放了。

        protected final boolean tryRelease(int releases) {
            /**
             * 记得吗,ReentrantLock是可重入锁,因此不一定释放一次锁就释放了占用的资源
             * 所以每unlock()一次,占用锁的信号量就减一
             */
            
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //直到资源占用信号量为0了,那么说明真的让出了资源
            if (c == 0) {
                free = true;
                //此时把占用资源的线程设置为null
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

2.5.2、unparkSuccessor()

        好了,现在资源被释放了,可以通知下一个结点从等待状态转为就绪状态,进而去占用资源继续线程的执行了。

        private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            //将结点状态设置为0
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);

            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            AbstractQueuedSynchronizer.Node s = node.next;
            //得到当前线程结点的后继,如果后续未null,或者后继结点的状态为取消状态
            if (s == null || s.waitStatus > 0) {
                s = null;
                //从同步队列的尾部向前遍历,
                for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
                    //得到第一个不是出于取消状态的结点,并将其插入到当前可执行结点的后继当中
                    //自己这里有些疑惑,为什么不是从前往后遍历,而是从后向前遍历呢
                    //感觉从后往前遍历会丢失一些处于等待状态的结点,这个后续我再进行思考,大家有了解的也请多多指教
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //后继结点不为null,并且不是取消状态,那么唤醒后继结点,后继结点的线程可以去抢占资源了
            if (s != null)
                LockSupport.unpark(s.thread);
        }

2.6、acquireQueued()

        是不是很疑惑,为什么又要讲acquireQueued()方法,不是已经说过了吗?不过我想更疑惑的应该是,线程被唤醒之后为什么就可以重新尝试占用资源了呢?所以我们再来回顾下这个方法吧。

        线程在这个位置进入了等待状态,那么被唤醒之后,自然要继续往下执行。然后我们就发现,此时线程处于一个自旋操作中,只有成功抢占到资源才能推出自旋操作,否则只能再次进入等待状态,是不是到这里终于将线程的上锁、等待、解锁、抢占行程了一个闭环操作。学习到这里真的是不得不佩服这个Doug Lea老爷子设计思想的精妙。

3、总结

        好了,到这里AQS的介绍就完毕了,非常感谢这位博主博客提供的思路,Java并发之AQS详解 - waterystone - 博客园

        AQS的功能是十分强大的,我只整理出了部分代码的逻辑就花费了很多的精力,但也确实是收获满满,不过理解还不是特别深刻,如果有理解错误的地方大家请多多指教,有问题也请多多交流一起学习啊。