侧边栏壁纸
博主头像
一啖焗捞丁

砖头man👷🏻‍♂️

  • 累计撰写 16 篇文章
  • 累计创建 3 个标签
  • 累计收到 1 条评论
标签搜索

目 录CONTENT

文章目录

Semaphore实现原理

一啖焗捞丁
2021-08-28 / 0 评论 / 0 点赞 / 751 阅读 / 5,436 字

Semaphore就好像一个令牌桶,令牌的数量是固定的,当令牌被获取成功时数量会相应地进行递减,而在获取失败后存在两种失败策略,分别是阻塞等待和快速失败。

Semaphore是什么

什么是Semaphore?官方文档是这样阐述的:

/**
 * A counting semaphore.  Conceptually, a semaphore maintains a set of
 * permits.  Each {@link #acquire} blocks if necessary until a permit is
 * available, and then takes it.  Each {@link #release} adds a permit,
 * potentially releasing a blocking acquirer.
 * However, no actual permit objects are used; the {@code Semaphore} just
 * keeps a count of the number available and acts accordingly.
 *
 * ...
 *
 * <p>A semaphore initialized to one, and which is used such that it
 * only has at most one permit available, can serve as a mutual
 * exclusion lock.  This is more commonly known as a <em>binary
 * semaphore</em>, because it only has two states: one permit
 * available, or zero permits available.  When used in this way, the
 * binary semaphore has the property (unlike many {@link java.util.concurrent.locks.Lock}
 * implementations), that the &quot;lock&quot; can be released by a
 * thread other than the owner (as semaphores have no notion of
 * ownership).  This can be useful in some specialized contexts, such
 * as deadlock recovery.
 *
 * <p> The constructor for this class optionally accepts a
 * <em>fairness</em> parameter. When set false, this class makes no
 * guarantees about the order in which threads acquire permits. In
 * particular, <em>barging</em> is permitted, that is, a thread
 * invoking {@link #acquire} can be allocated a permit ahead of a
 * thread that has been waiting - logically the new thread places itself at
 * the head of the queue of waiting threads. When fairness is set true, the
 * semaphore guarantees that threads invoking any of the {@link
 * #acquire() acquire} methods are selected to obtain permits in the order in
 * which their invocation of those methods was processed
 * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
 * applies to specific internal points of execution within these
 * methods.  So, it is possible for one thread to invoke
 * {@code acquire} before another, but reach the ordering point after
 * the other, and similarly upon return from the method.
 * Also note that the untimed {@link #tryAcquire() tryAcquire} methods do not
 * honor the fairness setting, but will take any permits that are
 * available.
 *
 * <p>Generally, semaphores used to control resource access should be
 * initialized as fair, to ensure that no thread is starved out from
 * accessing a resource. When using semaphores for other kinds of
 * synchronization control, the throughput advantages of non-fair
 * ordering often outweigh fairness considerations.
 *
 * <p>This class also provides convenience methods to {@link
 * #acquire(int) acquire} and {@link #release(int) release} multiple
 * permits at a time.  Beware of the increased risk of indefinite
 * postponement when these methods are used without fairness set true.
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * a "release" method such as {@code release()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions following a successful "acquire" method such as {@code acquire()}
 * in another thread.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class Semaphore implements java.io.Serializable {
}

这里笔者大概列出以下几点:

  1. Semaphore是一个计数信号量。从概念来说,Semaphore维护了一组许可证(Semaphore实现中并没有使用实际的许可证对象,它只是持有一个可用数量的计数器):
    • 每次调用acquire方法都会获取一个许可证,如无许可证则进入阻塞状态(直到有许可证可用为止)
    • 每次调用release方法都会添加一个许可证,并唤醒阻塞的获取线程(释放)
  2. Semaphore支持公平策略与非公平策略。在Semaphore的构造方法中可通过参数fairness来设置Semaphore是否公平:
    • 当将参数fairness设置为false时,Semaphore将不保证线程获取许可证的顺序(因为在非公平策略下等待时间最长的线程会与最新获取许可证的线程发生竞争,所以即使等待时间最久也不一定能优先获取到许可证)
    • 当将参数fairness设置为true时,Semaphore将会保证线程获取许可证的顺序是FIFO先进先出(FIFO的排序发生在内部某个特殊的执行点上,因此存在一个线程在其他线程之前调用acquire,但是其他线程比它先到达排序点)
  3. Semaphore提供了可以一次性获取和释放多个许可证的方法,分别是acquire(int)release(int)。但是需要注意的是,在非公平情况下使用这些方法可能会增加正在等待的线程陷入无限等待的风险。

许可证为1Semaphore,在使用时最多只有一个许可证可用,即我们可以把它作为一个互斥锁。这种Semaphore通常被称为二元信号量,因为它只有两种状态:1个或0个可用许可证。但是,通过这种方式将信号量充当互斥锁来使用与一般意义的Lock有点不一样,即持有的锁可以被锁拥有者以外的线程释放(这个属性在一些特殊的场景还是很有用的,比如死锁恢复)。

通常来说,被用来控制资源访问的Semaphore应该被设置为公平策略,确保没有线程因资源访问而饿死。而当使用Semaphore进行其他类型的同步控制时,非公平的Semaphore的吞吐量通常会比公平的Semaphore有优势。

Semaphore的使用

构造方法

public class Semaphore implements java.io.Serializable {
    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and nonfair fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     */
    public Semaphore(int permits) {...}

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     * @param fair {@code true} if this semaphore will guarantee
     *        first-in first-out granting of permits under contention,
     *        else {@code false}
     */
    public Semaphore(int permits, boolean fair) {...}
}

Semaphore中有两个构造方法,其中构造参数permits表示Semaphore拥有的许可证;fair则表示Semaphore是否公平(默认情况下Semaphore采用不公平策略)。

acquire方法

Semaphore中,我们可以通过acquire(泛指)方法获取许可证(如获取失败则进入阻塞状态),其中根据每次获取许可证的数量可将其分为两种类型:

  • 每次只能获取一个许可证,即acuqire a permit

    Semaphore中,我们可以通过acquire方法(泛指)获取1个许可证,如果获取成功则将许可证数量递减1。其中,根据acquire方法不同的特性又将它分为以下4种方 法:

    方法 描述
    acquire() 获取一个许可证,如果成功则返回true,否则进入阻塞状态直到发生中断或者获取成功。
    acquireUninterruptibly() 获取一个许可证,如果成功则返回true,否则进入阻塞状态直到获取成功(不支持中断退出,但是会有中断状态)。
    tryAcquire() 试图获取一个许可证,如果获取成功则返回true,否则返回false(不存在阻塞)。
    tryAcquire(long timeout, TimeUnit unit) 获取一个许可证,如果获取成功返回true(在给定时间内),否则进入阻塞状态直到发生中断、超时或者获取成功(在给 定时间内)。另外,如果超时时间小于等于0,该方法将不会进行等待。

    上述4种方法中存在3种方法会在许可证获取失败的情况进入到阻塞状态,对于这3种进入到阻塞状态的acquire方法(泛指)会在以下事件发生后被唤醒:

    • 其他线程调用Semaphorerelease方法释放一个许可证,阻塞线程被唤醒并且成功分配到一个许可证。
    • 其他线程调用Thread#interrupt将阻塞线程中断了(如果方法支持中断,例如acquire()tryAcquire(long timeout, TimeUnit unit))。
    • 阻塞线程等待时间超过指定时间(如果方法支持超时,例如tryAcquire(long timeout, TimeUnit unit))。

    对于公平策略下的SemaphoretryAcquire方法的调用无论是否存在其他线程正在等待,都可以成功获取到许可证(即忽略了Semaphore的公平性)。如果您想遵循公平策略 的设置则可以使用tryAcquire(long, TimeUnit)

  • 每次可以获取多个许可证,即acuqire the given number of permit

    Semaphore中也我们可以通过带permits参数的acquire方法(泛指)一次获取permits个许可证,如果获取成功则将许可证数量递减permits。同样,根据 acquire方法不同的特性又将它分为以下4种方法:

    方法 描述
    acquire(int permits) 获取permits个许可证,如果成功则返回true,否则进入阻塞状态直到发生中断或者获取成功。
    acquireUninterruptibly(int permits) 获取permits个许可证,如果成功则返回true,否则进入阻塞状态直到获取成功(不支持中断退出,但是会有中断状态) 。
    tryAcquire(int permits) 试图获取permits个许可证,如果获取成功则返回true,否则返回false(不存在阻塞)。
    tryAcquire(int permits, long timeout, TimeUnit unit) 获取permits个许可证,如果获取成功返回true(在给定时间内),否则进入阻塞状态直到发生 中 断、超时或者获取成功(在给定时间内)。另外,如果超时时间小于等于0,该方法将不会进行等待。

    除此之外,其他的具体逻辑都与一次获取1个许可证的acquire方法(泛指)相同,在这里就不再展开了。

    需要注意的是,对于多个许可证的获取是符合原子性的,即要不全部获取成功,要不全部获取失败。

release方法

Semaphore中,我们可以通过release方法释放许可证,其中根据每次释放许可证的数量可将其分为两种类型:

  • 每次只能释放一个许可证,即release a permit
  • 每次可以释放多个许可证,即release the given number of permit

相比于acquire方法(泛指),release方法则没那么复杂,其仅仅存在无参的Semaphore#release方法和带permits参数的Semaphore#release方法,即:

方法 描述
release() 释放1个许可证。
release(int permits) 释放permits个许可证。

release方法释放许可证后,如果存在线程阻塞等待许可证,则将其唤醒并获取许可证。

需要注意的是,如果唤醒阻塞线程但是并没有足够的许可证,则被唤醒的线程将会继续进行阻塞等待,直到有足够的许可证为止。另外,释放许可证的线程并不需要与获取许可证的线程相同,即并非许可证拥有者才能释放许可证。

使用例子

书店限流:假设有一个书店,其店内最多容纳十个人,剩余的人将在店铺外进行等待,直到有人退出来。

/**
 * 书店
 */
public class ShopMall{
    /**
     * 许可证的数量
     */
    private final static int MAX_COUNT = 10;
    /**
     * 许可证
     */
    private final Semaphore permits = new Semaphore(MAX_COUNT, true);

    /**
     * 进入书店,店内最多允许10个人
     */
    public void enter() throws InterruptedException {
        // 获取许可证,否则进行等待
        permits.acquire();

        view();

        // 释放许可证
        permits.release();
    }
}

Semaphore的实现原理

在了解完Semaphore的使用后,下面笔者将对Semaphore实现原理进行进一步的分析。

AQS

JavaSemaphore是基于AQS实现的,而所谓AQS(即,AbstractQueuedSynchronizer)是JDK提供给我们用于实现同步器的框架。关于AQS的实现原理主要分为两部分:

  1. 通过一个原子变量state来确定资源的数量(例如:N表示存在N个许可证,0表示不存在许可证)。
  2. 通过一个FIFO队列存储获取资源失败的线程以实现阻塞等待的效果。

即,在AQS中通过原子变量state实现同步器语义,FIFO队列实现阻塞效果。但对于使用者的我们是不需要太过关注其中的实现原理,而只需要对同步器的获取与释放语义进行定义即可,具体方法如下所示:

方法 描述
tryAcquire 表示在排他模式(EXCLUSIVE)下去获取资源,如果返回true表示获取成功,否则表示获取失败。其中,在方法的实现中我们应该判断当前是否能在独占模式获取资源。
tryRelease 表示在排他模式(EXCLUSIVE)下去释放资源,如果返回true表示全部释放成功,否则表示释放失败或者部分释放。
tryAcquireShared 表示在共享模式(SHARED)下去去获取资源,如果返回大于0表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0则表示获取失败。其中,在方法的实现中我们应该判断当前是否能够在共享模式下获取资源。
tryReleaseShared 表示在共享模式(SHARED)下去释放资源,如果返回true表示释放成功,否则表示释放失败。
isHeldExclusively 表示资源是否被独占地持有,如果返回true表示被独占持有,否则表示没有被独占持有。

即,当要实现独占语义时需要实现tryAcquiretryReleaseisHeldExclusively;当要实现共享语义时则需要实现tryAcquireSharedtryReleaseShared

最终,我们就可以在实现类中使用以下方法了:

方法 描述
tryAcquire 表示在排他模式(EXCLUSIVE)下去获取资源,如果返回true表示获取成功,否则表示获取失败。
tryRelease 表示在排他模式(EXCLUSIVE)下去释放资源,如果返回true表示全部释放成功,否则表示释放失败或者部分释放。
tryAcquireShared 表示在共享模式(SHARED)下去去获取资源,如果返回大于0表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0则表示获取失败。
tryReleaseShared 表示在共享模式(SHARED)下去释放资源,如果返回true表示释放成功,否则表示释放失败。
isHeldExclusively 表示资源是否被独占地持有,如果返回true表示被独占持有,否则表示没有被独占持有。
acquire 表示在排他模式(EXCLUSIVE)下去获取资源,如果获取失败会陷入阻塞(进入等待队列)直到获取成功。
acquireInterruptibly 表示在排他模式(EXCLUSIVE)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常。
tryAcquireNanos 表示在排他模式(EXCLUSIVE)下在规定时间内去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常,其中如果在规定时间内获取成功会返回true,超时则返回false
release 表示在排他模式(EXCLUSIVE)下去释放资源,如果释放成功返回true,否则返回false
acquireShared 表示在共享模式(SHARED)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功(与排他模式相比,此方法可以让多个线程同时获取到资源)。
acquireSharedInterruptibly 表示在共享模式(SHARED)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常(与排他模式相比,此方法可以让多个线程同时获取到资源)。
tryAcquireSharedNanos 表示在共享模式(SHARED)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常,其中如果在规定时间内获取成功会返回true,超时则返回false。(与排他模式相比,此方法可以让多个线程同时获取到资源)。
releaseShared 表示在共享模式(SHARED)下去释放资源,如果释放成功返回true,否则返回false

如果需要更深入学习AQS的话可以阅读笔者之前的文章《什么是AQS》

Semaphore

ReentrantLock一样,Semaphore也存在两种策略,分别是公平策略(FairSync)和非公平策略(NonfairSync)。从上文也得知了在Semaphore是通过参数来进行控制是否公平的,即构建时选择相应策略实现赋值给sync,然后将所有Semaphore方法调用委托给sync进行处理。

首先我们先来看Semaphore的构造方法:

public class Semaphore implements java.io.Serializable {

    /** All mechanics via AbstractQueuedSynchronizer subclass */
    private final Sync sync;

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        // 构建公平/非公平的Sync并赋值给sync
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

}

Semaphore构造方法首先会根据fair参数选择相应的实现策略(公平或者非公平策略,默认为非公平策略),然后将颁发的许可证数量传入相应的策略实现中进行初始化。

然后,将Semaphore的方法调用都委托给了具体的策略实现类Sync

public class Semaphore implements java.io.Serializable {

    private final Sync sync;

    public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    public void acquireUninterruptibly() { sync.acquireShared(1); }
    public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; }
    public boolean tryAcquire(long timeout, TimeUnit unit) 
        throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }

    public void release() { sync.releaseShared(1); }

    // 为了看起来简洁,以下方法忽略了语句:if (permits < 0) throw new IllegalArgumentException();
    public void acquire(int permits) throws InterruptedException { sync.acquireSharedInterruptibly(permits); }
    public void acquireUninterruptibly(int permits) { sync.acquireShared(permits); }
    public boolean tryAcquire(int permits) { return sync.nonfairTryAcquireShared(permits) >= 0; }
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException { return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }

    public void release(int permits) { sync.releaseShared(permits); }

}

也就是说,Semaphore具体的实现逻辑都委托到了Sync中进行处理,其中Sync是基于AQS进行实现的。根据AQS的实现在共享模式下(Semaphore属于共享模式)只需要实现tryAcquireSharedtryReleaseShared方法。

然而,对于Semaphore公平策略和非公平策略之间的区别仅仅在tryAcquireShared上存在不同,两者在tryReleaseShared方法上的行为则相同的,所以在Sync实现中将tryAcquireShared的实现下放到子类FairSyncNonfairSync中,下面我们先来看看Sync

/**
 * Synchronization implementation for semaphore.  Uses AQS state
 * to represent permits. Subclassed into fair and nonfair
 * versions.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {

    Sync(int permits) {
        setState(permits);
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // ...
}

Sync中对于tryReleaseShared方法的实现是通过CAS机制来增加传入许可证的数量以此来达到释放的效果,并最终唤醒因为获取许可证失败而进入阻塞中的线程。

接下来,我们再来看看在FairSyncNonfairSync上是如何实现tryAcquireShared的(为了便于理解,笔者在不影响逻辑的情况下简单改写了一下tryAcquireShared方法):

  • 非公平策略NonfairSync

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
    
        NonfairSync(int permits) {
            super(permits);
        }
    
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 此处作用就是通过CAS原子性的减去acquires个许可证
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    

    NonfairSync#tryAcquireShared方法中是通过CAS机制来递减传入的许可证,并返回剩余许可证的数量,即在剩余许可证大于等于0表示获取成功,否则(负数)表示获取失败进入阻塞状态。

  • 公平策略FairSync

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
    
        FairSync(int permits) {
            super(permits);
        }
    
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    

    与非公平策略的NonfairSync相比,公平策略FairSynctryAcquireShared方法的实现中仅仅是加多了hasQueuedPredecessors方法的判断,即判断当前获取线程是否在等待队列中等待时间最长,以此来达到公平的效果。

    hasQueuedPredecessors方法用于查看等待队列中是否存在比当前节点(线程)等待时间长的等待节点(线程),即是否存在等待节点(线程)位于当前节点(线程)的前面。换句话说,只有当前线程前面没有线程正在等待时才尝试获取许可证,否则进入到队列中进行等待。最终通过这样的方式就实现了公平策略。

总结

至此,本文已经对Semaphore所有关键操作进行了分析。简单来说,Semaphore是一个典型的基于AQS共享模式的实现类,如果还想更深一步了解Semaphore可以再去学习一下AQS

0

评论区