具体介绍了AQS的原理,如何实现,以及它在CDL中的应用。
AQS
所谓AQS,提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLock
、Semaphore
、CountDownLatch
等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。
为什么需要AQS
如果没有 AQS,那就需要每个线程协作工具类自己去实现至少以下内容,包括:
- 状态的原子性管理
- 线程的阻塞与解除阻塞
- 队列的管理
把 AQS 和线程协作工具类给“拟人化”,比作是 HR 和面试官。
- 群面和单面也有很多相同的地方(或者称为流程或环节),而这些相同的地方往往都是由 HR 负责的。比如面试者来了,HR 需要安排候选人签到、就坐等待、排队,然后 HR 要按顺序叫号,从而避免发生多个候选人冲突的情况,这一系列的内容就类似与AQS的工作。
- 刚才所说的让候选人休息,就是指把线程进行阻塞,不要持续耗费 CPU;而后续叫号让候选人去面试,则意味着去唤醒线程。
- 群面的流程类似于
CountDownLatch
,CountDownLatch 会先设置需要倒数的初始值,假设为 10,每来一个候选人,计数减 1,如果 10 个人都到齐了,就开始面试。同样,单面可以理解为是Semaphore
信号量,假设有 5 个许可证,每个线程每次获取 1 个许可证,这就类似于有 5 个面试官并行面试,候选人在面试之前需要先获得许可证,面试结束后归还许可证。 -
对于 CountDownLatch 和 Semaphore 等工具类而言,它要确定自己的“要人”规则,是凑齐 10 个候选人一起面试,像群面一样,还是出 1 进 1,像单面一样。
- AQS 最核心的三大部分就是状态、队列和期望协作工具类去实现的获取/释放等重要方法。
状态
private volatile int state;
state的含义并不是一成不变的,它会根据具体实现类的作用不同而表示不同的含义。
- 在
信号量
里面,state 表示的是剩余许可证的数量; - 在
ountDownLatch
工具类里面,state 表示的是需要“倒数”的数量,当每次调用CountDown
方法时,state 就会减 1,一直减到 0 的时候就代表这个门闩被放开; - state在
ReentrantLock
中表示的是锁的占有情况。0表示没有任何线程占有锁, state 变成 1,则就代表这个锁已经被某一个线程所持有了,当锁被同一个线程多次获取时,state会递增(可重入)
怎么保证state的线程安全
state 是会被多个线程共享的,会被并发地修改,所以所有去修改 state 的方法都必须要保证 state 是线程安全的。
- compareAndSetState方法:该方法基于CAS操作保证原子性
- setState方法:该方法只是直接对state赋值,但因为state被volatile修饰,所以对基本类型的变量进行直接赋值时,是可以保证线程安全的。
FIFO队列
内部是双向链表的形式,主要作用是存储等待的线程。
获取/释放方法
用法
利用AQS自定义线程协助工具类的步骤如下:
- 新建一个自己的线程协作工具类,在内部写一个 Sync 类,该 Sync 类继承 AbstractQueuedSynchronizer,即
AQS
; - 在
Sync
类里,根据是否是独占,来重写对应的方法。如果是独占
,则重写 tryAcquire 和 tryRelease 等方法;如果是非独占
,则重写 tryAcquireShared 和 tryReleaseShared 等方法; - 在自己的线程协作工具类中,实现
获取/释放
的相关方法,并在里面调用 AQS 对应的方法,如果是独占
调用 acquire 或 release 等方法,非独占
则调用 acquireShared 或 releaseShared 或 acquireSharedInterruptibly 等方法。
AQS定义为类的原因
实现为接口的话,需要重写所有的抽象方法,但实际上只要根据需要实现包括 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared 等方法的一部分即可。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
获取/释放的方法如上所示,所以AQS的子类必须重写对应的方法,否则执行时会直接抛出异常。
在CDL的应用
在 CountDownLatch
里面有一个子类,该类的类名叫 Sync
,这个类正是继承自 AQS。
//第一步:新建自己的线程协作工具类
public class CountDownLatch {
//并在内部写一个继承AQS的类
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//第二步:根据是否独占重写对应方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
//nextc为0时即倒数达到规定次数
}
}
private final Sync sync;
//省略其他代码...
}
CDL的方法
释放
public void countDown() {
sync.releaseShared(1);
}
releaseShared()-->tryReleaseShared()为true-->doReleaseShared()
该方法会将阻塞的线程都唤醒
获取
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly()-->tryAcquireShared()为-1-->doAcquireSharedInterruptibly()
该方法会让线程进入阻塞状态
- countDown()和await()方法对应第三步,在自己的线程协作工具类中,实现获取/释放的相关方法,并根据是否独占在里面调用AQS对应的方法