侧边栏壁纸
博主头像
一啖焗捞丁

砖头man👷🏻‍♂️

  • 累计撰写 16 篇文章
  • 累计创建 3 个标签
  • 累计收到 1 条评论
标签搜索

目 录CONTENT

文章目录

CountDownLatch实现原理

一啖焗捞丁
2021-08-28 / 0 评论 / 0 点赞 / 996 阅读 / 3,656 字

CountDownLatch是一个同步器,它可以让一个或多个线程进入等待状态,直到其他线程完成某些操作。

CountDownLatch是什么

什么是CountDownLatch?官方文档是这样阐述的:

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  This is a one-shot phenomenon
 * -- the count cannot be reset.  If you need a version that resets the
 * count, consider using a {@link CyclicBarrier}.
 *
 * <p>A {@code CountDownLatch} is a versatile synchronization tool
 * and can be used for a number of purposes.  A
 * {@code CountDownLatch} initialized with a count of one serves as a
 * simple on/off latch, or gate: all threads invoking {@link #await await}
 * wait at the gate until it is opened by a thread invoking {@link
 * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>
 * can be used to make one thread wait until <em>N</em> threads have
 * completed some action, or some action has been completed N times.
 *
 * <p>A useful property of a {@code CountDownLatch} is that it
 * doesn't require that threads calling {@code countDown} wait for
 * the count to reach zero before proceeding, it simply prevents any
 * thread from proceeding past an {@link #await await} until all
 * threads could pass.
 * 
 * ...
 *
 * @since 1.5
 * @author Doug Lea
 */
public class CountDownLatch {
}

这里笔者大概列出以下几点:

  1. CountDownLatch是一个同步器,它可以让一个或多个线程进入等待,直到其他线程完成某些操作。
  2. CountDownLatch构建时会对成员变量count进行初始化,并在每次执行countDown方法时将count-1
    • count!=0时,执行await方法会进入阻塞状态。
    • count==0时,执行await方法会立刻返回(count不能被重置)。
  3. CountDownLatch是一个多功能的同步器工具,可被用于多种用途:
    • count被初始化为1CountDownLatch可以作为一个简单的开关门,所有调用方法await的线程将进入等待,直到有一个线程调用方法countDown来打开门。
    • count被初始化为NCountDownLatch可以被用来使一个线程进入等待状态,直到N个线程完成操作或者操作被完成N次。

即,在CountDownLatch创建时会初始化一个count变量,然后在count!=0的期间调用await方法会让线程进入阻塞状态,直到执行countDown方法使得count==0为止。

此处需要注意:

  • CountDownLatch中会发生阻塞的仅仅是await方法,对countDown方法调用并不会被阻塞。
  • CountDownLatch是一次性消费的同步器工具,计数器count不会被重置并且后续(count==0await方法的调用会立刻返回(不会发生阻塞)。如果需要可重置count版本的同步器可以考虑使用CyclicBarrier

CountDownLatch的使用

对于CountDownLatch的使用,官方也提供了两个例子供我们参考,这里笔者整理了出来:

public class Driver { // ...
    void main() throws InterruptedException {
        // 用于通知线程开始执行,在驱动准备工作做好后才通知线程开始执行
        CountDownLatch startSignal = new CountDownLatch(1);
        // 用于等待所有线程完成,在工作线程全部执行完后才通知驱动结束线程
        CountDownLatch doneSignal = new CountDownLatch(N);
    
        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();
        
        // 完成驱动准备工作
        doSomethingElse();            // don't let run yet
        
        startSignal.countDown();      // let all threads proceed
        
        doSomethingElse();

        // 等待所有工作线程完成后驱动才结束线程
        doneSignal.await();           // wait for all to finish
    }

    public static class Worker implements Runnable {
        private final CountDownLatch startSignal;
        private final CountDownLatch doneSignal;
        Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
        }
        public void run() {
            try {
                // 等待驱动准备工作完成
                startSignal.await();
                doWork();
                // 通知驱动当前线程执行完成
                doneSignal.countDown();
            } catch (InterruptedException ex) {} // return;
        }
        
        void doWork() { ... }
    }
}
public class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(N);
        Executor e = ...
 
        // 把任务分为N个,分别放入线程池执行
        for (int i = 0; i < N; ++i) // create and start threads
            e.execute(new WorkerRunnable(doneSignal, i));

        // 所有N个任务完成后才结束线程
        doneSignal.await();           // wait for all to finish
    }

    public static class WorkerRunnable implements Runnable {
        private final CountDownLatch doneSignal;
        private final int i;
        WorkerRunnable(CountDownLatch doneSignal, int i) {
            this.doneSignal = doneSignal;
            this.i = i;
        }
        public void run() {
            try {
                // 完成任务
                doWork(i);
                // 通知驱动当前线程执行完成
                doneSignal.countDown();
            } catch (InterruptedException ex) {} // return;
        }
        void doWork() { ... }
    }
}

这里的两个例子几乎已经包含了我们在开发中使用到的场景了,比如说,当我们查询多个第三方接口时,可以通过CountDownLatch的机制让所有的接口请求完成后,再将他们返回的结构全部组装起来(在使用上类似于第二个例子)。

CountDownLatch的实现原理

AQS

JavaCountDownLatch是基于AQS实现的,而所谓AQS(即,AbstractQueuedSynchronizer)是JDK提供给我们用于实现同步器的框架。关于AQS的实现原理主要分为两部分:

  1. 通过一个原子变量state来确定同步器的状态(例如:N表示等待,0表示释放)。
  2. 通过一个FIFO队列存储获取资源失败的线程以实现阻塞等待的效果。

即,在AQS中通过原子变量state实现同步器语义,FIFO队列实现阻塞效果。但对于使用者的我们是不需要太过关注其中的实现原理,而只需要对同步器的获取与释放语义进行定义即可,具体方法如下所示:

方法 描述
tryAcquire 表示在排他模式(EXCLUSIVE)下去获取资源,如果返回true表示获取成功,否则表示获取失败。其中,在方法的实现中我们应该判断当前是否能在独占模式获取资源。
tryRelease 表示在排他模式(EXCLUSIVE)下去释放资源,如果返回true表示全部释放成功,否则表示释放失败或者部分释放。
tryAcquireShared 表示在共享模式(SHARED)下去去获取资源,如果返回大于0表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0则表示获取失败。其中,在方法的实现中我们应该判断当前是否能够在共享模式下获取资源。
tryReleaseShared 表示在共享模式(SHARED)下去释放资源,如果返回true表示释放成功,否则表示释放失败。
isHeldExclusively 表示资源是否被独占地持有,如果返回true表示被独占持有,否则表示没有被独占持有。

即,当要实现独占语义时需要实现tryAcquiretryReleaseisHeldExclusively;当要实现共享语义时则需要实现tryAcquireSharedtryReleaseShared

最终,我们就可以在实现类中使用以下方法了:

方法 描述
tryAcquire 表示在排他模式(EXCLUSIVE)下去获取资源,如果返回true表示获取成功,否则表示获取失败。
tryRelease 表示在排他模式(EXCLUSIVE)下去释放资源,如果返回true表示全部释放成功,否则表示释放失败或者部分释放。
tryAcquireShared 表示在共享模式(SHARED)下去去获取资源,如果返回大于0表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0则表示获取失败。
tryReleaseShared 表示在共享模式(SHARED)下去释放资源,如果返回true表示释放成功,否则表示释放失败。
isHeldExclusively 表示资源是否被独占地持有,如果返回true表示被独占持有,否则表示没有被独占持有。
acquire 表示在排他模式(EXCLUSIVE)下去获取资源,如果获取失败会陷入阻塞(进入等待队列)直到获取成功。
acquireInterruptibly 表示在排他模式(EXCLUSIVE)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常。
tryAcquireNanos 表示在排他模式(EXCLUSIVE)下在规定时间内去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常,其中如果在规定时间内获取成功会返回true,超时则返回false
release 表示在排他模式(EXCLUSIVE)下去释放资源,如果释放成功返回true,否则返回false
acquireShared 表示在共享模式(SHARED)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功(与排他模式相比,此方法可以让多个线程同时获取到资源)。
acquireSharedInterruptibly 表示在共享模式(SHARED)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常(与排他模式相比,此方法可以让多个线程同时获取到资源)。
tryAcquireSharedNanos 表示在共享模式(SHARED)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常,其中如果在规定时间内获取成功会返回true,超时则返回false。(与排他模式相比,此方法可以让多个线程同时获取到资源)。
releaseShared 表示在共享模式(SHARED)下去释放资源,如果释放成功返回true,否则返回false

如果需要更深入学习AQS的话可以阅读笔者之前的文章《什么是AQS》

CountDownLatch

虽然说CountDownLatch是基于AQS实现的,但是在CountDownLatch上并没有直接继承AQS,而是通过其内部类Sync来继承AQS,并将CountDownLatch的方法委托给Sync进行处理。

因为CountDownLatch是基于AQS的共享模式来实现的,所以在Sync类中只需要实现tryAcquireSharedtryReleaseShared方法即可。

/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        // 初始设置state为count
        setState(count);
    }

    protected int tryAcquireShared(int acquires) {
        // 如果state==0恒定返回1,即一直传播下去
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            // 递减1处理,当递减到0会返回true,唤醒后续线程
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

CountDownLatch.Sync类中通过很巧妙的方式实现了同步器,即

  • tryAcquireShared方法(资源获取的语义)中定义只要state不等于0就返回-1表示无资源可用进入阻塞状态,而在state等于0时就返回1表示可以无限获取资源。
  • tryReleaseShared方法(资源释放的语义)中定义只有在state递减至0时才表示释放成功返回true(如果state本身就为0或者递减后不为0则表示失败返回false)。

结合Sync构造方法传入的计数器count,整体可理解为:

  1. 首先在构建时根据传入参数将计数器count初始化
  2. 然后在每次获取资源(执行tryAcquireShared方法)时由于count不为0则获取失败并进入阻塞
  3. 接着不断的释放资源(执行tryReleaseShared方法)将计数器count不断地递减1直到减至0
  4. 最后资源完全释放成功并唤醒阻塞线程。由于此时计数器count恒定等于0,所以唤醒的线程在后续资源获取中都会一直成功(在共享模式下会不断地向后传播唤醒信号)

最终,在CountDownLatch构造方法中将计数器传入Sync进行创建后,再将后续相应的方法调用委托给了Sync

  1. 初始化计数器(构建Sync内部类)

    public class CountDownLatch {
    
        /**
        * Constructs a {@code CountDownLatch} initialized with the given count.
        *
        * @param count the number of times {@link #countDown} must be invoked
        *        before threads can pass through {@link #await}
        * @throws IllegalArgumentException if {@code count} is negative
        */
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            // 传入初始化state的值
            this.sync = new Sync(count);
        }
    }
    
  2. await阻塞操作

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    

    await方法中调用acquireSharedInterruptibly方法使得当前线程进入等待状态,直到阻塞中断或者计数器count递减到0

    对于可超时可中断的方法await(long timeout, TimeUnit unit)这里就不展开讲解了,它与await()方法相比只是多了超时时间(如果超时则返回false),而在实现上则是通过调用tryAcquireSharedNanos(1, unit.toNanos(timeout))方法。

  3. countDown释放操作

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }
    

    countDown方法中调用releaseShared方法对资源进行释放(递减1)。此处需要注意,调用releaseShared方法传入的1其实并没什么用,因为在实现tryReleaseShared方法时已经写死了递减1

总结

至此,笔者对CountDownLatch的介绍、用法和原理都已经阐述完毕了,如果还想更深一步了解CountDownLatch可以再去学习一下AQS

0

评论区