概念
可重入锁
可重入锁是指同一个线程在外层方法获取锁的时候,在内层仍然可以使用,并且不会发生死锁(前提: 锁对象是同一个)
不会因为之前已经获取锁还没有释放而阻塞,ReentrantLock、synchronized 都是可重入锁,可重入锁的一个优点是可一定程度避免死锁
总结: 一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入,可以获取自己的内部锁
可重入锁的种类分为两种:
- 隐式锁: 关键字 synchronized 使用的锁,默认是可重入锁 (包含同步代码块,同步方法)
- 显式锁: 即(Lock)人工手动获取锁和释放锁
实现原理:
- synchronized:任意 synchronized 锁对象,使用 java -c xx.class 编译以后会产生两个指令 monitorEnter 和 monitorExit,这两个指令代表锁的获取和锁的释放,多出来的 monitorExit 使程序异常的时候,可以正常的释放锁。每个锁对象拥有一个锁计数器和一个指向持有该锁的线程指针,当执行 moniterenter 时,如果目标锁对象的计数器为零,那么说明他没有被其他线程持有,Java 虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1;在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么java虚拟机计数器可以加1;否则需要等待,直至持有线程释放锁。当执行 monitorexit 时,Java 虚拟机则将锁对象的计数器减1,计数器为0说明锁已经释放。
- ReentrantLock:如果加锁次数和释放锁次数不一致,会导致其他线程始终无法获取到锁,程序会一直阻塞。正常情况下,加锁几次就要解锁几次。
公平锁和非公平锁
- 公平锁是指多个线程按照申请锁的顺序来获取锁
- 非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程先获取锁。在高并发情况下,有可能造成优先级反转或饥饿现象
- synchronized 是一种非公平锁
自旋锁
自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少上下文切换,缺点是循环会消耗 CPU
独占锁和共享锁
- 独占锁指该锁一次只能被一个线程占有,对 ReentratLock 和 synchronized 而言都是独占锁
- 共享锁指该锁可被多个线程所持有。对 ReentrantReadWriterLock 来说,其读锁是共享锁,其写锁是独占锁,读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的
synchronized 和 Lock 的区别
- 实现方式不同:synchronized 关键字属于 JVM 层面,当线程进入有 synchronized 修饰的方法时,底层会通过 monitor 对象来完成;其实 wait 和 notify 等方法也依赖于 monitor 对象;只有在同步块和方法中才能调用wait和notify等方法。Lock 是具体的 Java 类,底层实现是通过API调用完成锁的功能
-
使用方式不同:
- 释放锁的区别:synchronized 不需要用户手动释放锁,代码执行完后系统会自动让线程释放锁;ReentrantLock 则需要用户结合 try/finally 语句块手动释放锁,若没有主动释放锁,就有可能导致死锁
- 等待是否可中断的区别:synchronized 不可中断,除非抛出异常或者正常运行完成;ReentrantLock 可以通过设置超时时间 tryLock(long timeout TimeUnit unit)或者调用interrupt方法 lockInterruptibly()来中断
- 公平性的区别:synchronized 是非公平锁;ReetrantLock 可以通过传参决定使用公平锁或非公平锁,默认非公平锁
- 绑定多个条件的区别:synchronized 无法绑定多个条件;ReetrantLock 可以用来实现分组唤醒需要唤醒的线程,可以精确唤醒,而不是像 synchronized 随机唤醒一个或者全部唤醒
LockSupport
线程唤醒方式
- 使用 Object 中的 wait 方法让线程等待,使用 Object 中的 notify 方法唤醒线程
- 使用 JUC 包中 Condition 的 awati 方法让线程等待,使用 signal 唤醒线程
- 使用 LockSupport 类可以阻塞线程以及唤醒执行被阻塞的线程
传统唤醒方式的局限
- Object 类中的 wait、notify、notifyAll 用于线程等待和唤醒的方法,都必须要在 synchronized 内部执行
- 将 notify 放在 wait 方法前面执行,则程序一直无法结束;只有先 wait 后 notify、notifyAll 方法,等待中的线程才会唤醒
- wait 和 notify 方法 必须在同步块或者方法里面成对出现
结论: 传统的 synchronized 和 Lock 实现等待唤醒通知的约束时,线程需要先获得持有锁,并且必须在锁块(syncronized或lock)中先等待后唤醒
LockSupprot
- LockSupport 是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞
- LockSupport 的 park 和 unpark 的作用分别是阻塞线程和解除阻塞线程
- 每个线程都有一个许可,Permit只有两个值1和0,默认是0;可以把许可看成一种(0,1)信号量(Semaphore),但与Semphore不同的是,许可的累加上上限是1
- 调用一次 unpark 方法凭证就会加1变成1;调用一次 park 方法就会消费凭证,即凭证减一变成0,同时 park 立即返回,正常退出
- 如果凭证为 0 后,再次调用 park 会阻塞,直到凭证变成1;这时调用 unpark 凭证会变成 1,然后立刻触发阻塞的 park 执行将凭证变为 0
- 每个线程都有一个相关的凭证 permit,permit 只有一个且最大为 1,重复调用 unpark 不会积累凭证
方法实现:
-
阻塞方法 park:
public static void park(){ //permit默认是0,所以一开始调用park方法,当前线程就会阻塞, //直到别的线程将当前线程的permit设置为1时,park方法会被唤醒 //然后会将permit再次设置为0并返回 UNSAFE.park(false,0L); }
-
解除阻塞方法 unpark
public static void unpark(Thread thread){ //调用unpark()方法后,就会将thread线程的许可证permit设置成1. //注意多次调用unpark方法不会累加,permit值还是1,会自动唤醒thread线程 //即之前阻塞中的LockSupport()方法会立即返回 UNSAFE.unpark(thread); }
优势:
- 无锁块要求
- 先唤醒后等待,没有阻塞效果
AQS
AQS(AbstractQueuedSynchronizer)抽象队列同步器是一个没有抽象方法的抽象类,其提供了五个没有实现的方法,使用者可以按需实现
AQS 是用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石
其内部维护一个 volatile int 类型的变量 state 作为锁资源,另外维护一个 CLH 队列用于线程排队,CLH 队列本质上就是一个先进先出(FIFO)的双向链表
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,所谓虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系
AQS 是将每一条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node),来实现锁的分配
AQS 主要使用了自旋和 unsafe 的 CAS 方法
AQS 使用了模板模式,内部定义了很多 final 修饰的方法作为模板,比如:
public final void acquire(int arg)
public final void acquireShared(int arg)
public final boolean release(int arg)
public final boolean releaseShared(int arg)
这些模板主要完成了线程入列、出列、自旋、排队、挂起、唤醒等通用功能的实现,使用者只需要实现共享资源的获取与释放即可
模板内部引用的如下其中五个方法是没有实现的:
// 独占模式获取资源
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 独占模式释放资源
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式获取资源,负数失败;0 成功,但是没有资源;正数表示成功,且有剩余可用资源
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式释放资源,若释放资源后允许唤醒后续等待节点返回 true,否则返回 false
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 是否为独占模式;只有使用了 Condition 才需要去维护,比如 ReentrantLock
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
源码
内部类
AQS 有一个静态内部类 Node,CLH 队列就是由一个一个 Node 组成的双向链表
static final class Node {
// 以下两个常量用于标记一个节点是共享还是独占,用于 addWaiter 方法
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 以下常量为 waitStatus 的含义
// 节点已取消;当 timeout 或者被中断时会触发变更为此状态;进入该状态后的节点将不再变化
static final int CANCELLED = 1;
// 后继节点处于等待状态;后继节点进入队列时,会将前继节点的状态更新为 -1
static final int SIGNAL = -1;
// 节点在等待队列中;当其他线程对 Condition 调用了 signal 方法后,该节点将会从等待队列转移到同步队列中,加入到对同步状态的获取中
static final int CONDITION = -2;
// 在共享模式下,前继节点在释放资源后,会唤醒后继节点,并将这种共享模式传播下去
static final int PROPAGATE = -3;
// 节点状态,取值除了上方常量外还有一个默认值 0,含义为:None of the above
volatile int waitStatus;
// 指向前一节点
volatile Node prev;
// 指向后一节点
volatile Node next;
// 节点中的线程
volatile Thread thread;
// 等待队列中的下一个节点
Node nextWaiter;
// 是否共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前一个节点,为空则抛异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 创建节点,用于初始化队列或者 SHARED
Node() {
}
// 创建节点,仅用于 addWaiter 方法
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 创建节点,仅用于 addConditionWaiter 方法
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
waitStatus 的含义:
- CANCELLED = 1:节点已取消;当 timeout 或者被中断时会触发变更为此状态;进入该状态后的节点将不再变化
- SIGNAL = -1:后继节点处于等待状态;后继节点进入队列时,会将前继节点的状态更新为 -1
- CONDITION = -2:节点在等待队列中;当其他线程对 Condition 调用了 signal 方法后,该节点将会从等待队列转移到同步队列中,加入到对同步状态的获取中
- PROPAGATE = -3:在共享模式下,前继节点在释放资源后,会唤醒后继节点,并将这种共享模式传播下去
可以看出:节点状态为负数表示节点处于有效等待状态,而正数表示节点已经被取消了
所以,AQS 中很多地方用 waitStatus > 0 或者 waitStatus < 0 来判断队列中的节点是否正常
类结构
结合 ReentrantLock 类查看 AQS 独占锁的源码
独占模式下,只能有一个线程占有锁资源,其他竞争锁资源的线程在失败后都会进入等待队列,等待占有锁资源的线程释放锁,再重新被唤醒竞争锁资源
先看其类结构
最顶级的抽象类为 AOS(AbstractOwnableSynchronizer) ,其内部非常简单,用于记录独占模式下哪个线程持有锁
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// 独占模式的当前所有者
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
次一级的抽象类就是要看的重点 AQS(AbstractQueuedSynchronizer)
最后的 ReentrantLock 类中有三个静态内部类:
- Sync 是继承 AQS 的抽象类,其有一个抽象方法 lock()
- FairSync 和 NonfairSync 是 Sync 的子类,分别实现公平锁和非公平锁
ReentrantLock 的构造方法有两个,默认是非公平锁,可以通过传参使用公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
加锁
ReentrantLock 使用 lock() 方法加锁,默认非公平锁实现如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
// 通过CAS的方式加锁,加锁成功将state设置为1
if (compareAndSetState(0, 1))
// 加锁成功将对象独占线程设置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
非公平锁加锁步骤为:
-
通过CAS的方式加锁,加锁成功将state设置为1,其代码位于 AQS 中
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
- 加锁成功将对象独占线程设置为当前线程,其代码位于 AOS 中
- 加锁失败执行方法:acquire(1); 其代码位于 AQS 中,是 final 方法,是 AQS 定义的模板
由下可见,acquire(1); 方法包含四个方法,依次看一下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire 方法在 AQS 内未实现,需要看子类,NonfairSync 内的 tryAcquire 则直接调用了父类 Sync 的 nonfairTryAcquire 方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取 state 的值
int c = getState();
if (c == 0) {
// state = 0 说明无锁,无锁则通过 CAS 加锁,加锁成功设置当前线程为独占线程并返回成功
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 如果有锁,且是当前线程持有锁,则重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 有锁且不是当前线程持有锁,则返回失败
return false;
}
- 获取 state 的值,state 为 0 说明无锁,否则有锁
- 无锁则通过 CAS 加锁,加锁成功设置当前线程为独占线程并返回成功
- 有锁且是当前线程持有锁,则重入并返回成功
- 有锁且不是当前线程持有锁,则返回失败
如果 tryAcquire 加锁失败,则先执行 addWaiter 方法,再执行 acquireQueued 方法
addWaiter 方法:
private Node addWaiter(Node mode) {
// 构造一个节点(EXCLUSIVE【独占】,SHARED【共享】)
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 队列的尾部节点不为空,则将构造的新节点追加至队列,并将 tail 指向新节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果尾部节点为空或者插入失败,则执行 enq 方法
enq(node);
return node;
}
private Node enq(final Node node) {
// 死循环,自旋
for (;;) {
Node t = tail;
if (t == null) {
// 如果尾部节点为空则初始化队列
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾部节点不为空,则将当前节点插入队列尾部;一直自旋,直到插入成功
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter + enq 方法在初始化一个队列后,将当前线程封装为一个节点,并插入到了队列尾部
然后是 acquireQueued 方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前级节点,如果为null,则抛异常
final Node p = node.predecessor();
// 如果前级节点是 head,则尝试抢锁
if (p == head && tryAcquire(arg)) {
// 抢锁成功,将当前节点设置为新的 head
setHead(node);
// 旧的 head 置为空,方便垃圾回收
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果前级节点不为head,或者抢锁失败,则根据节点的状态决定是否需要挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果获取锁异常,则取消获取锁操作
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前级节点的 waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前级节点的 waitStatus 为 SIGNAL(-1),说明当前节点已经在等待唤醒了,所以直接返回true
return true;
if (ws > 0) {
// 如果前级节点的 waitStatus > 0,说明已经取消,则向前遍历,直到找到节点状态不为取消的节点,然后将其设置为当前节点的前级节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前级节点的 waitStatus = 0 或者不等于 -1 的 < 0 的值,则将当前节点的前级节点设置为 SIGNAL(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// shouldParkAfterFailedAcquire 方法返回 true,则执行线程挂起
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport 是 JDK1.6 开始提供的一个线程同步工具类,这里主要用其挂起线程和唤醒线程。
LockSupport 的挂起和唤醒线程都是不可重入的,它有一个许可标志,当调用 park 时,会将许可设置为 0,挂起线程;如果再调用一次 park ,会阻塞线程。当调用 unpark 时才会将许可标志设置为 1
private static void setBlocker(Thread t, Object arg) {
UNSAFE.putObject(t, parkBlockerOffset, arg);
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
非公平锁加锁总结
释放锁
ReentrantLock 的释放锁方法 unlock 调用了 AQS 的模板方法 release
public void unlock() {
sync.release(1);
}
AQS 的 release 方法如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 释放成功后,判断头节点的状态是否为无锁状态,如果不为无锁状态,则将头节点中的线程唤醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease 的实现位于 ReentrantLock 的抽象类 Sync 中,具体如下:
protected final boolean tryRelease(int releases) {
// 因为是重入锁,所以每释放一次减一
int c = getState() - releases;
// 释放资源的线程和持有锁的线程不一致,则抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 完全释放锁
free = true;
// 只有将独占线程设置为 null,后续竞争线程才有可能抢占
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease 在释放锁资源时,可以单纯的理解为是修改独占模式的状态值和置空占有线程
将 state 的值每次减去相应的参数值(一般是1),如果结果为 0,则将独占线程置为 null,这样后续竞争线程才有可能抢占锁
由于 ReentrantLock 的重入机制,线程每加一次锁,state 都会加一,所以解锁时必须执行同样的次数,state 才能为 0,才能释放锁
释放锁成功后,唤醒线程的 unparkSuccessor 方法位于 AQS 中:
private void unparkSuccessor(Node node) {
// 获取节点的等待状态(此处由传参可知是头节点)
int ws = node.waitStatus;
if (ws < 0)
// 将状态置为无锁状态
compareAndSetWaitStatus(node, ws, 0);
// 获取需要唤醒的下一个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 如果下一个节点为空,或者已取消
s = null;
// 则从尾部向前找,直到找到一个未取消的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 如果存在未取消的节点,则唤醒其线程
LockSupport.unpark(s.thread);
}
unparkSuccessor 的流程为:将需要释放的线程节点置为无锁状态,然后去队列中找到可以唤醒的节点,进行线程唤醒
为什么去队列中找可以唤醒的节点时要从后找呢?英文注释说:从前找,下一个节点有可能取消了或者为 null 了,所以从后往前找
非公平锁释放总结
释放锁相对来说比较简单,所以流程比较简略
公平锁加锁
ReentrantLock 的公平锁和非公平锁在释放锁的逻辑上没有区别,均调用的 AQS 的模板 release 方法。二者的差异体现在加锁上,其加锁差异如下:
-
非公平锁先尝试加锁,失败再执行 acquire:
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
公平锁直接 acquire
final void lock() { acquire(1); }
-
非公平锁在 tryAcquire 时如果 getState 为 0,则直接尝试加锁
if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
公平锁则先判断是否有前节点,没有才尝试加锁
if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; // 当前线程不在头节点的下一个节点,说明其有前节点 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
可见线程在非公平锁的情况下,并不是按照先来后到的顺序获取锁的,而是一有机会就尝试插队。。。
共享锁加锁
结合 CountDownLatch 看 AQS 的共享锁逻辑,由于和独占锁共享很多 AQS 中的方法,所以会比较简单
CountDownLatch 内部有一个静态内部类 Sync,实现了 tryAcquireShared、tryReleaseShared 等方法
CountDownLatch 的构造方法调用了静态内部类的构造方法,本质上就是给 AQS 的 state 赋值
// CountDownLatch 的构造方法调用了 Sync 的构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 给 AQS 中的 state 赋值
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
……
}
再来看加锁方法 await(),其调用了 AQS 的 acquireSharedInterruptibly 方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
// 如果线程中断,则抛异常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 尝试获取资源失败,则进入自旋
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared 位于静态内部类 Sync
protected int tryAcquireShared(int acquires) {
// 获取 state 值,值为 0 成功返回 1,失败返回 -1
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly 位于 AQS 中
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 构造共享节点并加入队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 前级节点是头节点,则尝试获取资源,失败则自旋
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取成功,则将head节点指向自己
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 判断是否需要挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate 方法翻译为“设置头节点并传播”,其实就是在获取共享锁资源的时候,如果资源在唤醒一个节点后还有剩余,则继续唤醒后续的节点,直到资源被用完,体现了共享模式的共享。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// 如果唤醒下一个节点后资源还有剩余,且新唤醒的节点不为无效状态,就继续唤醒队列中后续节点中的线程
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 在释放锁一起解释
doReleaseShared();
}
}
共享锁释放锁
CountDownLatch 的释放锁直接调用了 AQS 的模板
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 释放锁成功后,唤醒节点
doReleaseShared();
return true;
}
return false;
}
还是比较简单的,通过 CAS 释放锁,释放成功之后如果为0,说明释放成功
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
// 释放锁却发现不需要释放,则返回失败
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 释放成功且 state 为 0,则返回成功,否则失败
return nextc == 0;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头节点状态为等待唤醒,则将头节点的状态置为无锁状态,设置失败则自旋
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒头节点
unparkSuccessor(h);
}
// 如果头节点的状态已经为无锁状态了,那么将头节点的状态设置为传播唤醒状态
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}