ReadWriteLock简单介绍
ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。相对于互斥锁而言,ReadWriteLoc允许更高的并发量。
所有读写锁的实现必须确保写操作对读操作的内存影响。换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容。
ReadWriteLock接口
public interface ReadWriteLock { Lock readLock();//获取读锁 Lock writeLock();//获取写锁}
ReentrantReadWriteLock实现类
ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性。
- 锁的获取获取模式
- 非公平模式(默认):读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,比公平锁有更高的吞吐量。
- 公平模式:线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写/读锁线程就会被分配写/读锁;当有写线程持有写锁或者有等待的写线程时,一个尝试获取公平的读锁(非重入)的线程就会阻塞。这个线程直到等待时间最长的写锁获得锁后并释放掉锁后才能获取到读锁。
- 可重入:允许读锁可写锁可重入。写锁可以获得读锁,读锁不能获得写锁。
- 锁降级:允许写锁降低为读锁。
- 中断锁的获取:在读锁和写锁的获取过程中支持中断。
- 支持Condition:写锁提供Condition实现。
- 监控:提供确定锁是否被持有等辅助方法
锁降低的简单就示例
class ReadWriteLockTest { String data;//缓存中的对象 volatile boolean cacheValid;//缓存是否还有效 final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() throws InterruptedException { rwl.readLock().lock(); if (!cacheValid) {//缓存失效需要再次读取缓存 //在读取缓存前,必须释放读锁 rwl.readLock().unlock(); rwl.writeLock().lock(); try { //再次检查是否需要再次读取缓存,如果需要则 if (!cacheValid) { data = Thread.currentThread().getName()+":缓存数据测试,实际开发可能是一个对象!"; cacheValid = true; } //在释放之前,通过获取读锁降级写锁 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); //释放写锁,持有读锁 } } try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+"【"+data+"】"); } finally { rwl.readLock().unlock(); } } public static void main(String[] args) throws InterruptedException { ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest(); for(int i=0;i<10;i++){ Thread thread = new Thread(()->{ try { readWriteLockTest.processCachedData(); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); } }}
源码分析
构造方法
public ReentrantReadWriteLock() { this(false);//默认为非公平模式}public ReentrantReadWriteLock(boolean fair) { //决定了Sync是FairSync还是NonfairSync。Sync继承了AbstractQueuedSynchronizer,而Sync是一个抽象类,NonfairSync和FairSync继承了Sync,并重写了其中的抽象方法。 sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this);}
获取锁
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
Sync分析
FairSync
static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); }}
writerShouldBlock和readerShouldBlock方法都表示当有别的线程也在尝试获取锁时,是否应该阻塞。
对于公平模式,hasQueuedPredecessors()方法表示前面是否有等待线程。一旦前面有等待线程,那么为了遵循公平,当前线程也就应该被挂起。
NonfairSync
static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // 写线程无需阻塞 } final boolean readerShouldBlock() { //apparentlyFirstQueuedIsExclusive在当前线程是写锁占用的线程时,返回true;否则返回false。也就说明,如果当前有一个写线程正在写,那么该读线程应该阻塞。 return apparentlyFirstQueuedIsExclusive(); } }
ReentrantReadWriteLock中的state
继承AQS的类都需要使用state变量代表某种资源,ReentrantReadWriteLock中的state代表了读锁的数量和写锁的持有与否,整个结构如下: 可以看到state的高16位代表读锁的个数;低16位代表写锁的状态。
获取锁
读锁的获取
public void lock() { sync.acquireShared(1);}
读锁使用的是AQS的共享模式,AQS的acquireShared方法如下:
if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
当tryAcquireShared()方法小于0时,那么会执行doAcquireShared方法将该线程加入到等待队列中。
Sync实现了tryAcquireShared方法,如下:
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //如果当前有写线程并且本线程不是写线程,不符合重入,失败. //在获取读锁时,如果有写线程,则获取失败 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //得到读锁的个数 int r = sharedCount(c); //如果读不应该阻塞并且读锁的个数小于最大值65535,并且可以成功更新状态值,成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) {//如果当前读锁为0 firstReader = current;//第一个读线程就是当前线程 firstReaderHoldCount = 1;//第一个线程持有读锁的个数 } //如果当前线程重入了,记录firstReaderHoldCount else if (firstReader == current) { firstReaderHoldCount++; } //当前读线程和第一个读线程不同,记录每一个线程读的次数 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //否则,循环尝试 return fullTryAcquireShared(current); }
从上面的代码以及注释可以看到,分为三步:
- 如果当前有写线程并且本线程不是写线程,那么失败,返回-1
- 否则,说明当前没有写线程或者本线程就是写线程(可重入),接下来判断是否应该读线程阻塞并且读锁的个数是否小于最小值,并且CAS成功使读锁+1,成功,返回1。其余的操作主要是用于计数的
- 如果2中失败了,失败的原因有三,第一是应该读线程应该阻塞;第二是因为读锁达到了上线;第三是因为CAS失败,有其他线程在并发更新state,那么会调动fullTryAcquireShared方法。
fullTryAcquiredShared方法
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); //一旦有别的线程获得了写锁,并且获得写锁的线程不是本线程,返回-1,失败 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } //如果读线程需要阻塞 else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } //说明有别的读线程占有了锁 else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //如果读锁达到了最大值,抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //如果成功更改状态,成功返回 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
写锁的获取
public void lock() { sync.acquire(1);}
AQS的acquire方法如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
从上面可以看到,写锁使用的是AQS的独占模式。首先尝试获取锁,如果获取失败,那么将会把该线程加入到等待队列中。
Sync实现了tryAcquire方法用于尝试获取一把锁,如下:
protected final boolean tryAcquire(int acquires) { //得到调用lock方法的当前线程 Thread current = Thread.currentThread(); int c = getState(); //得到写锁的个数 int w = exclusiveCount(c); //如果当前有写锁或者读锁.(对于读锁而言,如果当前写线程可以进行写操作,那么读线程读到的数据可能有误) if (c != 0) { // 如果写锁为0或者当前线程不是独占线程(不符合重入),返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //如果写锁的个数超过了最大值(65535),抛出异常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 写锁重入,返回true setState(c + acquires); return true; } //如果当前没有写锁或者读锁,如果写线程应该阻塞或者CAS失败,返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //否则将当前线程置为获得写锁的线程,返回true setExclusiveOwnerThread(current); return true; }
释放锁
读锁的释放
ReadLock的unlock方法如下:
public void unlock() { sync.releaseShared(1); }
调用了Sync的releaseShared方法,该方法在AQS中提供,如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}
调用tryReleaseShared方法尝试释放锁,如果释放成功,调用doReleaseShared尝试唤醒下一个节点。
AQS的子类需要实现tryReleaseShared方法,Sync中的实现如下:
protected final boolean tryReleaseShared(int unused) { //得到调用unlock的线程 Thread current = Thread.currentThread(); //如果是第一个获得读锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } //否则,是HoldCounter中计数-1 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //死循环 for (;;) { int c = getState(); //释放一把读锁 int nextc = c - SHARED_UNIT; //如果CAS更新状态成功,返回读锁是否等于0;失败的话,则重试 if (compareAndSetState(c, nextc)) //释放读锁对读线程没有影响,但是可能会使等待的写线程解除挂起开始运行。所以,一旦没有锁了,就返回true,否则false;返回true后,那么则需要释放等待队列中的线程,这时读线程和写线程都有可能再获得锁。 return nextc == 0; }}
写锁的释放
WriteLock的unlock方法如下:
public void unlock() { sync.release(1);}
Sync的release方法使用的AQS中的,如下:
public final boolean release(int arg) { if (tryRelease(arg)) {//尝试释放锁 Node h = head; if (h != null && h.waitStatus != 0)//如果等待队列中有线程再等待 unparkSuccessor(h);//将下一个线程解除挂起。 return true; } return false;}
Sync需要实现tryRelease方法,如下:
protected final boolean tryRelease(int releases) { //如果没有线程持有写锁,但是仍要释放,抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; //如果没有写锁了,那么将AQS的线程置为null if (free) setExclusiveOwnerThread(null); //更新状态 setState(nextc); return free;//此处返回当且仅当free为0时返回,如果当前是写锁被占有了,只有当写锁的数据降为0时才认为释放成功;否则失败。因为只要有写锁,那么除了占有写锁的那个线程,其他线程即不可以获得读锁,也不能获得写锁}
getOwner()
getOwner方法用于返回当前获得写锁的线程,如果没有线程占有写锁,那么返回null。实现如下:
protected Thread getOwner() { return sync.getOwner();}
可以看到直接调用了Sync的getOwner方法,下面是Sync的getOwner方法:
final Thread getOwner() { // Must read state before owner to ensure memory consistency //如果独占锁的个数为0,说明没有线程占有写锁,那么返回null;否则返回占有写锁的线程。 return ((exclusiveCount(getState()) == 0) ?null :getExclusiveOwnerThread());}
getReadLockCount()
getReadLockCount()方法用于返回读锁的个数,实现如下:
public int getReadLockCount() { return sync.getReadLockCount();}
Sync的实现如下:
final int getReadLockCount() { return sharedCount(getState());}//要想得到读锁的个数,就是看AQS的state的高16位。这和前面讲过的一样,高16位表示读锁的个数,低16位表示写锁的个数。static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
getReadHoldCount()
getReadHoldCount()方法用于返回当前线程所持有的读锁的个数,如果当前线程没有持有读锁,则返回0。直接看Sync的实现即可:
final int getReadHoldCount() { //如果没有读锁,自然每个线程都是返回0 if (getReadLockCount() == 0) return 0; //得到当前线程 Thread current = Thread.currentThread(); //如果当前线程是第一个读线程,返回firstReaderHoldCount参数 if (firstReader == current) return firstReaderHoldCount; //如果当前线程不是第一个读线程,得到HoldCounter,返回其中的count HoldCounter rh = cachedHoldCounter; //如果缓存的HoldCounter不为null并且是当前线程的HoldCounter,直接返回count if (rh != null && rh.tid == getThreadId(current)) return rh.count; //如果缓存的HoldCounter不是当前线程的HoldCounter,那么从ThreadLocal中得到本线程的HoldCounter,返回计数 int count = readHolds.get().count; //如果本线程持有的读锁为0,从ThreadLocal中移除 if (count == 0) readHolds.remove(); return count;}
从上面的代码中,可以看到两个熟悉的变量,firstReader和HoldCounter类型。这两个变量在读锁的获取中接触过,前面没有细说,这里细说一下。HoldCounter类的实现如下:
static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread());}
readHolds是ThreadLocalHoldCounter类,定义如下:
static final class ThreadLocalHoldCounter extends ThreadLocal{ public HoldCounter initialValue() { return new HoldCounter(); } }
可以看到,readHolds存储了每一个线程的HoldCounter,而HoldCounter中的count变量就是用来记录线程获得的写锁的个数。所以可以得出结论:Sync维持总的读锁的个数,在state的高16位;由于读线程可以同时存在,所以每个线程还保存了获得的读锁的个数,这个是通过HoldCounter来保存的。 除此之外,对于第一个读线程有特殊的处理,Sync中有如下两个变量:
private transient Thread firstReader = null;//第一个得到读锁的线程private transient int firstReaderHoldCount;//第一个线程获得的写锁
其余获取到读锁的线程的信息保存在HoldCounter中。
看完了HoldCounter和firstReader,再来看一下getReadLockCount的实现,主要有三步:
- 当前没有读锁,那么自然每一个线程获得的读锁都是0;
- 如果当前线程是第一个获取到读锁的线程,那么返回firstReadHoldCount;
- 如果当前线程不是第一个获取到读锁的线程,得到该线程的HoldCounter,然后返回其count字段。如果count字段为0,说明该线程没有占有读锁,那么从readHolds中移除。获取HoldCounter分为两步,第一步是与cachedHoldCounter比较,如果不是,则从readHolds中获取。
getWriteLockCount()
getWriteLockCount()方法返回写锁的个数,Sync的实现如下:
final int getWriteHoldCount() { return isHeldExclusively() ? exclusiveCount(getState()) : 0;}
可以看到如果没有线程持有写锁,那么返回0;否则返回AQS的state的低16位。
总结
当分析ReentranctReadWriteLock时,或者说分析内部使用AQS实现的工具类时,需要明白的就是AQS的state代表的是什么。ReentrantLockReadWriteLock中的state同时表示写锁和读锁的个数。为了实现这种功能,state的高16位表示读锁的个数,低16位表示写锁的个数。AQS有两种模式:共享模式和独占模式,读写锁的实现中,读锁使用共享模式;写锁使用独占模式;另外一点需要记住的即使,当有读锁时,写锁就不能获得;而当有写锁时,除了获得写锁的这个线程可以获得读锁外,其他线程不能获得读锁。