JUC并发编程第七章 JUC工具类

1. AQS

AQS(AbstractQueuedSynchronizer)是 Java 并发编程中的一个重要的框架,通常用于多线程编程和并发控制。它提供了一种管理共享资源、同步线程执行以及协调多个线程之间操作的机制。AQS是Java中实现锁、信号量、倒计时门栅等同步工具的基础框架,它在Java并发包中发挥着重要的作用。

AQS的核心思想是定义了一个队列(通常是FIFO队列),用于存放等待获取某个资源或执行某个操作的线程。AQS通过内部状态的维护来管理这个队列,并提供了一组抽象方法,子类可以实现这些方法来定制化不同的同步工具。

AQS的主要方法和概念包括:

  1. acquire():用于获取资源执行操作的方法。如果资源不可用,线程将会被阻塞并加入到等待队列中,直到资源可用。

  2. release():用于释放资源的方法。当一个线程释放资源时,AQS会唤醒等待队列中的一个或多个线程,让它们有机会获取资源。

  3. 状态管理:AQS内部通过状态变量来表示资源的可用性。子类可以通过修改状态来实现不同的同步语义。

    • 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState - 获取 state 状态
    • setState - 设置 state 状态
    • compareAndSetState - cas 机制设置 state 状态

    AQS的核心思想是使用一个状态变量来表示资源的状态,不同的同步器可以根据这个状态来实现不同的同步策略。在AQS中,状态变量通常是一个整数,可以正数、负数、零等不同值,具体含义取决于具体的同步器。

    AQS中的状态变量是一个重要的概念,它的含义和用途可以根据具体的同步器来理解。以下是一些常见的AQS中的状态变量的含义和用途:

    1. 排他锁(Exclusive Lock):在独占锁的情况下,状态变量通常表示锁的拥有情况。0表示锁未被任何线程持有,1表示锁已经被某个线程持有
    2. 共享锁(Shared Lock):在共享锁的情况下,状态变量通常表示共享资源的数量。正数表示有多个线程同时访问资源,0表示没有线程访问资源。
    3. 信号量(Semaphore):在信号量的情况下,状态变量表示可用的许可数量。 线程可以通过获取许可来执行某个操作,当状态变量为0时,线程需要等待,直到有可用的许可。
    4. CountDownLatch:CountDownLatch是一种倒计数器,状态变量表示需要等待的计数值。当计数值变为0时,等待的线程可以继续执行。
    5. CyclicBarrier:CyclicBarrier是一种循环屏障,状态变量表示等待的线程数量。当等待的线程数量达到状态变量指定的值时,所有线程可以继续执行,并且状态变量会重置为初始值。
    6. Phaser:Phaser也是一种多线程同步工具,状态变量表示当前阶段的编号。线程可以在不同的阶段等待,当所有线程都到达同一阶段时,状态变量会递增。
  4. 等待队列:AQS内部维护了一个等待队列,用于存放等待获取资源或执行操作的线程。这个队列通常是一个双向链表,按照FIFO顺序管理线程。

  5. 共享模式和独占模式:AQS支持两种模式,共享模式独占模式。共享模式允许多个线程同时获取资源,而独占模式只允许一个线程获取资源。

常见的AQS子类包括:

  1. ReentrantLock:可重入锁的实现,支持独占模式
  2. Semaphore:信号量的实现,支持共享模式
  3. CountDownLatch:倒计时门栅的实现,支持共享模式
  4. CyclicBarrier:循环屏障的实现,支持共享模式

AQS提供了一个强大的基础,使得开发者可以构建各种复杂的同步工具和并发控制机制。通过合理使用AQS,可以确保多线程程序的正确性和性能。

请注意,AQS是Java并发包的一部分,因此上述内容是针对Java编程语言的。其他编程语言和平台可能具有类似的概念和机制,但具体实现和用法可能会有所不同。

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException):

  1. tryAcquire

    • tryAcquire(int arg) 是用于尝试获取独占资源的方法。
    • 当线程想要获取资源时,它会调用 tryAcquire 方法。
    • 如果资源可以被当前线程获取,tryAcquire 方法返回 true,表示成功获取资源;否则返回 false,表示获取失败。
    • 在子类中,您可以实现自己的 tryAcquire 方法,定义资源获取的逻辑,例如检查状态变量或者条件,然后返回相应的结果。
  2. tryRelease

    • tryRelease(int arg) 是用于释放独占资源的方法。
    • 当线程释放资源时,它会调用 tryRelease 方法。
    • tryRelease 方法通常执行一些释放资源的操作,然后可能唤醒等待队列中的线程,使其有机会获取资源。
    • 在子类中,您可以实现自己的 tryRelease 方法,定义资源释放的逻辑,如状态变量的更新等。
  3. tryAcquireShared

    • tryAcquireShared(int arg) 是用于尝试获取共享资源的方法。
    • 在共享模式下,多个线程可以同时获取资源,但数量可能受限。
    • 当线程想要获取共享资源时,它会调用 tryAcquireShared 方法。
    • 如果资源可以被当前线程获取,tryAcquireShared 方法返回一个非负整数,表示获取的资源数量;如果获取失败,返回一个负数或零
    • 在子类中,您可以实现自己的 tryAcquireShared 方法,定义共享资源获取的逻辑。
  4. tryReleaseShared

    • tryReleaseShared(int arg) 是用于释放共享资源的方法。
    • 在共享模式下,线程释放资源时,它会调用 tryReleaseShared 方法。
    • tryReleaseShared 方法通常执行一些释放资源的操作,然后唤醒等待队列中的线程,使其有机会获取资源。
    • 在子类中,您可以实现自己的 tryReleaseShared 方法,定义共享资源释放的逻辑。
  5. isHeldExclusively

    • isHeldExclusively() 方法用于检查当前线程是否持有独占资源
    • 这个方法通常在子类中实现,用于验证当前线程是否已经成功获取了资源。
    • 如果当前线程持有独占资源,isHeldExclusively 应该返回 true,否则返回 false

获取锁的姿势:

// 如果获取锁失败
if (!tryAcquire(arg)) {
	// 入队, 可以选择阻塞当前线程 park unpark
}

释放锁的姿势:

// 如果释放锁成功
if (tryRelease(arg)) {
	// 让阻塞线程恢复运行
}

1.1 实现不可重入锁

注意,这里的代码很重要!!!

  1. 首先自定义同步器:

    final class MySync extends AbstractQueuedSynchronizer{
        /**
         * 尝试加锁(独占)
         *
         * @param arg
         * @return 是否加锁成功
         */
        @Override
        protected boolean tryAcquire(int arg) {
            if (arg == 1){
                if (compareAndSetState(0, 1)) {
                    //设置线程持有者为当前线程
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            }
            return false;
        }
    
        /**
         * 尝试释放锁(独占)
         *
         * @param acquires
         * @return 是否释放锁成功
         */
        @Override
        protected boolean tryRelease(int acquires) {
            if(acquires == 1) {
                if(getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                //释放锁的时候,只需要改变state变量的值就可以
                setExclusiveOwnerThread(null);
                //状态改为0表示解锁,这里语句的顺序不能颠倒,因为state是volatile修饰的
                //要保证之前所作的操作对其他线程可见
                setState(0);
                return true;
            }
            return false;
        }
    
        /**
         * 创建条件变量
         *
         * @return
         */
        protected Condition newCondition() {
            //ConditionObject是AbstractQueuedSynchronizer所拥有的
            return new ConditionObject();
        }
    
        /**
         * 检查当前线程是否持有独占资源
         *
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1; //是否被某个线程持有锁
        }
    }
    
  2. 自定义锁

    有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁:

    class MyLock implements Lock {
        static MySync sync = new MySync();
        @Override
        // 尝试,不成功,进入等待队列
        public void lock() {
            sync.acquire(1); //调用sync父类方法
        }
        @Override
        // 尝试,不成功,进入等待队列,可打断
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);//调用sync父类方法
        }
        @Override
        // 尝试一次,不成功返回,不进入队列
        public boolean tryLock() {
            return sync.tryAcquire(1);//调用sync子类方法
        }
        @Override
        // 尝试,不成功,进入等待队列,有时限
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));//调用sync父类方法
        }
        @Override
        // 释放锁
        public void unlock() {
            sync.release(1);
        }
        @Override
        // 生成条件变量
        public Condition newCondition() {
            return sync.newCondition();
        }
    }
    
  3. 测试一下

    public static void main(String[] args) {
        MyLock lock = new MyLock();
        new Thread(() -> {
            lock.lock();
            try {
                log.debug("locking...");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.debug("unlocking...");
                lock.unlock();
            }
        },"t1").start();
        new Thread(() -> {
            lock.lock();
            try {
                log.debug("locking...");
            } finally {
                log.debug("unlocking...");
                lock.unlock();
            }
        },"t2").start();
    }
    

    执行:

    09:16:31.796 [t1] DEBUG com.example.javatest.AQS - locking...
    09:16:32.805 [t1] DEBUG com.example.javatest.AQS - unlocking...
    09:16:32.805 [t2] DEBUG com.example.javatest.AQS - locking...
    09:16:32.805 [t2] DEBUG com.example.javatest.AQS - unlocking...
    

    不可重入测试
    如果改为下面代码,会发现自己也会被挡住(只会打印一次 locking)

    lock.lock();
    log.debug("locking...");
    lock.lock();
    log.debug("locking...");
    

2. ReentrantLock 原理

可见,ReentrantLock,里面也有同步器(Sync),而Sync也是继承了AQS,并且Sync下面还有子类:NonfairSync(非公平锁)和FairSync(公平锁)

2.1 非公平锁实现原理

2.2.1 加锁解锁流程

先从构造器开始看,默认为非公平锁实现:

构造器源码——默认实现非公平锁

ReentrantLock的加锁方法

跟进去,可见是一个自定义同步器

我们查看他的非公平锁实现

可见和我们前面自定义的差不同

NonfairSync 继承自 AQS

没有竞争时:

无竞争时,lock能够肯定能够成功

当第一个线程过来时(Thread-0),exclusiveOwnerThread指向Thread-0,然后state改为了1,表示该资源已被加锁。

第一个竞争出现时:

现在Thread-1进来了

当Thread-1过来时,尝试将state从0改为1,但是已经有线程对他加锁了(state=1)所以就会CAS失败

源码

Thread-1 执行了

  1. CAS 尝试将 state 由 0 改为 1,结果失败
  2. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
  3. 接下来进入 addWaiter 逻辑,构造 Node 队列(上图已标注)
  • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
  • Node 的创建是懒惰的
  • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位(其实就是虚拟头节点的意思),并不关联线程

当前线程进入 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false