CountDownLatch
是一个同步器,它可以让一个或多个线程进入等待状态,直到其他线程完成某些操作。
CountDownLatch
是什么
什么是CountDownLatch
?官方文档是这样阐述的:
/**
* A synchronization aid that allows one or more threads to wait until
* a set of operations being performed in other threads completes.
*
* <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
* The {@link #await await} methods block until the current count reaches
* zero due to invocations of the {@link #countDown} method, after which
* all waiting threads are released and any subsequent invocations of
* {@link #await await} return immediately. This is a one-shot phenomenon
* -- the count cannot be reset. If you need a version that resets the
* count, consider using a {@link CyclicBarrier}.
*
* <p>A {@code CountDownLatch} is a versatile synchronization tool
* and can be used for a number of purposes. A
* {@code CountDownLatch} initialized with a count of one serves as a
* simple on/off latch, or gate: all threads invoking {@link #await await}
* wait at the gate until it is opened by a thread invoking {@link
* #countDown}. A {@code CountDownLatch} initialized to <em>N</em>
* can be used to make one thread wait until <em>N</em> threads have
* completed some action, or some action has been completed N times.
*
* <p>A useful property of a {@code CountDownLatch} is that it
* doesn't require that threads calling {@code countDown} wait for
* the count to reach zero before proceeding, it simply prevents any
* thread from proceeding past an {@link #await await} until all
* threads could pass.
*
* ...
*
* @since 1.5
* @author Doug Lea
*/
public class CountDownLatch {
}
这里笔者大概列出以下几点:
CountDownLatch
是一个同步器,它可以让一个或多个线程进入等待,直到其他线程完成某些操作。CountDownLatch
构建时会对成员变量count
进行初始化,并在每次执行countDown
方法时将count-1
。- 当
count!=0
时,执行await
方法会进入阻塞状态。 - 当
count==0
时,执行await
方法会立刻返回(count
不能被重置)。
- 当
CountDownLatch
是一个多功能的同步器工具,可被用于多种用途:count
被初始化为1
的CountDownLatch
可以作为一个简单的开关门,所有调用方法await
的线程将进入等待,直到有一个线程调用方法countDown
来打开门。count
被初始化为N
的CountDownLatch
可以被用来使一个线程进入等待状态,直到N
个线程完成操作或者操作被完成N
次。
即,在CountDownLatch
创建时会初始化一个count
变量,然后在count!=0
的期间调用await
方法会让线程进入阻塞状态,直到执行countDown
方法使得count==0
为止。
此处需要注意:
CountDownLatch
中会发生阻塞的仅仅是await
方法,对countDown
方法调用并不会被阻塞。CountDownLatch
是一次性消费的同步器工具,计数器count
不会被重置并且后续(count==0
)await
方法的调用会立刻返回(不会发生阻塞)。如果需要可重置count
版本的同步器可以考虑使用CyclicBarrier
。
CountDownLatch
的使用
对于CountDownLatch
的使用,官方也提供了两个例子供我们参考,这里笔者整理了出来:
public class Driver { // ...
void main() throws InterruptedException {
// 用于通知线程开始执行,在驱动准备工作做好后才通知线程开始执行
CountDownLatch startSignal = new CountDownLatch(1);
// 用于等待所有线程完成,在工作线程全部执行完后才通知驱动结束线程
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
// 完成驱动准备工作
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
// 等待所有工作线程完成后驱动才结束线程
doneSignal.await(); // wait for all to finish
}
public static class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
// 等待驱动准备工作完成
startSignal.await();
doWork();
// 通知驱动当前线程执行完成
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
}
public class Driver { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
// 把任务分为N个,分别放入线程池执行
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
// 所有N个任务完成后才结束线程
doneSignal.await(); // wait for all to finish
}
public static class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
// 完成任务
doWork(i);
// 通知驱动当前线程执行完成
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
}
这里的两个例子几乎已经包含了我们在开发中使用到的场景了,比如说,当我们查询多个第三方接口时,可以通过CountDownLatch
的机制让所有的接口请求完成后,再将他们返回的结构全部组装起来(在使用上类似于第二个例子)。
CountDownLatch
的实现原理
AQS
在Java
中CountDownLatch
是基于AQS
实现的,而所谓AQS
(即,AbstractQueuedSynchronizer
)是JDK
提供给我们用于实现同步器的框架。关于AQS
的实现原理主要分为两部分:
- 通过一个原子变量
state
来确定同步器的状态(例如:N
表示等待,0
表示释放)。 - 通过一个
FIFO队列
存储获取资源失败的线程以实现阻塞等待的效果。
即,在AQS
中通过原子变量state
实现同步器语义,FIFO队列
实现阻塞效果。但对于使用者的我们是不需要太过关注其中的实现原理,而只需要对同步器的获取与释放语义进行定义即可,具体方法如下所示:
方法 | 描述 |
---|---|
tryAcquire |
表示在排他模式(EXCLUSIVE )下去获取资源,如果返回true 表示获取成功,否则表示获取失败。其中,在方法的实现中我们应该判断当前是否能在独占模式获取资源。 |
tryRelease |
表示在排他模式(EXCLUSIVE )下去释放资源,如果返回true 表示全部释放成功,否则表示释放失败或者部分释放。 |
tryAcquireShared |
表示在共享模式(SHARED )下去去获取资源,如果返回大于0 表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0 表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0 则表示获取失败。其中,在方法的实现中我们应该判断当前是否能够在共享模式下获取资源。 |
tryReleaseShared |
表示在共享模式(SHARED )下去释放资源,如果返回true 表示释放成功,否则表示释放失败。 |
isHeldExclusively |
表示资源是否被独占地持有,如果返回true 表示被独占持有,否则表示没有被独占持有。 |
即,当要实现独占语义时需要实现tryAcquire
、tryRelease
和isHeldExclusively
;当要实现共享语义时则需要实现tryAcquireShared
和tryReleaseShared
。
最终,我们就可以在实现类中使用以下方法了:
方法 | 描述 |
---|---|
tryAcquire |
表示在排他模式(EXCLUSIVE )下去获取资源,如果返回true 表示获取成功,否则表示获取失败。 |
tryRelease |
表示在排他模式(EXCLUSIVE )下去释放资源,如果返回true 表示全部释放成功,否则表示释放失败或者部分释放。 |
tryAcquireShared |
表示在共享模式(SHARED )下去去获取资源,如果返回大于0 表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0 表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0 则表示获取失败。 |
tryReleaseShared |
表示在共享模式(SHARED )下去释放资源,如果返回true 表示释放成功,否则表示释放失败。 |
isHeldExclusively |
表示资源是否被独占地持有,如果返回true 表示被独占持有,否则表示没有被独占持有。 |
acquire |
表示在排他模式(EXCLUSIVE )下去获取资源,如果获取失败会陷入阻塞(进入等待队列)直到获取成功。 |
acquireInterruptibly |
表示在排他模式(EXCLUSIVE )下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常。 |
tryAcquireNanos |
表示在排他模式(EXCLUSIVE )下在规定时间内去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常,其中如果在规定时间内获取成功会返回true ,超时则返回false 。 |
release |
表示在排他模式(EXCLUSIVE )下去释放资源,如果释放成功返回true ,否则返回false 。 |
acquireShared |
表示在共享模式(SHARED )下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功(与排他模式相比,此方法可以让多个线程同时获取到资源)。 |
acquireSharedInterruptibly |
表示在共享模式(SHARED )下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常(与排他模式相比,此方法可以让多个线程同时获取到资源)。 |
tryAcquireSharedNanos |
表示在共享模式(SHARED )下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常,其中如果在规定时间内获取成功会返回true ,超时则返回false 。(与排他模式相比,此方法可以让多个线程同时获取到资源)。 |
releaseShared |
表示在共享模式(SHARED )下去释放资源,如果释放成功返回true ,否则返回false 。 |
如果需要更深入学习
AQS
的话可以阅读笔者之前的文章《什么是AQS》
CountDownLatch
虽然说CountDownLatch
是基于AQS
实现的,但是在CountDownLatch
上并没有直接继承AQS
,而是通过其内部类Sync
来继承AQS
,并将CountDownLatch
的方法委托给Sync
进行处理。
因为CountDownLatch
是基于AQS
的共享模式来实现的,所以在Sync
类中只需要实现tryAcquireShared
和tryReleaseShared
方法即可。
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
// 初始设置state为count
setState(count);
}
protected int tryAcquireShared(int acquires) {
// 如果state==0恒定返回1,即一直传播下去
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
// 递减1处理,当递减到0会返回true,唤醒后续线程
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
在CountDownLatch.Sync
类中通过很巧妙的方式实现了同步器,即
- 在
tryAcquireShared
方法(资源获取的语义)中定义只要state
不等于0
就返回-1
表示无资源可用进入阻塞状态,而在state
等于0
时就返回1
表示可以无限获取资源。 - 在
tryReleaseShared
方法(资源释放的语义)中定义只有在state
递减至0
时才表示释放成功返回true
(如果state
本身就为0
或者递减后不为0
则表示失败返回false
)。
结合Sync
构造方法传入的计数器count
,整体可理解为:
- 首先在构建时根据传入参数将计数器
count
初始化 - 然后在每次获取资源(执行
tryAcquireShared
方法)时由于count
不为0
则获取失败并进入阻塞 - 接着不断的释放资源(执行
tryReleaseShared
方法)将计数器count
不断地递减1
直到减至0
- 最后资源完全释放成功并唤醒阻塞线程。由于此时计数器
count
恒定等于0
,所以唤醒的线程在后续资源获取中都会一直成功(在共享模式下会不断地向后传播唤醒信号)
最终,在CountDownLatch
构造方法中将计数器传入Sync
进行创建后,再将后续相应的方法调用委托给了Sync
:
-
初始化计数器(构建
Sync
内部类)public class CountDownLatch { /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // 传入初始化state的值 this.sync = new Sync(count); } }
-
await
阻塞操作/** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}. * * <p>If the current count is zero then this method returns immediately. * * <p>If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of two things happen: * <ul> * <li>The count reaches zero due to invocations of the * {@link #countDown} method; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting, * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted * while waiting */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
在
await
方法中调用acquireSharedInterruptibly
方法使得当前线程进入等待状态,直到阻塞中断或者计数器count
递减到0
对于可超时可中断的方法
await(long timeout, TimeUnit unit)
这里就不展开讲解了,它与await()
方法相比只是多了超时时间(如果超时则返回false
),而在实现上则是通过调用tryAcquireSharedNanos(1, unit.toNanos(timeout))
方法。 -
countDown
释放操作/** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * * <p>If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * * <p>If the current count equals zero then nothing happens. */ public void countDown() { sync.releaseShared(1); }
在
countDown
方法中调用releaseShared
方法对资源进行释放(递减1
)。此处需要注意,调用releaseShared
方法传入的1
其实并没什么用,因为在实现tryReleaseShared
方法时已经写死了递减1
。
总结
至此,笔者对CountDownLatch
的介绍、用法和原理都已经阐述完毕了,如果还想更深一步了解CountDownLatch
可以再去学习一下AQS
。
评论区