并发知识梳理:4. 读写锁等其他的类型的锁的支持

ReentrantReadWriteLock 是在重入锁的基础上,添加读写的控制。主要关注的是共享锁的实现和理解上。

首先还是复习一下重入锁的伪代码:


ReentrantLock
public NonFairLock nonfairLock = new NonFairLock();
lock(){
	nonfairLock.lock()
}

// 获取锁的过程
nonfairLock.lock(){
	if(cas(state,0,1)){
		setExcludeThread(Thread.currentThread());
	}else{
		if(!tryAcqure(1) && acquireQueued(addWaiter(Node.excluse),arg))
			selfInterrupt();
	}
}

nonfairLock.tryAcquire(int acquire){
	int c = getState();
	if(c == 0) {
		if(cas(state,0,1){
			setExcludeThread(Thread.currentThread());
			return true;
		}
	}else if(Thread.currentThread() == getExclude){
		int nextc = c+acquire;
		if(nextc < 0) {
			throw new Error(lock too large);
		}
		setstate(nextc);
		return true;
	}
	return false;
}

Node addWaiter(){ // 把Thread放到一个Node节点中,组成等待的链条
	 Node node = new Node(Thread.currentThread(), mode);
	 addTail(node)
	 return node;
}
acquireQueued(Node node) {// 更新等待链条的状态,释放锁的时候,可以调用tryAcquire
	interrupted = false;
	for(;;){
		if(head == node.pre && tryAcquire()){
			 setHead(node); // head = node;
       p.next = null; // help GC
       failed = false;
       return interrupted;
		}
		
		if(shouldParkAfterAcqiureFailed(p,node) && parkAndChectInterrupt()){
			interrupted = true;
		}
	}
}

//释放锁的过程
unlock(){
	nonfairLock.release(1);
}

nonfairLock.release(1){
	if(tryRealse(1)){
		If(head != null && h.state != 0){
			unparkSuccessor(head) // unpark head的下一个节点,head中不包含thread
		}
	}
}

nonfairLock.tryRelease(1){
	int c = getState  - 1;
	if(Thread.concurrent != getExcluseThread) {
		throw Error( not state )
	}
	 boolean free = false;
	if(c == 0) {
		setExcluseThread(null)
		free = true
	}
	
	setState(c);
	return free;
}

读写锁的控制,更加的复杂,我们只是大致的梳理出来重要的逻辑框架,并不扣具体的细节。

class CachedData {
 Object data;
 volatile boolean cacheValid;
 final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

 void processCachedData() {
  rwl.readLock().lock();
  if (!cacheValid) {
   // Must release read lock before acquiring write lock
   rwl.readLock().unlock();
   rwl.writeLock().lock();
   try {
    // Recheck state because another thread might have
    // acquired write lock and changed state before we did.
    if (!cacheValid) {
     data = "use data ";
     cacheValid = true;
    }
    // Downgrade by acquiring read lock before releasing write lock
    rwl.readLock().lock();
   } finally {
    rwl.writeLock().unlock(); // Unlock write, still hold read
   }
  }

  try {
   //       use(data);
  } finally {
   rwl.readLock().unlock();
  }
 }
}

从官方的示例代码中,我们可以知道ReentrantReadWriteLock(以下简写为RRW)有两个锁类型,读锁:ReadLock(RL)和写锁:WriteLock (WL) 从代码中也能初见端倪:

 public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

我们首先看读锁:RL

RL.lock(){
    sync.acquireShared(1);
}

还是原来的熟悉的配方,使用内部类:Sync来实现。Sync继承AQS的框架

AQS.acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

首先就是共享锁定加锁的过程:

Sync.tryAcquireShared(int unused) {

            Thread current = Thread.currentThread();
            int c = getState();
            // 如果被独占锁占据 直接返回-1
            if (exclusiveCount(c) != 0 && // exclusiveCount(c) c的后16位标识独占锁
                getExclusiveOwnerThread() != current)
                return -1;

            int r = sharedCount(c); // sharedCount(c) c的前16位标识的是共享锁

            //readerShouldBlock():第一个排队的线程是否为独占节点
            if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
                // 条件表示为:第一个等待的线程不是独占节点,并且CAS成功
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                //返回1
                return 1;
            }

            return fullTryAcquireShared(current); // 增加了无限的循环
        }

doAcquireShared 类似于重入锁里面的 addWaiter和 更新节点状态

 private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

再来看解锁的过程,这次释放的是共享锁

RL.unlock() {
            sync.releaseShared(1);
        }

同样的Sync的实现AQS的框架:

AQS.releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

熟悉的配方,但是不是熟悉的过程了。共享锁在释放的过程中,不再是线程安全的单个线程的调用,而是多个线程调用了。

Sync.tryReleaseShared(int unused) {
   Thread current = Thread.currentThread();
   // 首先是普通成员变量 firstReader 的更新操作
   if (firstReader == current) {
    // assert firstReaderHoldCount > 0;
    if (firstReaderHoldCount == 1)
     firstReader = null;
    else
     firstReaderHoldCount--;
   } else {
    HoldCounter rh = cachedHoldCounter;
    if (rh == null || rh.tid != getThreadId(current))
     rh = readHolds.get();
    int count = rh.count;
    if (count <= 1) {
     readHolds.remove();
     if (count <= 0)
      throw unmatchedUnlockException();
    }
    --rh.count;
   }
   // 再次就是无限循环的状态更新,最后的结果是 nextc 是否为0
   for (;;) {
    int c = getState();
    int nextc = c - SHARED_UNIT;
    if (compareAndSetState(c, nextc))
     // Releasing the read lock has no effect on readers,
     // but it may allow waiting writers to proceed if
     // both read and write locks are now free.
     return nextc == 0;
   }
  }

doReleaseShared 和重入锁的释放过程,基本类似。

private void doReleaseShared() {
  for (;;) {
   Node h = head;
   if (h != null && h != tail) {
    int ws = h.waitStatus;
    if (ws == Node.SIGNAL) {
     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
      continue; // loop to recheck cases
     unparkSuccessor(h);
    } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
     continue; // loop on failed CAS
   }
   if (h == head) // loop if head changed
    break;
  }
 }

Powered by andiHappy and Theme by AndiHappy