Semaphore
就好像一个令牌桶,令牌的数量是固定的,当令牌被获取成功时数量会相应地进行递减,而在获取失败后存在两种失败策略,分别是阻塞等待和快速失败。
Semaphore
是什么
什么是Semaphore
?官方文档是这样阐述的:
/**
* A counting semaphore. Conceptually, a semaphore maintains a set of
* permits. Each {@link #acquire} blocks if necessary until a permit is
* available, and then takes it. Each {@link #release} adds a permit,
* potentially releasing a blocking acquirer.
* However, no actual permit objects are used; the {@code Semaphore} just
* keeps a count of the number available and acts accordingly.
*
* ...
*
* <p>A semaphore initialized to one, and which is used such that it
* only has at most one permit available, can serve as a mutual
* exclusion lock. This is more commonly known as a <em>binary
* semaphore</em>, because it only has two states: one permit
* available, or zero permits available. When used in this way, the
* binary semaphore has the property (unlike many {@link java.util.concurrent.locks.Lock}
* implementations), that the "lock" can be released by a
* thread other than the owner (as semaphores have no notion of
* ownership). This can be useful in some specialized contexts, such
* as deadlock recovery.
*
* <p> The constructor for this class optionally accepts a
* <em>fairness</em> parameter. When set false, this class makes no
* guarantees about the order in which threads acquire permits. In
* particular, <em>barging</em> is permitted, that is, a thread
* invoking {@link #acquire} can be allocated a permit ahead of a
* thread that has been waiting - logically the new thread places itself at
* the head of the queue of waiting threads. When fairness is set true, the
* semaphore guarantees that threads invoking any of the {@link
* #acquire() acquire} methods are selected to obtain permits in the order in
* which their invocation of those methods was processed
* (first-in-first-out; FIFO). Note that FIFO ordering necessarily
* applies to specific internal points of execution within these
* methods. So, it is possible for one thread to invoke
* {@code acquire} before another, but reach the ordering point after
* the other, and similarly upon return from the method.
* Also note that the untimed {@link #tryAcquire() tryAcquire} methods do not
* honor the fairness setting, but will take any permits that are
* available.
*
* <p>Generally, semaphores used to control resource access should be
* initialized as fair, to ensure that no thread is starved out from
* accessing a resource. When using semaphores for other kinds of
* synchronization control, the throughput advantages of non-fair
* ordering often outweigh fairness considerations.
*
* <p>This class also provides convenience methods to {@link
* #acquire(int) acquire} and {@link #release(int) release} multiple
* permits at a time. Beware of the increased risk of indefinite
* postponement when these methods are used without fairness set true.
*
* <p>Memory consistency effects: Actions in a thread prior to calling
* a "release" method such as {@code release()}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions following a successful "acquire" method such as {@code acquire()}
* in another thread.
*
* @since 1.5
* @author Doug Lea
*/
public class Semaphore implements java.io.Serializable {
}
这里笔者大概列出以下几点:
Semaphore
是一个计数信号量。从概念来说,Semaphore
维护了一组许可证(Semaphore
实现中并没有使用实际的许可证对象,它只是持有一个可用数量的计数器):- 每次调用
acquire
方法都会获取一个许可证,如无许可证则进入阻塞状态(直到有许可证可用为止) - 每次调用
release
方法都会添加一个许可证,并唤醒阻塞的获取线程(释放)
- 每次调用
Semaphore
支持公平策略与非公平策略。在Semaphore
的构造方法中可通过参数fairness
来设置Semaphore
是否公平:- 当将参数
fairness
设置为false
时,Semaphore
将不保证线程获取许可证的顺序(因为在非公平策略下等待时间最长的线程会与最新获取许可证的线程发生竞争,所以即使等待时间最久也不一定能优先获取到许可证) - 当将参数
fairness
设置为true
时,Semaphore
将会保证线程获取许可证的顺序是FIFO
先进先出(FIFO
的排序发生在内部某个特殊的执行点上,因此存在一个线程在其他线程之前调用acquire
,但是其他线程比它先到达排序点)
- 当将参数
Semaphore
提供了可以一次性获取和释放多个许可证的方法,分别是acquire(int)
和release(int)
。但是需要注意的是,在非公平情况下使用这些方法可能会增加正在等待的线程陷入无限等待的风险。
许可证为
1
的Semaphore
,在使用时最多只有一个许可证可用,即我们可以把它作为一个互斥锁。这种Semaphore
通常被称为二元信号量,因为它只有两种状态:1
个或0
个可用许可证。但是,通过这种方式将信号量充当互斥锁来使用与一般意义的Lock
有点不一样,即持有的锁可以被锁拥有者以外的线程释放(这个属性在一些特殊的场景还是很有用的,比如死锁恢复)。
通常来说,被用来控制资源访问的
Semaphore
应该被设置为公平策略,确保没有线程因资源访问而饿死。而当使用Semaphore
进行其他类型的同步控制时,非公平的Semaphore
的吞吐量通常会比公平的Semaphore
有优势。
Semaphore
的使用
构造方法
public class Semaphore implements java.io.Serializable {
/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
public Semaphore(int permits) {...}
/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {...}
}
在Semaphore
中有两个构造方法,其中构造参数permits
表示Semaphore
拥有的许可证;fair
则表示Semaphore
是否公平(默认情况下Semaphore
采用不公平策略)。
acquire
方法
在Semaphore
中,我们可以通过acquire
(泛指)方法获取许可证(如获取失败则进入阻塞状态),其中根据每次获取许可证的数量可将其分为两种类型:
-
每次只能获取一个许可证,即
acuqire a permit
在
Semaphore
中,我们可以通过acquire
方法(泛指)获取1
个许可证,如果获取成功则将许可证数量递减1
。其中,根据acquire
方法不同的特性又将它分为以下4
种方 法:方法 描述 acquire()
获取一个许可证,如果成功则返回 true
,否则进入阻塞状态直到发生中断或者获取成功。acquireUninterruptibly()
获取一个许可证,如果成功则返回 true
,否则进入阻塞状态直到获取成功(不支持中断退出,但是会有中断状态)。tryAcquire()
试图获取一个许可证,如果获取成功则返回 true
,否则返回false
(不存在阻塞)。tryAcquire(long timeout, TimeUnit unit)
获取一个许可证,如果获取成功返回 true
(在给定时间内),否则进入阻塞状态直到发生中断、超时或者获取成功(在给 定时间内)。另外,如果超时时间小于等于0
,该方法将不会进行等待。上述
4
种方法中存在3
种方法会在许可证获取失败的情况进入到阻塞状态,对于这3
种进入到阻塞状态的acquire
方法(泛指)会在以下事件发生后被唤醒:- 其他线程调用
Semaphore
的release
方法释放一个许可证,阻塞线程被唤醒并且成功分配到一个许可证。 - 其他线程调用
Thread#interrupt
将阻塞线程中断了(如果方法支持中断,例如acquire()
和tryAcquire(long timeout, TimeUnit unit)
)。 - 阻塞线程等待时间超过指定时间(如果方法支持超时,例如
tryAcquire(long timeout, TimeUnit unit)
)。
对于公平策略下的
Semaphore
,tryAcquire
方法的调用无论是否存在其他线程正在等待,都可以成功获取到许可证(即忽略了Semaphore
的公平性)。如果您想遵循公平策略 的设置则可以使用tryAcquire(long, TimeUnit)
。 - 其他线程调用
-
每次可以获取多个许可证,即
acuqire the given number of permit
在
Semaphore
中也我们可以通过带permits
参数的acquire
方法(泛指)一次获取permits
个许可证,如果获取成功则将许可证数量递减permits
。同样,根据acquire
方法不同的特性又将它分为以下4
种方法:方法 描述 acquire(int permits)
获取 permits
个许可证,如果成功则返回true
,否则进入阻塞状态直到发生中断或者获取成功。acquireUninterruptibly(int permits)
获取 permits
个许可证,如果成功则返回true
,否则进入阻塞状态直到获取成功(不支持中断退出,但是会有中断状态) 。tryAcquire(int permits)
试图获取 permits
个许可证,如果获取成功则返回true
,否则返回false
(不存在阻塞)。tryAcquire(int permits, long timeout, TimeUnit unit)
获取 permits
个许可证,如果获取成功返回true
(在给定时间内),否则进入阻塞状态直到发生 中 断、超时或者获取成功(在给定时间内)。另外,如果超时时间小于等于0
,该方法将不会进行等待。除此之外,其他的具体逻辑都与一次获取
1
个许可证的acquire
方法(泛指)相同,在这里就不再展开了。需要注意的是,对于多个许可证的获取是符合原子性的,即要不全部获取成功,要不全部获取失败。
release
方法
在Semaphore
中,我们可以通过release
方法释放许可证,其中根据每次释放许可证的数量可将其分为两种类型:
- 每次只能释放一个许可证,即
release a permit
- 每次可以释放多个许可证,即
release the given number of permit
相比于acquire
方法(泛指),release
方法则没那么复杂,其仅仅存在无参的Semaphore#release
方法和带permits
参数的Semaphore#release
方法,即:
方法 | 描述 |
---|---|
release() |
释放1 个许可证。 |
release(int permits) |
释放permits 个许可证。 |
在release
方法释放许可证后,如果存在线程阻塞等待许可证,则将其唤醒并获取许可证。
需要注意的是,如果唤醒阻塞线程但是并没有足够的许可证,则被唤醒的线程将会继续进行阻塞等待,直到有足够的许可证为止。另外,释放许可证的线程并不需要与获取许可证的线程相同,即并非许可证拥有者才能释放许可证。
使用例子
书店限流:假设有一个书店,其店内最多容纳十个人,剩余的人将在店铺外进行等待,直到有人退出来。
/**
* 书店
*/
public class ShopMall{
/**
* 许可证的数量
*/
private final static int MAX_COUNT = 10;
/**
* 许可证
*/
private final Semaphore permits = new Semaphore(MAX_COUNT, true);
/**
* 进入书店,店内最多允许10个人
*/
public void enter() throws InterruptedException {
// 获取许可证,否则进行等待
permits.acquire();
view();
// 释放许可证
permits.release();
}
}
Semaphore
的实现原理
在了解完Semaphore
的使用后,下面笔者将对Semaphore
实现原理进行进一步的分析。
AQS
在Java
中Semaphore
是基于AQS
实现的,而所谓AQS
(即,AbstractQueuedSynchronizer
)是JDK
提供给我们用于实现同步器的框架。关于AQS
的实现原理主要分为两部分:
- 通过一个原子变量
state
来确定资源的数量(例如:N
表示存在N
个许可证,0
表示不存在许可证)。 - 通过一个
FIFO队列
存储获取资源失败的线程以实现阻塞等待的效果。
即,在AQS
中通过原子变量state
实现同步器语义,FIFO队列
实现阻塞效果。但对于使用者的我们是不需要太过关注其中的实现原理,而只需要对同步器的获取与释放语义进行定义即可,具体方法如下所示:
方法 | 描述 |
---|---|
tryAcquire |
表示在排他模式(EXCLUSIVE )下去获取资源,如果返回true 表示获取成功,否则表示获取失败。其中,在方法的实现中我们应该判断当前是否能在独占模式获取资源。 |
tryRelease |
表示在排他模式(EXCLUSIVE )下去释放资源,如果返回true 表示全部释放成功,否则表示释放失败或者部分释放。 |
tryAcquireShared |
表示在共享模式(SHARED )下去去获取资源,如果返回大于0 表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0 表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0 则表示获取失败。其中,在方法的实现中我们应该判断当前是否能够在共享模式下获取资源。 |
tryReleaseShared |
表示在共享模式(SHARED )下去释放资源,如果返回true 表示释放成功,否则表示释放失败。 |
isHeldExclusively |
表示资源是否被独占地持有,如果返回true 表示被独占持有,否则表示没有被独占持有。 |
即,当要实现独占语义时需要实现tryAcquire
、tryRelease
和isHeldExclusively
;当要实现共享语义时则需要实现tryAcquireShared
和tryReleaseShared
。
最终,我们就可以在实现类中使用以下方法了:
方法 | 描述 |
---|---|
tryAcquire |
表示在排他模式(EXCLUSIVE )下去获取资源,如果返回true 表示获取成功,否则表示获取失败。 |
tryRelease |
表示在排他模式(EXCLUSIVE )下去释放资源,如果返回true 表示全部释放成功,否则表示释放失败或者部分释放。 |
tryAcquireShared |
表示在共享模式(SHARED )下去去获取资源,如果返回大于0 表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0 表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0 则表示获取失败。 |
tryReleaseShared |
表示在共享模式(SHARED )下去释放资源,如果返回true 表示释放成功,否则表示释放失败。 |
isHeldExclusively |
表示资源是否被独占地持有,如果返回true 表示被独占持有,否则表示没有被独占持有。 |
acquire |
表示在排他模式(EXCLUSIVE )下去获取资源,如果获取失败会陷入阻塞(进入等待队列)直到获取成功。 |
acquireInterruptibly |
表示在排他模式(EXCLUSIVE )下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常。 |
tryAcquireNanos |
表示在排他模式(EXCLUSIVE )下在规定时间内去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常,其中如果在规定时间内获取成功会返回true ,超时则返回false 。 |
release |
表示在排他模式(EXCLUSIVE )下去释放资源,如果释放成功返回true ,否则返回false 。 |
acquireShared |
表示在共享模式(SHARED )下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功(与排他模式相比,此方法可以让多个线程同时获取到资源)。 |
acquireSharedInterruptibly |
表示在共享模式(SHARED )下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常(与排他模式相比,此方法可以让多个线程同时获取到资源)。 |
tryAcquireSharedNanos |
表示在共享模式(SHARED )下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常,其中如果在规定时间内获取成功会返回true ,超时则返回false 。(与排他模式相比,此方法可以让多个线程同时获取到资源)。 |
releaseShared |
表示在共享模式(SHARED )下去释放资源,如果释放成功返回true ,否则返回false 。 |
如果需要更深入学习
AQS
的话可以阅读笔者之前的文章《什么是AQS》
Semaphore
与ReentrantLock
一样,Semaphore
也存在两种策略,分别是公平策略(FairSync
)和非公平策略(NonfairSync
)。从上文也得知了在Semaphore
是通过参数来进行控制是否公平的,即构建时选择相应策略实现赋值给sync
,然后将所有Semaphore
方法调用委托给sync
进行处理。
首先我们先来看Semaphore
的构造方法:
public class Semaphore implements java.io.Serializable {
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
// 构建公平/非公平的Sync并赋值给sync
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
}
在Semaphore
构造方法首先会根据fair
参数选择相应的实现策略(公平或者非公平策略,默认为非公平策略),然后将颁发的许可证数量传入相应的策略实现中进行初始化。
然后,将Semaphore
的方法调用都委托给了具体的策略实现类Sync
:
public class Semaphore implements java.io.Serializable {
private final Sync sync;
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public void acquireUninterruptibly() { sync.acquireShared(1); }
public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; }
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
public void release() { sync.releaseShared(1); }
// 为了看起来简洁,以下方法忽略了语句:if (permits < 0) throw new IllegalArgumentException();
public void acquire(int permits) throws InterruptedException { sync.acquireSharedInterruptibly(permits); }
public void acquireUninterruptibly(int permits) { sync.acquireShared(permits); }
public boolean tryAcquire(int permits) { return sync.nonfairTryAcquireShared(permits) >= 0; }
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException { return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
public void release(int permits) { sync.releaseShared(permits); }
}
也就是说,Semaphore
具体的实现逻辑都委托到了Sync
中进行处理,其中Sync
是基于AQS
进行实现的。根据AQS
的实现在共享模式下(Semaphore
属于共享模式)只需要实现tryAcquireShared
和tryReleaseShared
方法。
然而,对于Semaphore
公平策略和非公平策略之间的区别仅仅在tryAcquireShared
上存在不同,两者在tryReleaseShared
方法上的行为则相同的,所以在Sync
实现中将tryAcquireShared
的实现下放到子类FairSync
和NonfairSync
中,下面我们先来看看Sync
:
/**
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// ...
}
在Sync
中对于tryReleaseShared
方法的实现是通过CAS
机制来增加传入许可证的数量以此来达到释放的效果,并最终唤醒因为获取许可证失败而进入阻塞中的线程。
接下来,我们再来看看在FairSync
和NonfairSync
上是如何实现tryAcquireShared
的(为了便于理解,笔者在不影响逻辑的情况下简单改写了一下tryAcquireShared
方法):
-
非公平策略
NonfairSync
/** * NonFair version */ static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { // 此处作用就是通过CAS原子性的减去acquires个许可证 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
在
NonfairSync#tryAcquireShared
方法中是通过CAS
机制来递减传入的许可证,并返回剩余许可证的数量,即在剩余许可证大于等于0
表示获取成功,否则(负数)表示获取失败进入阻塞状态。 -
公平策略
FairSync
/** * Fair version */ static final class FairSync extends Sync { FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
与非公平策略的
NonfairSync
相比,公平策略FairSync
在tryAcquireShared
方法的实现中仅仅是加多了hasQueuedPredecessors
方法的判断,即判断当前获取线程是否在等待队列中等待时间最长,以此来达到公平的效果。hasQueuedPredecessors
方法用于查看等待队列中是否存在比当前节点(线程)等待时间长的等待节点(线程),即是否存在等待节点(线程)位于当前节点(线程)的前面。换句话说,只有当前线程前面没有线程正在等待时才尝试获取许可证,否则进入到队列中进行等待。最终通过这样的方式就实现了公平策略。
总结
至此,本文已经对Semaphore
所有关键操作进行了分析。简单来说,Semaphore
是一个典型的基于AQS
共享模式的实现类,如果还想更深一步了解Semaphore
可以再去学习一下AQS
。
评论区