并发知识梳理:3. 通过具体的应用展示锁的实现机制
上一节关于AQS的锁的架构实现,说的非常的抽象,看的非常的费劲,这次通过代码的注释和说明,来展示具体是如何实现的。
ReentrantLock
关于可重入锁带着两个问题,去分析的:
- 怎么判断的重入,如何增加锁的次数的。
- 重入次数的判断,释放锁的时候,如果消减锁的次数,释放以后,其他的等待的线程是如何响应的
ReentrantLock lock 方法
public ReentrantLock() {
sync = new NonfairSync();
}
public void lock() {
sync.lock();
}
符合使用框架的时候,都采用内部类NonfairSync来实现实现的需要实现的接口,这点也符合DOC的说明。
NonfairSync 的实现逻辑如下:
static final class NonfairSync extends Sync {
/**
* so beautiful ! 整个操作如此的不见烟火气:
*
* ①首先是CAS(state,1) 确定是否已经有线程抢占到锁,然后设置:ExclusiveThread(这个是普通的成员变量),返回true;
*
* 然后是:Acquire(1),更新state为1,
*
* 具体的逻辑: !tryAcquire(1) => acquireQueued(addWaiter(Node.EXCLUSIVE), 1)) =>selfInterrupt()
*
* 根据getState的状态进行判断:
* 如果是0,说明原来的线程已经释放锁,重新的走①
* 如果大于零,当前的线程(Thread.concurrent)和 ExclusiveOwnerThread 进行比较
* 如果是独占线程为当前线程,则state+1,返回true; 如果不是,则返回false
*
* 只有在返回false的前提下,才会执行:addWaiter(Node.EXCLUSIVE), 1),
*/
final void lock() {
// 只是使用一个CAS的操作,就保证了线程抢占的唯一性
if (compareAndSetState(0, 1))
// private transient Thread exclusiveOwnerThread; 可以说只是一个普通的成员变量
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果是同一个线程,则去更新状态为1,如果不是当前的线程,会被阻塞的
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
ReentrantLock的lock方法,其实默认的调用的就是 NonfairSync的lock方法,也就是说默认的是非公屏锁。这里就产生了一个问题:
为什么是非公平的,和公平的差在什么地方?
然后具体的流程就是CAS后,设置独占线程,Note:这个独占线程为普通的成员变量。
再次就是acquire(1) ,这个方法其实调用的是AQS中的那个经典的方法:
//独占模式下的获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) && // tryAcquire 为子类必须实现的方法,提供扩展的逻辑
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取锁失败以后
selfInterrupt();
}
其中的tryAcquire方法,就是在NonfairSync 继承的 Sync 中实习。
/**
* 非公平获取锁 !!!重要的框架!!!
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
首先没有判断是否是独占线程,而是首先判断的state的状态!根据state的状态,进行二次的CAS操作。
如果首先判断是否是独占线程,是否有问题?
然后再去判断是否是独占线程,如果是,则去更新state的状态。
返回以后,则进入AQS的框架中,如果没有抢到锁,则进入等待的队列,这里需要说一下这个等待的队列的加入函数:addWaiter
// 独占性的节点和共享性的节点,加入等待的链表中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {//相当于 tail=(pre==tail)?node:pre
pred.next = node;
return node;
}
/**
* head(new Node()) <----Thread[Thread-1] <---- Thread[Thread-2] <---- Thread[Thread-3](tail)
* 相当于把node节点插入到队尾。
* Node node = new Node()
* Node pred = tail;
* node.prev = pred;
* tail = node;
* pred.next = node;
* */
}
// 设置失败之后,进入enq,enq为for的无线循环的模式
enq(node);
return node;
}
其中的注释可以比较清楚明白的说明,线程排队的数据结构。其中的enq也是tail为null的时候的,初始化操作。
然后就是更新线程队列的状态(我们现在只讨论比较普通的状况,特殊的等到遇见了在进行说明),以及怎么让线程“停下来”:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//Node 的 prev 成员变量
final Node p = node.predecessor();
// 再次去检查,是否有机会获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
再去更新等待队列线程节点Node的状态的时候,还是首先的去尝试获取锁,当然这个尝试是有条件的,那就是:node.predecessor() == head
这个条件就是说明,node的前一个节点就是头结点,在等待队列初始化的时候,我们已经知道头结点只是一个“象征节点”,是没有对应线程的。所以如果满足条件,那么这个就应该是争抢锁的下一个节点。
如果不满足条件,则会调用:shouldParkAfterFailedAcquire
这个就是线程对应的链表状态的更新:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
/**
* 这句是pre设置为了Node.SIGNAL状态:
* 待唤醒后继状态,当前节点的线程处于此状态,后继节点会被挂起,当前节点释放锁或取消之后必须唤醒它的后继节点。
* */
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这里注意一下:
acquireQueued
是一个无限的for循环结构,所以shouldParkAfterFailedAcquire
可能会被调用很多次。所以一般的会被调用两次,第一次,因为新建的Node节点,waitStatus 就是默认值0,所以如果下载再有阻塞节点的时候,
node.pre == 0
都会先把前驱节点设置为:Node.SIGNAL,也就是调用:
compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
然后下次acquireQueued的for循环里面,再次判断的时候
node.pre 就等于 Node.SIGNAL
然后shouldParkAfterFailedAcquire,返回了true。
轮到了函数:parkAndCheckInterrupt
这个方法很有意思:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
// LockSupport 的park方法
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
这里涉及到一个挺有意思的工具类:LockSupport,我们最基础的和工具类了,这个类的详细我们会面会说,只说一下其支持的两个方法的特点:
java.util.concurrent.locks.LockSupport.park(Object object) 暂停当前的线程,在缺乏凭证的前提下,会一直的:the current thread becomes disabled for thread scheduling
java.util.concurrent.locks.LockSupport.unpark(Thread thread) 给与参数中的Thread中的凭证,也就是立刻的唤醒对应的线程。
另外还有一点,unpark可以在 park之前调用,这样就不必担心漏掉“凭证”。具体的测试代码见:
public static void main(String[] args) throws InterruptedException {
Thread test1 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("park before !");
LockSupport.park("");
System.out.println("park after !");
}
});
test1.start();
LockSupport.unpark(test1);
System.out.println("Call unpark first");
Thread.sleep(1000);
System.out.println("Thread.state: " + test1.getState().name());
}
然后回到parkAndCheckInterrupt
方法,则等待的队列中的线程,算是真正的在等待着了。后面的只有等待别的线程释放锁以后,才能够运行了,我们接着看realease方法。
ReentrantLock unlock 方法
unlock 方法和lock的方法,又有很大的不同,因为lock方法一开始调用的时候,也就是一开始执行CAS代码的时候,是可能多个线程同时执行,但是unlock方法,在一开始的时候,只有拥有锁的方法,才去调用。
public void unlock() {
sync.release(1);
}
//sync.release方法,直接进入AQS的框架
/**
* 释放锁的框架:tryRelease(int arg) 有子类进行实现
* 如果释放成功,则去通知等待的线程
* */
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//如果存在等待的线程:Wait queue中有值,根据我们先前的AddWaiter和shouldParkAfterFailedAcquire 逻辑
// head 应该是new Node(),并且 waitStatus 为 Signal状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
我们还是按照原来的流程,先去关注Sync实现的子类方法tryRelease:
/**
* 释放的时候,肯定是获得锁的线程进行调用,所以不需要CAS之类的操作,只需要判定状态即可
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
首先是状态检查,然后根据state的值,判断锁释放是否成功,可以说这段的代码就是单线程在执行,不用担心线程安全的问题。
然后就是 release方法中的 再次争抢锁的过程了,这里面有一个比较重要的看点
private void unparkSuccessor(Node node) {
// 把当Node节点(head节点),设置为了状态0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//根据状态,获得最近的可以被唤醒的线程节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这个方法有意思的一点就是,是和shouldParkAfterFailedAcquire 相对应的,shouldParkAfterFailedAcquire 把自己的pre Node 设置为Node.SIGNAL 状态,而unparkSuccessor 把Node 设置为原来的0 状态。
然后调用:
LockSupport.unpark(s.thread)
唤醒队列中最先等待的线程,让它去抢锁。
这不看起来挺公平的吗?为什么叫做:NonfairSync,难道有锁的时候,大家一起抢,才算作是公平吗?
我们在来看一下,公平锁是怎么实现的,公平又是如何体现的:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
两点比较重要:
- 没有单独的实现realease方法,说明在释放锁的逻辑上和非公平锁的逻辑是一样的,都是释放锁成功后,唤醒等待链表中,等待时间最长的那个。
- lock 方法上面有差别,公平锁上来就是acquire(1),然后在tryAcquire方法中,增加了一个!hasQueuedPredecessors 的判断。
其中hasQueuedPredecessors代码如下:
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
tail != head 好理解,意思是有线程在排队 (s = h.next) == null || s.thread != Thread.currentThread() 排队的线程不是当前线程
到了这里公平锁的逻辑就算理清楚了,它是在抢锁的时候,保证公平,如果有等待队列的时候,新进来的线程,首先会进行:!hasQueuedPredecessors,立马回返回false,因为存在等待的线程,并且当前的等待激活的线程不是当前的线程,就会立即的进入等待线程中等待。
但是被唤醒的线程,就会因为当前的等待激活的线程就是当前的线程
,而去执行compareAndSetState(0, acquires)
才有可能抢到锁。