铁锤的IT奇妙探险 Java Back-End Coder

AQS的原理和实现

2020-07-20


具体介绍了AQS的原理,如何实现,以及它在CDL中的应用。

AQS

所谓AQS,提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLockSemaphoreCountDownLatch等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。

为什么需要AQS

如果没有 AQS,那就需要每个线程协作工具类自己去实现至少以下内容,包括:

  • 状态的原子性管理
  • 线程的阻塞与解除阻塞
  • 队列的管理

把 AQS 和线程协作工具类给“拟人化”,比作是 HR 和面试官。

  • 群面和单面也有很多相同的地方(或者称为流程或环节),而这些相同的地方往往都是由 HR 负责的。比如面试者来了,HR 需要安排候选人签到、就坐等待、排队,然后 HR 要按顺序叫号,从而避免发生多个候选人冲突的情况,这一系列的内容就类似与AQS的工作。
  • 刚才所说的让候选人休息,就是指把线程进行阻塞,不要持续耗费 CPU;而后续叫号让候选人去面试,则意味着去唤醒线程。
  • 群面的流程类似于 CountDownLatch,CountDownLatch 会先设置需要倒数的初始值,假设为 10,每来一个候选人,计数减 1,如果 10 个人都到齐了,就开始面试。同样,单面可以理解为是 Semaphore 信号量,假设有 5 个许可证,每个线程每次获取 1 个许可证,这就类似于有 5 个面试官并行面试,候选人在面试之前需要先获得许可证,面试结束后归还许可证。
  • 对于 CountDownLatch 和 Semaphore 等工具类而言,它要确定自己的“要人”规则,是凑齐 10 个候选人一起面试,像群面一样,还是出 1 进 1,像单面一样。

  • AQS 最核心的三大部分就是状态、队列和期望协作工具类去实现的获取/释放等重要方法。

状态

private volatile int state;

state的含义并不是一成不变的,它会根据具体实现类的作用不同而表示不同的含义。

  • 信号量里面,state 表示的是剩余许可证的数量;
  • ountDownLatch 工具类里面,state 表示的是需要“倒数”的数量,当每次调用 CountDown 方法时,state 就会减 1,一直减到 0 的时候就代表这个门闩被放开;
  • state在ReentrantLock 中表示的是锁的占有情况。0表示没有任何线程占有锁, state 变成 1,则就代表这个锁已经被某一个线程所持有了,当锁被同一个线程多次获取时,state会递增(可重入)

怎么保证state的线程安全

state 是会被多个线程共享的,会被并发地修改,所以所有去修改 state 的方法都必须要保证 state 是线程安全的。

  1. compareAndSetState方法:该方法基于CAS操作保证原子性
  2. setState方法:该方法只是直接对state赋值,但因为state被volatile修饰,所以对基本类型的变量进行直接赋值时,是可以保证线程安全的。

FIFO队列

内部是双向链表的形式,主要作用是存储等待的线程。

获取/释放方法

详情见ReentrantLock详解

用法

利用AQS自定义线程协助工具类的步骤如下:

  1. 新建一个自己的线程协作工具类,在内部写一个 Sync 类,该 Sync 类继承 AbstractQueuedSynchronizer,即 AQS
  2. Sync类里,根据是否是独占,来重写对应的方法。如果是独占,则重写 tryAcquire 和 tryRelease 等方法;如果是非独占,则重写 tryAcquireShared 和 tryReleaseShared 等方法;
  3. 在自己的线程协作工具类中,实现获取/释放的相关方法,并在里面调用 AQS 对应的方法,如果是独占调用 acquire 或 release 等方法,非独占则调用 acquireShared 或 releaseShared 或 acquireSharedInterruptibly 等方法。

AQS定义为类的原因

实现为接口的话,需要重写所有的抽象方法,但实际上只要根据需要实现包括 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared 等方法的一部分即可。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

protected int tryAcquireShared(int arg) {
  throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

获取/释放的方法如上所示,所以AQS的子类必须重写对应的方法,否则执行时会直接抛出异常。

在CDL的应用

CountDownLatch 里面有一个子类,该类的类名叫 Sync,这个类正是继承自 AQS。

//第一步:新建自己的线程协作工具类
public class CountDownLatch {
	//并在内部写一个继承AQS的类
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }

        //第二步:根据是否独占重写对应方法
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
            //nextc为0时即倒数达到规定次数
        }
    }
    private final Sync sync;
   //省略其他代码...
}

CDL的方法

释放

public void countDown() {
    sync.releaseShared(1);
}

releaseShared()-->tryReleaseShared()为true-->doReleaseShared()
该方法会将阻塞的线程都唤醒

获取

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly()-->tryAcquireShared()-1-->doAcquireSharedInterruptibly()
该方法会让线程进入阻塞状态
  • countDown()和await()方法对应第三步,在自己的线程协作工具类中,实现获取/释放的相关方法,并根据是否独占在里面调用AQS对应的方法

Similar Posts

下一篇 复习大纲

Comments