AQS

Apr 6, 2021


本文参考这里
参考这里对相关概念进行补充

概念

可重入锁

可重入锁是指同一个线程在外层方法获取锁的时候,在内层仍然可以使用,并且不会发生死锁(前提: 锁对象是同一个)
不会因为之前已经获取锁还没有释放而阻塞,ReentrantLock、synchronized 都是可重入锁,可重入锁的一个优点是可一定程度避免死锁
总结: 一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入,可以获取自己的内部锁

可重入锁的种类分为两种:

  1. 隐式锁: 关键字 synchronized 使用的锁,默认是可重入锁 (包含同步代码块,同步方法)
  2. 显式锁: 即(Lock)人工手动获取锁和释放锁

实现原理:

  1. synchronized:任意 synchronized 锁对象,使用 java -c xx.class 编译以后会产生两个指令 monitorEnter 和 monitorExit,这两个指令代表锁的获取和锁的释放,多出来的 monitorExit 使程序异常的时候,可以正常的释放锁。每个锁对象拥有一个锁计数器和一个指向持有该锁的线程指针,当执行 moniterenter 时,如果目标锁对象的计数器为零,那么说明他没有被其他线程持有,Java 虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1;在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么java虚拟机计数器可以加1;否则需要等待,直至持有线程释放锁。当执行 monitorexit 时,Java 虚拟机则将锁对象的计数器减1,计数器为0说明锁已经释放。
  2. ReentrantLock:如果加锁次数和释放锁次数不一致,会导致其他线程始终无法获取到锁,程序会一直阻塞。正常情况下,加锁几次就要解锁几次。

公平锁和非公平锁

  1. 公平锁是指多个线程按照申请锁的顺序来获取锁
  2. 非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程先获取锁。在高并发情况下,有可能造成优先级反转或饥饿现象
  3. synchronized 是一种非公平锁

自旋锁

自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少上下文切换,缺点是循环会消耗 CPU

独占锁和共享锁

  1. 独占锁指该锁一次只能被一个线程占有,对 ReentratLock 和 synchronized 而言都是独占锁
  2. 共享锁指该锁可被多个线程所持有。对 ReentrantReadWriterLock 来说,其读锁是共享锁,其写锁是独占锁,读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的

synchronized 和 Lock 的区别

  1. 实现方式不同:synchronized 关键字属于 JVM 层面,当线程进入有 synchronized 修饰的方法时,底层会通过 monitor 对象来完成;其实 wait 和 notify 等方法也依赖于 monitor 对象;只有在同步块和方法中才能调用wait和notify等方法。Lock 是具体的 Java 类,底层实现是通过API调用完成锁的功能
  2. 使用方式不同:

    • 释放锁的区别:synchronized 不需要用户手动释放锁,代码执行完后系统会自动让线程释放锁;ReentrantLock 则需要用户结合 try/finally 语句块手动释放锁,若没有主动释放锁,就有可能导致死锁
    • 等待是否可中断的区别:synchronized 不可中断,除非抛出异常或者正常运行完成;ReentrantLock 可以通过设置超时时间 tryLock(long timeout TimeUnit unit)或者调用interrupt方法 lockInterruptibly()来中断
  3. 公平性的区别:synchronized 是非公平锁;ReetrantLock 可以通过传参决定使用公平锁或非公平锁,默认非公平锁
  4. 绑定多个条件的区别:synchronized 无法绑定多个条件;ReetrantLock 可以用来实现分组唤醒需要唤醒的线程,可以精确唤醒,而不是像 synchronized 随机唤醒一个或者全部唤醒

LockSupport

线程唤醒方式

  1. 使用 Object 中的 wait 方法让线程等待,使用 Object 中的 notify 方法唤醒线程
  2. 使用 JUC 包中 Condition 的 awati 方法让线程等待,使用 signal 唤醒线程
  3. 使用 LockSupport 类可以阻塞线程以及唤醒执行被阻塞的线程

传统唤醒方式的局限

  1. Object 类中的 wait、notify、notifyAll 用于线程等待和唤醒的方法,都必须要在 synchronized 内部执行
  2. 将 notify 放在 wait 方法前面执行,则程序一直无法结束;只有先 wait 后 notify、notifyAll 方法,等待中的线程才会唤醒
  3. wait 和 notify 方法 必须在同步块或者方法里面成对出现

结论: 传统的 synchronized 和 Lock 实现等待唤醒通知的约束时,线程需要先获得持有锁,并且必须在锁块(syncronized或lock)中先等待后唤醒

LockSupprot

  1. LockSupport 是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞
  2. LockSupport 的 park 和 unpark 的作用分别是阻塞线程和解除阻塞线程
  3. 每个线程都有一个许可,Permit只有两个值1和0,默认是0;可以把许可看成一种(0,1)信号量(Semaphore),但与Semphore不同的是,许可的累加上上限是1
  4. 调用一次 unpark 方法凭证就会加1变成1;调用一次 park 方法就会消费凭证,即凭证减一变成0,同时 park 立即返回,正常退出
  5. 如果凭证为 0 后,再次调用 park 会阻塞,直到凭证变成1;这时调用 unpark 凭证会变成 1,然后立刻触发阻塞的 park 执行将凭证变为 0
  6. 每个线程都有一个相关的凭证 permit,permit 只有一个且最大为 1,重复调用 unpark 不会积累凭证

方法实现:

  1. 阻塞方法 park:

     public static void park(){
         //permit默认是0,所以一开始调用park方法,当前线程就会阻塞,
         //直到别的线程将当前线程的permit设置为1时,park方法会被唤醒
         //然后会将permit再次设置为0并返回
         UNSAFE.park(false,0L);
     }
    
  2. 解除阻塞方法 unpark

     public static void unpark(Thread thread){
         //调用unpark()方法后,就会将thread线程的许可证permit设置成1.
         //注意多次调用unpark方法不会累加,permit值还是1,会自动唤醒thread线程
         //即之前阻塞中的LockSupport()方法会立即返回
         UNSAFE.unpark(thread);
     }
    

优势:

  1. 无锁块要求
  2. 先唤醒后等待,没有阻塞效果

AQS

AQS(AbstractQueuedSynchronizer)抽象队列同步器是一个没有抽象方法的抽象类,其提供了五个没有实现的方法,使用者可以按需实现
AQS 是用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石 其内部维护一个 volatile int 类型的变量 state 作为锁资源,另外维护一个 CLH 队列用于线程排队,CLH 队列本质上就是一个先进先出(FIFO)的双向链表
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,所谓虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系
AQS 是将每一条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node),来实现锁的分配
AQS 主要使用了自旋和 unsafe 的 CAS 方法
AQS 使用了模板模式,内部定义了很多 final 修饰的方法作为模板,比如:

public final void acquire(int arg)
public final void acquireShared(int arg)
public final boolean release(int arg)
public final boolean releaseShared(int arg)

这些模板主要完成了线程入列、出列、自旋、排队、挂起、唤醒等通用功能的实现,使用者只需要实现共享资源的获取与释放即可
模板内部引用的如下其中五个方法是没有实现的:

// 独占模式获取资源
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// 独占模式释放资源
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
// 共享模式获取资源,负数失败;0 成功,但是没有资源;正数表示成功,且有剩余可用资源
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
// 共享模式释放资源,若释放资源后允许唤醒后续等待节点返回 true,否则返回 false
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

// 是否为独占模式;只有使用了 Condition 才需要去维护,比如 ReentrantLock
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

源码

内部类

AQS 有一个静态内部类 Node,CLH 队列就是由一个一个 Node 组成的双向链表

static final class Node {
    // 以下两个常量用于标记一个节点是共享还是独占,用于 addWaiter 方法
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;

    // 以下常量为 waitStatus 的含义
    // 节点已取消;当 timeout 或者被中断时会触发变更为此状态;进入该状态后的节点将不再变化
    static final int CANCELLED =  1;
    // 后继节点处于等待状态;后继节点进入队列时,会将前继节点的状态更新为 -1
    static final int SIGNAL    = -1;
    // 节点在等待队列中;当其他线程对 Condition 调用了 signal 方法后,该节点将会从等待队列转移到同步队列中,加入到对同步状态的获取中
    static final int CONDITION = -2;
    // 在共享模式下,前继节点在释放资源后,会唤醒后继节点,并将这种共享模式传播下去
    static final int PROPAGATE = -3;

    // 节点状态,取值除了上方常量外还有一个默认值 0,含义为:None of the above
    volatile int waitStatus;

    // 指向前一节点
    volatile Node prev;

    // 指向后一节点
    volatile Node next;

    // 节点中的线程
    volatile Thread thread;

    // 等待队列中的下一个节点
    Node nextWaiter;

    // 是否共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 获取前一个节点,为空则抛异常
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    // 创建节点,用于初始化队列或者 SHARED
    Node() {
    }

    // 创建节点,仅用于 addWaiter 方法
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    // 创建节点,仅用于 addConditionWaiter 方法
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

waitStatus 的含义:

  1. CANCELLED = 1:节点已取消;当 timeout 或者被中断时会触发变更为此状态;进入该状态后的节点将不再变化
  2. SIGNAL = -1:后继节点处于等待状态;后继节点进入队列时,会将前继节点的状态更新为 -1
  3. CONDITION = -2:节点在等待队列中;当其他线程对 Condition 调用了 signal 方法后,该节点将会从等待队列转移到同步队列中,加入到对同步状态的获取中
  4. PROPAGATE = -3:在共享模式下,前继节点在释放资源后,会唤醒后继节点,并将这种共享模式传播下去

可以看出:节点状态为负数表示节点处于有效等待状态,而正数表示节点已经被取消了
所以,AQS 中很多地方用 waitStatus > 0 或者 waitStatus < 0 来判断队列中的节点是否正常

类结构

结合 ReentrantLock 类查看 AQS 独占锁的源码
独占模式下,只能有一个线程占有锁资源,其他竞争锁资源的线程在失败后都会进入等待队列,等待占有锁资源的线程释放锁,再重新被唤醒竞争锁资源
先看其类结构

img

最顶级的抽象类为 AOS(AbstractOwnableSynchronizer) ,其内部非常简单,用于记录独占模式下哪个线程持有锁

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {

    // 独占模式的当前所有者
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

次一级的抽象类就是要看的重点 AQS(AbstractQueuedSynchronizer)
最后的 ReentrantLock 类中有三个静态内部类:

  1. Sync 是继承 AQS 的抽象类,其有一个抽象方法 lock()
  2. FairSync 和 NonfairSync 是 Sync 的子类,分别实现公平锁和非公平锁

ReentrantLock 的构造方法有两个,默认是非公平锁,可以通过传参使用公平锁

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

加锁

ReentrantLock 使用 lock() 方法加锁,默认非公平锁实现如下:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        // 通过CAS的方式加锁,加锁成功将state设置为1
        if (compareAndSetState(0, 1))
            // 加锁成功将对象独占线程设置为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

非公平锁加锁步骤为:

  1. 通过CAS的方式加锁,加锁成功将state设置为1,其代码位于 AQS 中

     protected final boolean compareAndSetState(int expect, int update) {
         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
     }
    
  2. 加锁成功将对象独占线程设置为当前线程,其代码位于 AOS 中
  3. 加锁失败执行方法:acquire(1); 其代码位于 AQS 中,是 final 方法,是 AQS 定义的模板

由下可见,acquire(1); 方法包含四个方法,依次看一下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire 方法在 AQS 内未实现,需要看子类,NonfairSync 内的 tryAcquire 则直接调用了父类 Sync 的 nonfairTryAcquire 方法

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获取 state 的值
    int c = getState();
    if (c == 0) {
        // state = 0 说明无锁,无锁则通过 CAS 加锁,加锁成功设置当前线程为独占线程并返回成功
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 如果有锁,且是当前线程持有锁,则重入
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 有锁且不是当前线程持有锁,则返回失败
    return false;
}
  1. 获取 state 的值,state 为 0 说明无锁,否则有锁
  2. 无锁则通过 CAS 加锁,加锁成功设置当前线程为独占线程并返回成功
  3. 有锁且是当前线程持有锁,则重入并返回成功
  4. 有锁且不是当前线程持有锁,则返回失败

如果 tryAcquire 加锁失败,则先执行 addWaiter 方法,再执行 acquireQueued 方法

addWaiter 方法:

private Node addWaiter(Node mode) {
    // 构造一个节点(EXCLUSIVE【独占】,SHARED【共享】)
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        // 队列的尾部节点不为空,则将构造的新节点追加至队列,并将 tail 指向新节点
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果尾部节点为空或者插入失败,则执行 enq 方法
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // 死循环,自旋
    for (;;) {
        Node t = tail;
        if (t == null) {
            // 如果尾部节点为空则初始化队列
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 尾部节点不为空,则将当前节点插入队列尾部;一直自旋,直到插入成功
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter + enq 方法在初始化一个队列后,将当前线程封装为一个节点,并插入到了队列尾部

然后是 acquireQueued 方法:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取前级节点,如果为null,则抛异常
            final Node p = node.predecessor();
            // 如果前级节点是 head,则尝试抢锁
            if (p == head && tryAcquire(arg)) {
                // 抢锁成功,将当前节点设置为新的 head
                setHead(node);
                // 旧的 head 置为空,方便垃圾回收
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果前级节点不为head,或者抢锁失败,则根据节点的状态决定是否需要挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果获取锁异常,则取消获取锁操作
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前级节点的 waitStatus
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 如果前级节点的 waitStatus 为 SIGNAL(-1),说明当前节点已经在等待唤醒了,所以直接返回true
        return true;
    if (ws > 0) {
        // 如果前级节点的 waitStatus > 0,说明已经取消,则向前遍历,直到找到节点状态不为取消的节点,然后将其设置为当前节点的前级节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前级节点的 waitStatus = 0 或者不等于 -1 的 < 0 的值,则将当前节点的前级节点设置为 SIGNAL(-1)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// shouldParkAfterFailedAcquire 方法返回 true,则执行线程挂起
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

LockSupport 是 JDK1.6 开始提供的一个线程同步工具类,这里主要用其挂起线程和唤醒线程。
LockSupport 的挂起和唤醒线程都是不可重入的,它有一个许可标志,当调用 park 时,会将许可设置为 0,挂起线程;如果再调用一次 park ,会阻塞线程。当调用 unpark 时才会将许可标志设置为 1

private static void setBlocker(Thread t, Object arg) {
    UNSAFE.putObject(t, parkBlockerOffset, arg);
}

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

非公平锁加锁总结

img

释放锁

ReentrantLock 的释放锁方法 unlock 调用了 AQS 的模板方法 release

public void unlock() {
    sync.release(1);
}

AQS 的 release 方法如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 释放成功后,判断头节点的状态是否为无锁状态,如果不为无锁状态,则将头节点中的线程唤醒
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease 的实现位于 ReentrantLock 的抽象类 Sync 中,具体如下:

protected final boolean tryRelease(int releases) {
    // 因为是重入锁,所以每释放一次减一
    int c = getState() - releases;
    // 释放资源的线程和持有锁的线程不一致,则抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 完全释放锁
        free = true;
        // 只有将独占线程设置为 null,后续竞争线程才有可能抢占
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}    

tryRelease 在释放锁资源时,可以单纯的理解为是修改独占模式的状态值和置空占有线程
将 state 的值每次减去相应的参数值(一般是1),如果结果为 0,则将独占线程置为 null,这样后续竞争线程才有可能抢占锁
由于 ReentrantLock 的重入机制,线程每加一次锁,state 都会加一,所以解锁时必须执行同样的次数,state 才能为 0,才能释放锁

释放锁成功后,唤醒线程的 unparkSuccessor 方法位于 AQS 中:

private void unparkSuccessor(Node node) {
    // 获取节点的等待状态(此处由传参可知是头节点)
    int ws = node.waitStatus;
    if (ws < 0)
        // 将状态置为无锁状态
        compareAndSetWaitStatus(node, ws, 0);

    // 获取需要唤醒的下一个节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        // 如果下一个节点为空,或者已取消
        s = null;
        // 则从尾部向前找,直到找到一个未取消的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 如果存在未取消的节点,则唤醒其线程
        LockSupport.unpark(s.thread);
}

unparkSuccessor 的流程为:将需要释放的线程节点置为无锁状态,然后去队列中找到可以唤醒的节点,进行线程唤醒
为什么去队列中找可以唤醒的节点时要从后找呢?英文注释说:从前找,下一个节点有可能取消了或者为 null 了,所以从后往前找

非公平锁释放总结

释放锁相对来说比较简单,所以流程比较简略

img

公平锁加锁

ReentrantLock 的公平锁和非公平锁在释放锁的逻辑上没有区别,均调用的 AQS 的模板 release 方法。二者的差异体现在加锁上,其加锁差异如下:

  1. 非公平锁先尝试加锁,失败再执行 acquire:

     final void lock() {
         if (compareAndSetState(0, 1))
             setExclusiveOwnerThread(Thread.currentThread());
         else
             acquire(1);
     }
    

    公平锁直接 acquire

     final void lock() {
         acquire(1);
     }
    
  2. 非公平锁在 tryAcquire 时如果 getState 为 0,则直接尝试加锁

         if (c == 0) {
             if (compareAndSetState(0, acquires)) {
                 setExclusiveOwnerThread(current);
                 return true;
             }
         }
    

    公平锁则先判断是否有前节点,没有才尝试加锁

         if (c == 0) {
             if (!hasQueuedPredecessors() &&
                 compareAndSetState(0, acquires)) {
                 setExclusiveOwnerThread(current);
                 return true;
             }
         }
            
        public final boolean hasQueuedPredecessors() {
             Node t = tail;
             Node h = head;
             Node s;
             // 当前线程不在头节点的下一个节点,说明其有前节点
             return h != t &&
                 ((s = h.next) == null || s.thread != Thread.currentThread());
         }
    

可见线程在非公平锁的情况下,并不是按照先来后到的顺序获取锁的,而是一有机会就尝试插队。。。

共享锁加锁

结合 CountDownLatch 看 AQS 的共享锁逻辑,由于和独占锁共享很多 AQS 中的方法,所以会比较简单
CountDownLatch 内部有一个静态内部类 Sync,实现了 tryAcquireShared、tryReleaseShared 等方法
CountDownLatch 的构造方法调用了静态内部类的构造方法,本质上就是给 AQS 的 state 赋值

// CountDownLatch 的构造方法调用了 Sync 的构造方法
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // 给 AQS 中的 state 赋值
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    ……
}

再来看加锁方法 await(),其调用了 AQS 的 acquireSharedInterruptibly 方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        // 如果线程中断,则抛异常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        // 尝试获取资源失败,则进入自旋
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared 位于静态内部类 Sync

protected int tryAcquireShared(int acquires) {
    // 获取 state 值,值为 0 成功返回 1,失败返回 -1
    return (getState() == 0) ? 1 : -1;
}

doAcquireSharedInterruptibly 位于 AQS 中

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 构造共享节点并加入队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 前级节点是头节点,则尝试获取资源,失败则自旋
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 获取成功,则将head节点指向自己
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 判断是否需要挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate 方法翻译为“设置头节点并传播”,其实就是在获取共享锁资源的时候,如果资源在唤醒一个节点后还有剩余,则继续唤醒后续的节点,直到资源被用完,体现了共享模式的共享。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    // 如果唤醒下一个节点后资源还有剩余,且新唤醒的节点不为无效状态,就继续唤醒队列中后续节点中的线程
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 在释放锁一起解释
            doReleaseShared();
    }
}

共享锁释放锁

CountDownLatch 的释放锁直接调用了 AQS 的模板

public void countDown() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 释放锁成功后,唤醒节点
        doReleaseShared();
        return true;
    }
    return false;
}

还是比较简单的,通过 CAS 释放锁,释放成功之后如果为0,说明释放成功

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            // 释放锁却发现不需要释放,则返回失败
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            // 释放成功且 state 为 0,则返回成功,否则失败
            return nextc == 0;
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果头节点状态为等待唤醒,则将头节点的状态置为无锁状态,设置失败则自旋
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒头节点
                unparkSuccessor(h);
            }
            // 如果头节点的状态已经为无锁状态了,那么将头节点的状态设置为传播唤醒状态
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}