前提 并发编程大师Doug Lea 在编写JUC
(java.util.concurrent
)包的时候引入了java.util.concurrent.locks.AbstractQueuedSynchronizer
,其实是Abstract Queued Synchronizer
,也就是”基于队列实现的抽象同步器”,一般我们称之为AQS
。其实Doug Lea
大神编写AQS
是有严谨的理论基础的,他的个人博客上有一篇论文《The java.util.concurrent Synchronizer Framewor》 ,可以在互联网找到相应的译文《JUC同步器框架》,如果想要深入研究AQS
必须要理解一下该论文的内容,然后结合论文内容详细分析一下AQS
的源码实现。本文在阅读AQS
源码的时候选用的JDK
版本是JDK11
。
出于写作习惯,下文会把AbstractQueuedSynchronizer称为AQS、JUC同步器框或者同步器框架。
AQS的主要功能 AQS
是JUC
包中用于构建锁或者其他同步组件(信号量、事件等)的基础框架类。AQS
从它的实现上看主要提供了下面的功能:
同步状态的原子性管理。
线程的阻塞和解除阻塞。
提供阻塞线程的存储队列。
基于这三大功能,衍生出下面的附加功能:
通过中断实现的任务取消,此功能基于线程中断实现。
可选的超时设置,也就是调用者可以选择放弃等待任务执行完毕直接返回。
定义了Condition接口
,用于支持管程形式的await/signal/signalAll
操作,代替了Object
类基于JNI
提供的wait/notify/notifyAll
。
AQS
还根据同步状态的不同管理方式区分为两种不同的实现:独占状态的同步器 和共享状态的同步器 。
同步器框架基本原理 《The java.util.concurrent Synchronizer Framework》 一文中其实有提及到同步器框架的伪代码:
while (synchronization state does not allow acquire) { enqueue current thread if not already queued; possibly block current thread; } dequeue current thread if it was queued; update synchronization state; if (state may permit a blocked thread to acquire){ unblock one or more queued threads; }
撇脚翻译一下:
while (同步状态申请获取失败){ if (当前线程未进入等待队列){ 当前线程放入等待队列; } 尝试阻塞当前线程; } 当前线程移出等待队列 更新同步状态 if (同步状态足够允许一个阻塞的线程申请获取){ 解除一个或者多个等待队列中的线程的阻塞状态; }
为了实现上述操作,需要下面三个基本环节的相互协作:
同步状态的原子性管理。
等待队列的管理。
线程的阻塞与解除阻塞。
其实基本原理很简单,但是为了应对复杂的并发场景和并发场景下程序执行的正确性,同步器框架在上面的acquire
操作和release
操作中使用了大量的死循环和CAS
等操作 ,再加上Doug Lea
喜欢使用单行复杂的条件判断代码,如一个if
条件语句会包含大量操作 ,AQS
很多时候会让人感觉实现逻辑过于复杂。
同步状态管理 AQS
内部内部定义了一个32
位整型的state
变量用于保存同步状态:
private volatile int state;protected final int getState () { return state; } protected final void setState (int newState) { state = newState; } protected final boolean compareAndSetState (int expect, int update) { return STATE.compareAndSet(this , expect, update); }
同步状态state
在不同的实现中可以有不同的作用或者表示意义,这里其实不能单纯把它理解为中文意义上的”状态”,它可以代表资源数、锁状态等等,下文遇到具体的场景我们再分析它表示的意义。
CLH队列与变体 CLH
锁即Craig, Landin, and Hagersten (CLH) locks
,因为它底层是基于队列实现,一般也称为CLH
队列锁。CLH
锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。从实现上看,CLH
锁是一种自旋锁,能确保无饥饿性,提供先来先服务的公平性。先看简单的CLH
锁的一个简单实现:
public class CLHLock implements Lock { AtomicReference<QueueNode> tail = new AtomicReference<>(new QueueNode()); ThreadLocal<QueueNode> pred; ThreadLocal<QueueNode> current; public CLHLock () { current = ThreadLocal.withInitial(QueueNode::new ); pred = ThreadLocal.withInitial(() -> null ); } @Override public void lock () { QueueNode node = current.get(); node.locked = true ; QueueNode pred = tail.getAndSet(node); this .pred.set(pred); while (pred.locked) { } } @Override public void unlock () { QueueNode node = current.get(); node.locked = false ; current.set(this .pred.get()); } static class QueueNode { boolean locked; } }
上面是一个简单的CLH
队列锁的实现,内部类QueueNode
只使用了一个简单的布尔值locked
属性记录了每个线程的状态,如果该属性为true
,则相应的线程要么已经获取到锁,要么正在等待锁,如果该属性为false
,则相应的线程已经释放了锁。新来的想要获取锁的线程必须对tail
属性调用getAndSet()
方法,使得自身成为队列的尾部,同时得到一个指向前驱节点的引用pred
,最后线程所在节点在其前驱节点的locked
属性上自旋,直到前驱节点释放锁。上面的实现是无法运行的,因为一旦自旋就会进入死循环导致CPU
飙升,可以尝试使用下文将要提到的LockSupport
进行改造。
CLH
队列锁本质是使用队列(实际上是单向链表)存放等待获取锁的线程,等待的线程总是在其所在节点的前驱节点的状态上自旋,直到前驱节点释放资源。从实际来看,过度自旋带来的CPU性能损耗比较大,并不是理想的线程等待队列的实现 。
基于原始的CLH
队列锁中提供的等待队列的基本原理,**AQS
实现一种了CLH锁队列的变体(Variant)**。AQS
类的protected
修饰的构造函数里面有一大段注释用于说明AQS
实现的等待队列的细节事项,这里列举几点重要的:
AQS
实现的等待队列没有直接使用CLH
锁队列,但是参考了其设计思路,等待节点会保存前驱节点中线程的信息,内部也会维护一个控制线程阻塞的状态值。
每个节点都设计为一个持有单独的等待线程并且”带有具体的通知方式”的监视器,这里所谓通知方式就是自定义唤醒阻塞线程的方式而已。
一个线程是等待队列中的第一个等待节点的持有线程会尝试获取锁,但是并不意味着它一定能够获取锁成功(这里的意思是存在公平和非公平的实现),获取失败就要重新等待。
等待队列中的节点通过prev
属性连接前驱节点,通过next
属性连接后继节点,简单来说,就是双向链表的设计 。
CLH
队列本应该需要一个虚拟的头节点,但是在AQS
中没有直接提供虚拟的头节点,而是延迟到第一次竞争出现的时候懒创建虚拟的头节点(其实也会创建尾节点,初始化时头尾节点是同一个节点)。
Condition
(条件)等待队列中的阻塞线程使用的是相同的Node
结构,但是提供了另一个链表用来存放,Condition
等待队列的实现比非Condition
等待队列复杂。
线程阻塞与唤醒 线程的阻塞和唤醒在JDK1.5
之前,一般只能依赖于Object
类提供的wait()
、notify()
和notifyAll()
方法,它们都是JNI
方法,由JVM
提供实现,并且它们必须运行在获取监视器锁的代码块内(synchronized
代码块中),这个局限性先不谈性能上的问题,代码的简洁性和灵活性是比较低的。JDK1.5
引入了LockSupport
类,底层是基于Unsafe
类的park()
和unpark()
方法,提供了线程阻塞和唤醒的功能,它的机制有点像只有一个允许使用资源的信号量java.util.concurrent.Semaphore
,也就是一个线程只能通过park()
方法阻塞一次,只能调用unpark()
方法解除调用阻塞一次,线程就会唤醒(多次调用unpark()
方法也只会唤醒一次),可以想象是内部维护了一个0-1的计数器。
LockSupport
类如果使用得好,可以提供更灵活的编码方式,这里举个简单的使用例子:
public class LockSupportMain implements Runnable { private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS" ); private Thread thread; private void setThread (Thread thread) { this .thread = thread; } public static void main (String[] args) throws Exception { LockSupportMain main = new LockSupportMain(); Thread thread = new Thread(main, "LockSupportMain" ); main.setThread(thread); thread.start(); Thread.sleep(2000 ); main.unpark(); Thread.sleep(2000 ); } @Override public void run () { System.out.println(String.format("%s-步入run方法,线程名称:%s" , FORMATTER.format(LocalDateTime.now()), Thread.currentThread().getName())); LockSupport.park(); System.out.println(String.format("%s-解除阻塞,线程继续执行,线程名称:%s" , FORMATTER.format(LocalDateTime.now()), Thread.currentThread().getName())); } private void unpark () { LockSupport.unpark(thread); } } 2019 -02 -25 00 :39 :57.780 -步入run方法,线程名称:LockSupportMain2019 -02 -25 00 :39 :59.767 -解除阻塞,线程继续执行,线程名称:LockSupportMain
LockSupport
类park()
方法也有带超时的变体版本方法,遇到带超时期限阻塞等待场景下不妨可以使用LockSupport#parkNanos()
。
独占线程的保存 AbstractOwnableSynchronizer
是AQS
的父类,一个同步器框架有可能在一个时刻被某一个线程独占,AbstractOwnableSynchronizer
就是为所有的同步器实现和锁相关实现提供了基础的保存、获取和设置独占线程的功能,这个类的源码很简单:
public abstract class AbstractOwnableSynchronizer implements java .io .Serializable { private static final long serialVersionUID = 3737899427754241961L ; protected AbstractOwnableSynchronizer () { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread (Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread () { return exclusiveOwnerThread; } }
它就提供了一个保存独占线程的变量对应的Setter
和Getter
方法,方法都是final
修饰的,子类只能使用不能覆盖。
CLH队列变体的实现 这里先重点分析一下AQS
中等待队列的节点AQS
的静态内部类Node
的源码:
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() {} Node(Node nextWaiter) { this .nextWaiter = nextWaiter; THREAD.set(this , Thread.currentThread()); } Node(int waitStatus) { WAITSTATUS.set(this , waitStatus); THREAD.set(this , Thread.currentThread()); } final boolean compareAndSetWaitStatus (int expect, int update) { return WAITSTATUS.compareAndSet(this , expect, update); } final boolean compareAndSetNext (Node expect, Node update) { return NEXT.compareAndSet(this , expect, update); } final void setPrevRelaxed (Node p) { PREV.set(this , p); } private static final VarHandle NEXT; private static final VarHandle PREV; private static final VarHandle THREAD; private static final VarHandle WAITSTATUS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); NEXT = l.findVarHandle(Node.class, "next" , Node.class); PREV = l.findVarHandle(Node.class, "prev" , Node.class); THREAD = l.findVarHandle(Node.class, "thread" , Thread.class); WAITSTATUS = l.findVarHandle(Node.class, "waitStatus" , int .class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } }
其中,变量句柄(VarHandle
)是JDK9
引入的新特性,其实底层依赖的还是Unsafe
的方法,笔者认为可以简单理解它为Unsafe
的门面类,而定义的方法基本都是面向变量属性的操作。这里需要关注一下Node
里面的几个属性:
waitStatus
:当前Node
实例的等待状态,可选值有5个。
初始值整数0:当前节点如果不指定初始化状态值,默认值就是0,侧面说明节点正在等待队列中处于等待状态。
Node#CANCELLED
整数值1:表示当前节点实例因为超时或者线程中断而被取消,等待中的节点永远不会处于此状态,被取消的节点中的线程实例不会阻塞。
Node#SIGNAL
整数值-1:表示当前节点的后继节点是(或即将是)阻塞的(通过LockSupport#park()
),当它释放或取消时,当前节点必须LockSupport#unpark()
它的后继节点。
Node#CONDITION
整数值-2:表示当前节点是条件队列中的一个节点,当它转换为同步队列中的节点的时候,状态会被重新设置为0。
Node#PROPAGATE
整数值-3:此状态值通常只设置到调用了doReleaseShared()
方法的头节点,确保releaseShared()
方法的调用可以传播到其他的所有节点,简单理解就是共享模式下节点释放的传递标记。
prev
、next
:当前Node
实例的前驱节点引用和后继节点引用。
thread
:当前Node
实例持有的线程实例引用。
nextWaiter
:这个值是一个比较容易令人生疑的值,虽然表面上它称为”下一个等待的节点”,但是实际上它有三种取值的情况。
值为静态实例Node.EXCLUSIVE
(也就是null),代表当前的Node
实例是独占模式。
值为静态实例Node.SHARED
,代表当前的Node
实例是共享模式。
值为非Node.EXCLUSIVE
和Node.SHARED
的其他节点实例,代表Condition等待队列中当前节点的下一个等待节点 。
Node
类的等待状态waitStatus
理解起来是十分费劲的,下面分析AQS
其他源码段的时候会标识此状态变化的时机 。
其实上面的Node
类可以直接拷贝出来当成一个新建的类,然后尝试构建一个双向链表自行调试,这样子就能深刻它的数据结构。例如:
public class AqsNode { static final AqsNode SHARED = new AqsNode(); static final AqsNode EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile AqsNode prev; volatile AqsNode next; volatile Thread thread; AqsNode nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final AqsNode predecessor () { AqsNode p = prev; if (p == null ) throw new NullPointerException(); else return p; } AqsNode() { } AqsNode(AqsNode nextWaiter) { this .nextWaiter = nextWaiter; THREAD.set(this , Thread.currentThread()); } AqsNode(int waitStatus) { WAITSTATUS.set(this , waitStatus); THREAD.set(this , Thread.currentThread()); } final boolean compareAndSetWaitStatus (int expect, int update) { return WAITSTATUS.compareAndSet(this , expect, update); } final boolean compareAndSetNext (AqsNode expect, AqsNode update) { return NEXT.compareAndSet(this , expect, update); } final void setPrevRelaxed (AqsNode p) { PREV.set(this , p); } private static final VarHandle NEXT; private static final VarHandle PREV; private static final VarHandle THREAD; private static final VarHandle WAITSTATUS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); NEXT = l.findVarHandle(AqsNode.class, "next" , AqsNode.class); PREV = l.findVarHandle(AqsNode.class, "prev" , AqsNode.class); THREAD = l.findVarHandle(AqsNode.class, "thread" , Thread.class); WAITSTATUS = l.findVarHandle(AqsNode.class, "waitStatus" , int .class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } public static void main (String[] args) throws Exception { AqsNode head = new AqsNode(); AqsNode next = new AqsNode(AqsNode.EXCLUSIVE); head.next = next; next.prev = head; AqsNode tail = new AqsNode(AqsNode.EXCLUSIVE); next.next = tail; tail.prev = next; List<Thread> threads = new ArrayList<>(); for (AqsNode node = head; node != null ; node = node.next) { threads.add(node.thread); } System.out.println(threads); } } [null , Thread[main,5 ,main], Thread[main,5 ,main]]
实际上,AQS
中一共存在两种等待队列,其中一种是普通的同步等待队列,这里命名为Sync Queue
,另一种是基于Sync Queue
实现的条件等待队列,这里命名为Condition Queue
。
理解同步等待队列 前面已经介绍完AQS
的同步等待队列节点类,下面重点分析一下同步等待队列的相关源码,下文的Sync队列、Sync Queue、同步队列和同步等待队列是同一个东西 。首先,我们通过分析Node
节点得知Sync
队列一定是双向链表,AQS
中有两个瞬时成员变量用来存放头节点和尾节点:
private transient volatile Node head;private transient volatile Node tail;private static final VarHandle STATE;private static final VarHandle HEAD;private static final VarHandle TAIL;static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state" , int .class); HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head" , Node.class); TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail" , Node.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } Class<?> ensureLoaded = LockSupport.class; } private final void initializeSyncQueue () { Node h; if (HEAD.compareAndSet(this , null , (h = new Node()))) tail = h; } private final boolean compareAndSetTail (Node expect, Node update) { return TAIL.compareAndSet(this , expect, update); } private void setHead (Node node) { head = node; node.thread = null ; node.prev = null ; }
当前线程加入同步等待队列和同步等待队列的初始化是同一个方法,前文提到过:同步等待队列的初始化会延迟到第一次可能出现竞争的情况,这是为了避免无谓的资源浪费,具体方法是addWaiter(Node mode)
:
private Node addWaiter (Node mode) { Node node = new Node(mode); for (;;) { Node oldTail = tail; if (oldTail != null ) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); } } }
在首次调用addWaiter()
方法,死循环至少执行两轮再跳出,因为同步队列必须初始化完成后(第一轮循环),然后再把当前线程所在的新节点实例添加到等待队列中再返(第二轮循环)当前的节点,这里需要注意的是新加入同步等待队列的节点一定是添加到队列的尾部并且会更新AQS
中的tail属性为最新入队的节点实例 。
假设我们使用Node.EXCLUSIVE
模式把新增的等待线程加入队列,例如有三个线程分别是thread-1
、thread-2
和thread-3
,线程入队的时候都处于阻塞状态,模拟一下依次调用上面的入队方法的同步队列的整个链表的状态。
先是线程thread-1
加入等待队列:
接着是线程thread-2
加入等待队列:
最后是线程thread-3
加入等待队列:
如果仔细研究会发现,如果所有的入队线程都处于阻塞状态的话,新入队的线程总是添加到队列的tail
节点,阻塞的线程总是”争抢”着成为head
节点,这一点和CLH
队列锁的阻塞线程总是基于前驱节点自旋以获取锁的思路是一致的 。下面将会分析的独占模式与共享模式,线程加入等待队列都是通过addWaiter()
方法 。
理解条件等待队列 前面已经相对详细地介绍过同步等待队列,在AQS
中还存在另外一种相对特殊和复杂的等待队列-条件等待队列 。介绍条件等待队列之前,要先介绍java.util.concurrent.locks.Condition
接口。
public interface Condition { void await () throws InterruptedException ; void awaitUninterruptibly () ; long awaitNanos (long nanosTimeout) throws InterruptedException ; boolean await (long time, TimeUnit unit) throws InterruptedException ; boolean awaitUntil (Date deadline) throws InterruptedException ; void signal () ; void signalAll () ; }
Condition
可以理解为Object
中的wait()
、notify()
和notifyAll()
的替代品,因为Object
中的相应方法是JNI
(Native
)方法,由JVM
实现,对使用者而言并不是十分友好(有可能伴随JVM
版本变更而受到影响),而Condition
是基于数据结构和相应算法实现对应的功能,我们可以从源码上分析其实现。
Condition
的实现类是AQS
的公有内部类ConditionObject
。ConditionObject
提供的入队列方法如下:
public class ConditionObject implements Condition , java .io .Serializable { private static final long serialVersionUID = 1173984872572414699L ; - 条件队列的第一个节点 private transient Node firstWaiter; - 条件队列的最后一个节点 private transient Node lastWaiter; public ConditionObject () { } private Node addConditionWaiter () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void unlinkCancelledWaiters () { Node t = firstWaiter; Node trail = null ; while (t != null ) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null ; if (trail == null ) firstWaiter = next; else trail.nextWaiter = next; if (next == null ) lastWaiter = trail; } else trail = t; t = next; } } protected boolean isHeldExclusively () { throw new UnsupportedOperationException(); } }
实际上,Condition
的所有await()
方法变体都调用addConditionWaiter()
添加阻塞线程到条件队列中。我们按照分析同步等待队列的情况,分析一下条件等待队列。正常情况下,假设有2个线程thread-1
和thread-2
进入条件等待队列,都处于阻塞状态。
先是thread-1
进入条件队列:
然后是thread-2
进入条件队列:
条件等待队列看起来也并不复杂,但是它并不是单独存在和使用的,一般依赖于同步等待队列,下面的一节分析Condition
的实现的时候再详细分析。
独占模式与共享模式 前文提及到,同步器涉及到独占模型和共享模式。下面就针对这两种模式详细分析一下AQS
的具体实现源码。
独占模式 AQS
同步器如果使用独占(EXCLUSIVE
)模式,那么意味着同一个时刻,只有唯一的一个节点所在线程获取(acuqire
)原子状态status
成功,此时该线程可以从阻塞状态解除继续运行,而同步等待队列中的其他节点持有的线程依然处于阻塞状态。独占模式同步器的功能主要由下面的四个方法提供:
acquire(int arg)
:申请获取arg
个原子状态status
(申请成功可以简单理解为status = status - arg
)。
acquireInterruptibly(int arg)
:申请获取arg
个原子状态status
,响应线程中断。
tryAcquireNanos(int arg, long nanosTimeout)
:申请获取arg
个原子状态status
,带超时的版本。
release(int arg)
:释放arg
个原子状态status
(释放成功可以简单理解为status = status + arg
)。
独占模式下,AQS
同步器实例初始化时候传入的status
值,可以简单理解为”允许申请的资源数量的上限值”,下面的acquire
类型的方法暂时称为”获取资源”,而release
方法暂时称为”释放资源”。接着我们分析前面提到的四个方法的源码,先看acquire(int arg)
:
public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); } static void selfInterrupt () { Thread.currentThread().interrupt(); } final boolean acquireQueued (final Node node, int arg) { boolean interrupted = false ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
上面的代码虽然看起来能基本理解,但是最好用图推敲一下”空间上的变化”:
接着分析一下release(int arg)
的实现:
public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected boolean tryRelease (int arg) { throw new UnsupportedOperationException(); } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) node.compareAndSetWaitStatus(ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node p = tail; p != node && p != null ; p = p.prev) if (p.waitStatus <= 0 ) s = p; } if (s != null ) LockSupport.unpark(s.thread); }
接着用上面的图:
上面图中thread-2
晋升为头节点的第一个后继节点,等待下一个release()
释放资源唤醒之就能晋升为头节点,一旦晋升为头节点也就是意味着可以解除阻塞继续运行。接着我们可以看acquire()
的响应中断版本和带超时的版本。先看acquireInterruptibly(int arg)
:
public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
doAcquireInterruptibly(int arg)
方法和acquire(int arg)
类似,最大的不同点在于阻塞线程解除阻塞后并不是正常继续运行,而是直接抛出InterruptedException
异常。最后看tryAcquireNanos(int arg, long nanosTimeout)
的实现:
public final boolean tryAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } private boolean doAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L ) return false ; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; return true ; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L ) { cancelAcquire(node); return false ; } if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this , nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
tryAcquireNanos(int arg, long nanosTimeout)
其实和doAcquireInterruptibly(int arg)
类似,它们都响应线程中断,不过tryAcquireNanos()
在获取资源的每一轮循环尝试都会计算剩余可用的超时时间,只有同时满足获取失败需要阻塞并且剩余超时时间大于SPIN_FOR_TIMEOUT_THRESHOLD(1000纳秒)
的情况下才会进行阻塞。
独占模式的同步器的一个显著特点就是:头节点的第一个有效(非取消)的后继节点,总是尝试获取资源,一旦获取资源成功就会解除阻塞并且晋升为头节点,原来所在节点会移除出同步等待队列,原来的队列长度就会减少1,然后头结点的第一个有效的后继节点继续开始竞争资源。
使用独占模式同步器的主要类库有:
可重入锁ReentrantLock
。
读写锁ReentrantReadWriteLock
中的写锁WriteLock
。
共享模式 共享(SHARED
)模式中的”共享”的含义是:同一个时刻,如果有一个节点所在线程获取(acuqire
)原子状态status
成功,那么它会解除阻塞被唤醒,并且会把唤醒状态传播 到所有有效的后继节点(换言之就是唤醒整个同步等待队列中的所有有效的节点)。共享模式同步器的功能主要由下面的四个方法提供:
acquireShared(int arg)
:申请获取arg
个原子状态status
(申请成功可以简单理解为status = status - arg
)。
acquireSharedInterruptibly(int arg)
:申请获取arg
个原子状态status
,响应线程中断。
tryAcquireSharedNanos(int arg, long nanosTimeout)
:申请获取arg
个原子状态status
,带超时的版本。
releaseShared(int arg)
:释放arg
个原子状态status
(释放成功可以简单理解为status = status + arg
)。
先看acquireShared(int arg)
的源码:
public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException(); } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean interrupted = false ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; return ; } } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); throw t; } finally { if (interrupted) selfInterrupt(); } } private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
其实代码的实现和独占模式有很多类似的地方,一个很大的不同点是:共享模式同步器当节点获取资源成功晋升为头节点之后,它会把自身的等待状态通过CAS
更新为Node.PROPAGATE
,下一个加入等待队列的新节点会把头节点的等待状态值更新回Node.SIGNAL
,标记后继节点处于可以被唤醒的状态,如果遇上资源释放,那么这个阻塞的节点就能被唤醒从而解除阻塞。我们还是画图理解一下,先假设tryAcquireShared(int arg)
总是返回小于0的值,入队两个阻塞的线程thread-1
和thread-2
,然后进行资源释放确保tryAcquireShared(int arg)
总是返回大于0的值:
看起来和独占模式下的同步等待队列差不多,实际上真正不同的地方在于有节点解除阻塞和晋升为头节点的过程。因此我们可以先看releaseShared(int arg)
的源码:
public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException(); }
releaseShared(int arg)
就是在tryReleaseShared(int arg)
调用返回true
的情况下主动调用一次doReleaseShared()
从而基于头节点传播唤醒状态和unpark
头节点的后继节点。接着之前的图:
接着看acquireSharedInterruptibly(int arg)
的源码实现:
public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
最后看tryAcquireSharedNanos(int arg, long nanosTimeout)
的源码实现:
public final boolean tryAcquireSharedNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos (int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L ) return false ; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; return true ; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L ) { cancelAcquire(node); return false ; } if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this , nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
共享模式的同步器的一个显著特点就是:头节点的第一个有效(非取消)的后继节点,总是尝试获取资源,一旦获取资源成功就会解除阻塞并且晋升为头节点,原来所在节点会移除出同步等待队列,原来的队列长度就会减少1,重新设置头节点的过程会传播唤醒的状态,简单来说就是唤醒一个有效的后继节点,只要一个节点可以晋升为头节点,它的后继节点就能被唤醒,以此类推。节点的唤醒顺序遵循类似于FIFO的原则,通俗说就是先阻塞或者阻塞时间最长则先被唤醒 。
使用共享模式同步器的主要类库有:
信号量Semaphore
。
倒数栅栏CountDownLatch
。
Condition的实现 Condition
实例的建立是在Lock
接口的newCondition()
方法,它是锁条件等待的实现,基于作用或者语义可以见Condition
接口的相关API注释:
Condition是对象监视器锁方法Object#wait()、Object#notify()和Object#notifyAll()的替代实现,对象监视器锁实现锁的时候作用的效果是每个锁对象必须使用多个wait-set(JVM内置的等待队列),通过Object提供的方法和监视器锁结合使用就能达到Lock的实现效果。如果替换synchronized方法和语句并且结合使用Lock和Condition,就能替换并且达到对象监视器锁的效果。
Condition
必须固有地绑定在一个Lock
的实现类上,也就是要通过Lock
的实例建立Condition
实例,而且Condition
的方法调用使用必须在Lock
的”锁定代码块”中,这一点和synchronized
关键字以及Object
的相关JNI方法使用的情况十分相似。
前文介绍过Condition
接口提供的方法以及Condition
队列,也就是条件等待队列,通过画图简单介绍了它的队列节点组成。实际上,条件等待队列需要结合同步等待队列使用,这也刚好对应于前面提到的Condition
的方法调用使用必须在Lock
的锁定代码块中 。听起来很懵逼,我们慢慢分析一下ConditionObject
的方法源码就能知道具体的原因。
先看ConditionObject#await()
方法:
private static final int REINTERRUPT = 1 ;private static final int THROW_IE = -1 ;public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } final int fullyRelease (Node node) { try { int savedState = getState(); if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { node.waitStatus = Node.CANCELLED; throw t; } } final boolean isOnSyncQueue (Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null ) return false ; if (node.next != null ) return true ; return findNodeFromTail(node); } private boolean findNodeFromTail (Node node) { for (Node p = tail;;) { if (p == node) return true ; if (p == null ) return false ; p = p.prev; } } private int checkInterruptWhileWaiting (Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; } final boolean transferAfterCancelledWait (Node node) { if (node.compareAndSetWaitStatus(Node.CONDITION, 0 )) { enq(node); return true ; } while (!isOnSyncQueue(node)) Thread.yield(); return false ; } private void reportInterruptAfterWait (int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
其实上面的await()
逻辑并不复杂,前提是理解了对象监视器锁那套等待和唤醒的机制(由JVM
实现,C
语言学得好的可以去看下源码),这里只是通过算法和数据结构重新进行了一次实现。await()
主要使用了两个队列:同步等待队列和条件等待队列。我们先假设有两个线程thread-1
和thread-2
调用了下面的代码中的process()
方法:
ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void process () { try { lock.lock(); condition.await(); }finally { lock.unlock(); } }
ReentrantLock
使用的是AQS
独占模式的实现,因此在调用lock()
方法的时候,同步等待队列的一个瞬时快照(假设线程thread-1
先加入同步等待队列)可能如下:
接着,线程thread-1
所在节点是头节点的后继节点,获取锁成功,它解除阻塞后可以调用await()
方法,这个时候会释放同步等待队列中的所有等待节点,也就是线程thread-2
所在的节点也被释放,因此线程thread-2
也会调用await()
方法:
只要有线程能够到达await()
方法,那么原来的同步器中的同步等待队列就会释放所有阻塞节点,表现为释放锁,然后这些释放掉的节点会加入到条件等待队列中,条件等待队列中的节点也是阻塞的 ,这个时候只有通过signal()
或者signalAll()
进行队列元素转移 才有机会唤醒阻塞的线程。因此接着看signal()
和signalAll()
的源码实现:
public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignal(first); } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!node.compareAndSetWaitStatus(Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; } public final void signalAll () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignalAll(first); } private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); }
其实signal()
或者signalAll()
会对取消的节点或者短暂中间状态的节点进行解除阻塞,但是正常情况下,它们的操作结果是把阻塞等待时间最长的一个或者所有节点重新加入到AQS
的同步等待队列中。例如,上面的例子调用signal()
方法后如下:
这样子,相当于线程thread-1
重新加入到AQS
同步等待队列中(从条件等待队列中移动到同步等待队列中),并且开始竞争头节点,一旦竞争成功,就能够解除阻塞。这个时候从逻辑上看,signal()
方法最终解除了对线程thread-1
的阻塞。await()
的其他变体方法的原理是类似的,这里因为篇幅原因不再展开。这里小结一下Condition
的显著特点:
1、同时依赖两个同步等待队列,一个是AQS
提供,另一个是ConditionObject
提供的。
2、await()
方法会释放AQS
同步等待队列中的阻塞节点,这些节点会加入到条件等待队列中进行阻塞。
3、signal()
或者signalAll()
会把条件等待队列中的节点重新加入AQS
同步等待队列中,并不解除正常节点的阻塞状态。
4、接第3步,这些进入到AQS
同步等待队列的节点会重新竞争成为头节点,接下来的步骤其实也就是前面分析过的独占模式下的AQS
的运作原理。
取消获取资源(cancelAcquire) 新节点加入等待队列失败导致任何类型的异常或者带超时版本的API调用的时候剩余超时时间小于等于零的时候,就会调用cancelAcquire()
方法,用于取消该节点对应节点获取资源的操作。
private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { pred.compareAndSetNext(predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) pred.compareAndSetNext(predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
cancelAcquire()
方法有多处调用,主要包括下面的情况:
1、节点线程在阻塞过程中主动中断的情况下会调用。
2、acquire
的处理过程发生任何异常的情况下都会调用,包括tryAcquire()
、tryAcquireShared()
等。
3、新节点加入等待队列失败导致任何类型的异常或者带超时版本的API调用的时候剩余超时时间小于等于零的时候。
cancelAcquire()
主要作用是把取消的节点移出同步等待队列,必须时候需要进行后继节点的唤醒。
实战篇 AQS
是一个抽象的同步器基础框架,其实我们也可以直接使用它实现一些高级的并发框架。下面基于AQS
实现一些非内建的功能,这两个例子来自于AQS
的注释中。
metux 大学C
语言课程中经常提及到的只有一个资源的metux
(互斥区),也就是说,同一个时刻,只能有一个线程获取到资源,其他获取资源的线程需要阻塞等待到前一个线程释放资源。
public class Metux implements Lock , Serializable { private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { assert 1 == arg; if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { assert 1 == arg; if (!isHeldExclusively()) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null ); setState(0 ); return true ; } public Condition newCondition () { return new ConditionObject(); } public boolean isLocked () { return getState() != 0 ; } @Override public boolean isHeldExclusively () { return getExclusiveOwnerThread() == Thread.currentThread(); } private void readObject (ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0 ); } } private final Sync sync = new Sync(); @Override public void lock () { sync.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return sync.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(time)); } public boolean isLocked () { return sync.isLocked(); } public boolean isHeldByCurrentThread () { return sync.isHeldExclusively(); } @Override public void unlock () { sync.release(1 ); } @Override public Condition newCondition () { return sync.newCondition(); } public static void main (String[] args) throws Exception { final Metux metux = new Metux(); new Thread(() -> { metux.lock(); System.out.println(String.format("%s-thread-1获取锁成功休眠3秒..." , LocalDateTime.now())); try { Thread.sleep(3000 ); } catch (InterruptedException e) { } metux.unlock(); System.out.println(String.format("%s-thread-1获解锁成功..." , LocalDateTime.now())); return ; }, "thread-1" ).start(); new Thread(() -> { metux.lock(); System.out.println(String.format("%s-thread-2获取锁成功..." ,LocalDateTime.now())); return ; }, "thread-2" ).start(); Thread.sleep(Integer.MAX_VALUE); } }
某个时间的某次运行结果如下:
2019 -04 -07T11:49 :27.858791200 -thread-1 获取锁成功休眠3 秒...2019 -04 -07T11:49 :30.876567 -thread-2 获取锁成功...2019 -04 -07T11:49 :30.876567 -thread-1 获解锁成功...
二元栅栏 二元栅栏是CountDownLatch
的简化版,只允许一个线程阻塞,由另一个线程负责唤醒。
public class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled () { return getState() != 0 ; } @Override protected int tryAcquireShared (int ignore) { return isSignalled() ? 1 : -1 ; } @Override protected boolean tryReleaseShared (int ignore) { setState(1 ); return true ; } } private final Sync sync = new Sync(); public boolean isSignalled () { return sync.isSignalled(); } public void signal () { sync.releaseShared(1 ); } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public static void main (String[] args) throws Exception { BooleanLatch latch = new BooleanLatch(); new Thread(() -> { try { Thread.sleep(3000 ); } catch (InterruptedException e) { } latch.signal(); }).start(); System.out.println(String.format("[%s]-主线程进入阻塞..." , LocalDateTime.now())); latch.await(); System.out.println(String.format("[%s]-主线程进被唤醒..." , LocalDateTime.now())); } }
某个时间的某次运行结果如下:
[2019 -04 -07T11:55 :12.647816200 ]-主线程进入阻塞... [2019 -04 -07T11:55 :15.632088 ]-主线程进被唤醒...
小结 在JUC
的重要并发类库或者容器中,AQS
起到了基础框架的作用,理解同步器的实现原理,有助于理解和分析其他并发相关类库的实现。这篇文章前后耗费了接近1个月时间编写,DEBUG
过程最好使用多线程断点,否则很难模拟真实的情况。AQS
里面的逻辑是相对复杂的,很敬佩并发大师Doug Lea
如此精巧的类库设计,此所谓巨人的肩膀。
参考资料:
(本文完 c-30-d e-a-20190407 r-a-20200723 ProcessOn重新修订所有插图,强迫症发作修正病句和错字)
公众号回复序列号:aqs