Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(3)

本系列是基于经验设计原型,然后不断优化最终达到和AQS(AbstractQueuedSynchronizer)类似的设计。最终结果不一定和AQS完全一致,基于个人理解会有修改,可以作为理解AQS的不完全参考。

上篇。本篇主要介绍Condition即条件变量的实现,ReentrantLock中最后一块需要实现的内容。

在实现条件变量之前,考虑一下条件变量的一些特性。

  1. 条件变量依赖锁,而且是独占锁
  2. 执行await方法后释放锁,当前线程进入睡眠状态,等待满足条件后被其他线程signal/signalAll唤醒,被唤醒后会尝试重新获取锁
  3. 唤醒可以选择一个也可以全部

注意第一条特性,条件变量依赖的是独占锁,所以类似读锁这种共享锁是不支持条件变量的,ReentrantReadWriteLock中ReadLock#newCondition的实现是直接抛错。

按照第二条特性,可以得到如下的执行过程

 

注意ThreadA中被加粗的unlock和lock,调用条件变量的await方法时,虽然没有显式调用unlock和lock,但是内部其实是做了类似的事情的。

从第三条来看,条件变量应该是对应了一个专门的队列的,那些调用了await方法的线程会进入条件变量的队列中等待被signal。这个专门的队列和锁自身的lock/unlock等待队列不同,语义上唤醒是某个线程的signal/signalAll,而不是lock/unlock方法。如果有点难理解的话,可以考虑在上图中增加一个不调用条件变量任何方法的ThreadC,C的lock/unlock不会唤醒A,因为A等待的是B的唤醒,所以条件变量的队列和锁的lock/unlock队列是分离的,类似下图。

那么条件变量的等待队列需要设计得和锁的等待队列一样么?答案是不用。原因是假如你在await的unlock语义执行之前加入队列的话,因为条件队列依赖的是独占锁,只有一个线程可以操作condition queue。调用signal方法时,锁还没有被释放,同样只有一个线程可以操作condition queue。所以condition queue可以使用普通的同步队列设计。

基于以上初步分析之后得到的条件变量原型

这里为了只关注条件变量省去了lock的所有代码。可以看到await方法中,执行了如下操作

  1. enqueue(condition queue)
  2. unlock
  3. park
  4. lock

对应地,signal执行了如下操作

  1. dequeue(condition queue)
  2. unpark

提问,这里是否会发生signal无法唤醒await线程的情况?

考虑到enqueue和dequeue都是在持有独占锁的前提下操作的,实际可能有两种情况:

  1. 先dequeue
  2. 先enqueue

先dequeue意味着signal先执行(因为此时持有独占锁),以及队列为空,unpark不会唤醒任何线程。signal先执行等价于之后await调用不会发生,因为进入await之前的条件是满足的。执行结果正确。

先enqueue意味signal还无法执行(因为此时await的线程持有锁)。unlock之后,signal线程获取锁,发现await线程的节点,尝试unpark。这里有可能是unpark/park,也有可能是park/unpark执行序列。不管哪种,await线程都会被唤醒,尝试lock,进入lock的等待队列。signal线程释放锁之后,await线程获取锁,执行await之后的代码。执行结果正确。

可以看到,条件变量的分析比独占锁本身要容易很多,一方面这是条件变量本身的特性所致,另一方面合理利用条件变量的特性(特性一)简化了设计。

顺便说一句,设计上多个条件变量的队列互不影响,如果你看到某个锁的条件变量要求只能有一个,或者互相有影响的话,多少说明设计是有问题的。

刚才关于“先enqueue”的分析中,你是否注意到await线程被唤醒之后,由于signal线程还没有释放锁,所以await线程暂时无法获取锁,导致了一次多余的unpark/park。是否可以直接把await线程的节点从conditon queue转移到lock的等待队列?当然这样做的前提是条件变量了解lock的内部实现,由于条件变量依赖锁,所以这不是问题。

其次LockWithCondition1严格上来说在一些边界条件上不满足条件变量的语义。比如说await方法在抛出InterruptedException时必须持有锁(await返回时必须持有锁),现在的实现中lockInterruptibly在抛出InterruptedException时不持有锁。这是条件变量实现中最容易忽视的问题。

还有一些条件变量实现的细节问题,比如await的什么时候可以抛出InterruptedException。这块可以参考Condition接口本身的注释。

以下是修改版的条件变量实现。由于代码比较多,这里只展示了ConditionImpl的实现,其他代码可以参考

https://github.com/xnnyygn/concurrent-learning/tree/master/src/main/java/in/xnnyygn/concurrent/mylock

在分析修改版的await之前,解释一下这里的Node。MyLock使用的Node和AQS一样,可同时用于condition queue和lock queue。这样做只是为了复用Node,你也可以选择和LockWithCondition1一样,分开使用condition queu的节点和lock queue的节点。condition queue中主要使用Node的thread,status和next,lock queue使用除next以外所有字段。为了区分condition queue和lock queue的节点,condition queue的节点一开始的状态是STATUS_CONDITION,相对的,lock queue中节点一开始为STATUS_NORMAL。前篇提到过Node中status的迁移图,不包括STATUS_CONDITION。事实上加入lock queue时节点的status不可能为STATUS_CONDITION。STATUS_CONDITION只是condition queue中的状态,这点请注意。

condition queue中节点的status只有一种迁移

STATUS_CONDITION -> STATUS_NORMAL

可能的情况有

  1. signal
  2. signalAll
  3. await超时
  4. 中断

为什么不区分异常情况?原因是异常状态是也需要加入lock queue,此时status必须是STATUS_NORMAL。你不能把一个STATUS_ABORTED的节点加入lock queue,这样你就没办法获取锁了。重申一遍,条件变量的语义中不管是正常返回,还是await超时,中断,返回时必须持有锁。

由于以上原因,status迁移只有一种。考虑到signal/signalAll时signal线程执行状态迁移,await超时和中断时await线程执行状态迁移,status迁移还必须是CAS。

condition queue的具体实现在类ConditionQueue中,这个类现阶段有3个方法

  1. enqueue
  2. removeNonConditionNodes
  3. dequeue

enqueue很好理解,removeNonConditionNodes主要用于在await超时和中断移除await线程的节点的。注意,只是从condition queue中移除,之后会放到lock queue中去。dequeue就是正常情况从condition queue中移除下一个节点,之后放到lock queue中去。

在上述前提下理解await和signal方法。

首先看最简单的signal方法。从condition queue中取出一个节点,如果没有节点就直接返回。有的话尝试状态迁移,失败的话,说明await线程自己修改了状态(状态迁移的情况3和4),signal会尝试下一个节点。如果状态迁移成功,此时按照前篇的要求,设置前置节点status为STATUS_SIGNAL。如果前置节点放弃了,或者CAS过程中失败了(意味着前置节点放弃了),此时signal线程能做的事情就是唤醒await线程,让await线程自己跳过放弃的前置节点。

signal线程是否可以帮await线程的节点跳过放弃的节点而不是unpark await线程?这样可以帮忙省去一次unpark/park。

回答是请分析哪些是signal线程可以做的,哪些是await线程可以做的。

从MyLock是基于CLHLock实现这点来说,设置前置节点的status是后继节点对应的线程,即await线程该做的事情。所以signal线程这里设置前置节点的status其实不是它的职责,这里个人觉得是为了减少一般情况下的unpark/park。如果碰到前置节点是放弃了的节点还考虑帮忙跳过的话,有点舍本逐末了。

signalAll和signal类似,这里不再赘述。

接下来是相对简单的awaitUninterruptibly。为什么不是await?因为await对于中断的处理有点微妙。awaitUninterruptibly不处理中断,准确来说是,碰到中断,只会重新设置中断状态。这样关注点就可以集中到中断处理以外的部分。

awaitUninterruptibly的基本流程和LockWithCondition1有所不同。考虑到park会响应中断,需要把park放在一个循环中,记录中断状态。跳过循环的唯一条件是节点已经被加入lock queue。

判断是否已加入lock queue的最直接的方法(nodeNotLockEnqueued最后)是从tail检查节点是否存在。其次是检查进入lock queue之后才会被使用的字段,比如successor。假如successor被设置,那么节点肯定被加入了lock queue。还有一种提前判断的方法,节点自身的状态。如果节点status是STATUS_CONDITION或者进入lock queue后才会被使用的predecessor字段没有被设置的话,可以断言节点还没有加入lock queue。

awaitUninterruptibly中跳出循环之后,执行acquireUninterruptibly。在AQS中这个方法被叫做acquireQueued。从行为上这个acquireXXX方法在节点被加入lock queue后执行。其次,会记录acquire过程中的中断,但是不会因为中断而停止尝试获取锁。await系列方法其实需要的就是这种获取模式,不会因为中断而放弃获取的方式。

不管是acquireUninterruptibly还是循环过程中被中断,awaitUninterruptibly最终只会重新设置中断标志,除此以外什么都不做。

在理解了acquireUninterruptibly之后,看一下await方法。

相比于await方法,循环有两个出口。一个是由signal线程放到lock queue的条件,另外一个是中断。中断时,await线程会自己尝试把节点加入lock queue。从两个lockEnqueueByXXXThread方法可以看出,存在两个线程同时把节点放入lock queue的可能性。假如signal线程先放进lock queue,那么await在CAS这一步(lockEnqueueByAwaitingThread第一行)会失败,这时必须等待signal线程enqueue操作的完成(通过yield)。假如await线程先放进lock queue,那么signal线程会失败,现有逻辑中signal线程会尝试唤醒下一个节点。

注意这里的await和AQS的await方法有些许不同。AWS的await在await线程先放入lock queue时最后会抛出InterruptedException,但是在signal线程先放入的话,会设置中断状态。这里的await,在await线程先放入时同样会抛出异常,但是在signal线程先放入的话,不会设置中断状态。这里其实有一个边界问题,在中断时恰好被signal时算不算中断?个人觉得可以不算,要算也是以留下证据为目的。

跳出循环之后,同样调用acquireUninterruptibly,按情况设置中断标志。最后在确认是中断了的情况下,从condition queue中移除自己的节点,并抛出异常。注意这里,移除节点时持有独占锁,其次,中断不代表节点仍旧在condition queue里面。一种情况是await线程比同时执行的signal先CAS成功。

考虑到这里的await和AWS的await相比还是有点差异的,如果你对于改造不太放心的话,在理解的前面内容的基础上,相信你能修改成AQS的语义。

最后是几个限时await方法,这里全部引导到同一个限时方法awaitNanos上。主体流程和await很像,有一点不同是剩余时间的处理。个人理解awaitNanos的返回值代表是是否超时,所以不能简单把timeout减去执行时间返回去调用方。具体代码这里不再分析。

小结

总体来说,条件变量比起独占锁本身要简单。因为可以借助条件变量的特性使用同步队列。另一方面,条件变量实现中也有比较细节的问题需要考虑。

至此,ReentrantLock的实现算是真正完成了。接下来是ReentrantReadWriteLock的实现。