侧边栏壁纸
博主头像
一啖焗捞丁

砖头man👷🏻‍♂️

  • 累计撰写 16 篇文章
  • 累计创建 3 个标签
  • 累计收到 1 条评论
标签搜索

目 录CONTENT

文章目录

Future为什么能作为异步运算结果

一啖焗捞丁
2021-09-20 / 0 评论 / 0 点赞 / 740 阅读 / 6,755 字

Future可用于表示异步执行的结果,那你知道Future是怎么实现的吗?

Future是什么

什么是Future?官方文档是这样描述它的(JDK 1.8)。

/**
 * A {@code Future} represents the result of an asynchronous
 * computation.  Methods are provided to check if the computation is
 * complete, to wait for its completion, and to retrieve the result of
 * the computation.  The result can only be retrieved using method
 * {@code get} when the computation has completed, blocking if
 * necessary until it is ready.  Cancellation is performed by the
 * {@code cancel} method.  Additional methods are provided to
 * determine if the task completed normally or was cancelled. Once a
 * computation has completed, the computation cannot be cancelled.
 * If you would like to use a {@code Future} for the sake
 * of cancellability but not provide a usable result, you can
 * declare types of the form {@code Future<?>} and
 * return {@code null} as a result of the underlying task.
 *
 * ...
 *
 */
public interface Future<V> {
}

Future可用于表示异步计算的结果,我们可以通过其中的一些方法来检查异步计算是否已经完成,如果已完成可调用get方法获取结果(若尚未完成就调用了get方法则会进入阻塞直至完成)。除此之外,我们还可以调用cancel方法来取消任务的执行。下面我们来看看官方文档给出的Future使用demo

public interface ArchiveSearcher { String search(String target); }

public class App {
    
    ExecutorService executor = ...
    ArchiveSearcher searcher = ...
    
    void showSearch(final String target) throws InterruptedException {
        
        Future<String> future = executor.submit(new Callable<String>() {
            public String call() {
                return searcher.search(target);
            }
        });
        
        displayOtherThings(); // do other things while searching
        
        try {
            displayText(future.get()); // use future
        } catch (ExecutionException ex) { 
            cleanup(); 
            return; 
        }
    }
}

demo中,我们可以看到它首先将查询数据这些耗时操作放到了异步线程池执行并返回Future,接着执行其他的一些耗时的操作,最后在其他耗时操作执行完成后才通过Future获取查询结果,以此来避免在进行IO阻塞时CPU处于空闲状态。同时,这也体现了Future存在的意义。

另外,在Future类中定义了一些方法,这里笔者总结了下来:

方法 说明
cancel 用于试图去取消任务的执行。如果任务已完成、已取消(或者其他不能被取消的原因)则调用失败;如果任务还没有开始,则调用成功,并且任务将永远不会执行;如果任务已经开始,则由参数mayInterruptIfRunning决定正在执行任务的线程是否应该被中断。
isCancelled 用于判断任务是否被取消。如果任务已取消,则返回true,否则返回false
isDone 用于判断任务是否已完成。如果任务已完成,则返回true,否则返回false。其中”已完成“包括正常终止、异常终止或被取消,这些情况都会返回true
get 用于获取计算结果,在必要时需要等待运算完成。
get(timeout) 用于获取计算结果,在必要时需要等待运算完成(有最大等待时间)。

到这里已经完成了对Future接口的分析,但也只是Future对其定义的解析和约束,具体还需要去往其实现子类中寻找答案。

Future的实现

对于Future接口,它最经典的实现即是FutureTask,所以接下来我们将对FutureTask进一步分析。

FutureTask是什么

/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * ...
 *
 */
public class FutureTask<V> implements RunnableFuture<V> {...}

FutureTask继承自RunnableFuture接口,并基于它所实现的一个可取消的异步计算结果对象。在FutureTask的使用上,它可以被包装为CallbackRunnable对象,也正因如此FutureTask可以被提交到Executor执行。

RunnableFuture接口则继承了RunnableFuture两个接口,具体如下所示。

/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask的使用

对于FutureTask的显式使用也许在日常开发中很少会接触到,但对于它的间接使用却是十分频繁。例如,平时我们通过线程池去执行submit时就会用到FutureTask

下面我们再借用上文所提及的demo来展示FutureTask的用法。

public interface ArchiveSearcher { String search(String target); }

public class App {
    ExecutorService executor = ...
    ArchiveSearcher searcher = ...
    
    void showSearch(final String target) throws InterruptedException {
        
        Future<String> future = executor.submit(new Callable<String>() {
            public String call() {
                return searcher.search(target);
            }
        });
        
        displayOtherThings(); // do other things while searching
        
        try {
            displayText(future.get()); // use future
        } catch (ExecutionException ex) { 
            cleanup(); 
            return; 
        }
    }
}

即,将任务提交到submit方法中执行并获取其返回的结果FutureTask,然后在后续执行其他耗时操作后再通过get方法获取其执行结果。

其实,submit操作可以等价于将FutureTask作为参数传入execute方法中执行,如下所示:

FutureTask<String> future =
  new FutureTask<String>(new Callable<String>() {
    public String call() {
      return searcher.search(target);
  }});
executor.execute(future);

FutureTask的更新

另外,在JDK 1.8上可以看到这么一段注释:

/*
 * Revision notes: This differs from previous versions of this
 * class that relied on AbstractQueuedSynchronizer, mainly to
 * avoid surprising users about retaining interrupt status during
 * cancellation races. Sync control in the current design relies
 * on a "state" field updated via CAS to track completion, along
 * with a simple Treiber stack to hold waiting threads.
 *
 * Style note: As usual, we bypass overhead of using
 * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
 */

这里主要对FutureTask的版本更新进行说明,简单来说就是当前版本的实现与之前版本(JDK 1.8前)有所不同。即,之前版本是通过AQS实现的,但通过AQS实现在取消期间保留了中断状态会让用户很惊讶,所以在JDK 1.8就改成现在这样的实现。

关于JDK 1.8之前的实现原理可以阅读下面两个资料链接:

FutureTask实现原理

状态机

FutureTask对其任务执行结果定义了一个状态机,分别用数字06来表示。同时通过这种方式也让我们可以很方便的利用范围比较来进行对状态的判断(JDK一贯的代码风格)。下面我们来看看这部分代码:

public class FutureTask<V> implements RunnableFuture<V> {

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
}

在代码注释上已经对整个状态机作了大体的说明,也列举出所有状态的转移逻辑,这里就不再一一分析了。下面笔者将所有的状态转移描绘为一张图,如下所示:

                                                                      
                                  +--------------+          +---------------+
                           +----->+ INTERRUPTING +--------->+  INTERRUPTED  |
                           |      +--------------+          +---------------+
                           |
                           |
                           |                                +---------------+
             +---------+   |                           +--->+    NORMAL     |
 init +----->+   NEW   +---+                           |    +---------------+
             +----+----+   |      +--------------+     |
                  |        +----->+  COMPLETING  +-----+
                  |               +--------------+     |
                  |                                    |    +---------------+
                  |                                    +--->+  EXCEPTIONAL  |
                  v                                         +---------------+
           +------+-------+
           |  CANCELLED   |
           +--------------+

FutureTask中所实现的任务进度跟踪也是通过此状态机来实现的,下面我们可以看到isCancelled方法与isDone方法:

public boolean isCancelled() {
    return state >= CANCELLED;
}

public boolean isDone() {
    return state != NEW;
}

从代码中可以看到,对于检测任务是否被取消的isCancelled方法和检测任务是否已完成的isDone方法都是通过对状态机进行判断来实现的。

构造

在了解完FutureTask的状态机后,下面我们通过FutureTask的使用进一步分析其实现原理,即:

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

这里有3submit方法,它们的实现原理都是先通过将参数RunnableCallable转化为RunnableFutureFutureTask的父类),然后再将RunnableFuture传入execute方法执行(RunnableFuture继承自Runnable),最后把RunnableFuture返回给调用者。

submit方法本质上还是将Runnable传入execute方法中执行,只不过它通过FutureTask进行了一定程度的封装使其可以将执行结果(含异常)暂存起来。

那么在submit方法中是如何将RunnableCallable转化为RunnableFuture的呢?这里我们一起来看看实现转换的newTaskFor方法:

/**
 * Returns a {@code RunnableFuture} for the given runnable and default
 * value.
 *
 * @param runnable the runnable task being wrapped
 * @param value the default value for the returned future
 * @param <T> the type of the given value
 * @return a {@code RunnableFuture} which, when run, will run the
 * underlying runnable and which, as a {@code Future}, will yield
 * the given value as its result and provide for cancellation of
 * the underlying task
 * @since 1.6
 */
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

/**
 * Returns a {@code RunnableFuture} for the given callable task.
 *
 * @param callable the callable task being wrapped
 * @param <T> the type of the callable's result
 * @return a {@code RunnableFuture} which, when run, will call the
 * underlying callable and which, as a {@code Future}, will yield
 * the callable's result as its result and provide for
 * cancellation of the underlying task
 * @since 1.6
 */
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

newTaskFor方法中只是简单的将RunnableCallable作为构造参数传入FutureTask,然后将参数保存到成员变量中,即:

public class FutureTask<V> implements RunnableFuture<V> {

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters; 

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
}

此时,FutureTask就将传入的RunnableCallable(提交的任务)存储到成员变量callable中(如果是Runnable则会通过Executors.callable进行转换),并且将任务执行状态state初始化为NEW

对于FutureTask中的4个成员变量作用如下所示:

成员变量 描述
callable 表示我们需要执行的任务。
outcome 表示最终返回的执行结果,或者执行时抛出的异常。
runner 表示当前执行callable的线程。
waiters 表示阻塞等待的线程(通过链式存储多个等待线程)。

另外,关于RunnableCallable的区别可以阅读:《Runnable与Callable的区别》

执行

在构造完FutureTask后,我们就可以它传入execute方法中执行任务了。而因为FutureTask继承自Runnable,所以在传入execute方法执行过程中会调用其中的Runnable#run方法,即:

public class FutureTask<V> implements RunnableFuture<V> {

    public void run() {
        /** 判断当前任务是否能执行,并且将运行线程设置为当前线程,如果失败则结束执行。 **/
        
        // 如果计算结果的状态不等于NEW,则直接返回
        if (state != NEW) {
            return;
        }
        // 如果将当前执行线程设置为运行线程失败,则直接返回
        if(!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) {
            return;
        }

        /** 执行传入的异步任务,并保存其执行结果。 **/
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                // 用于表示是否运行完成
                boolean ran;
                try {
                    // 执行任务
                    result = c.call();
                    // 运行完成则设置为true
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    // 运行失败则设置为false
                    ran = false;
                    // 保存异常信息
                    setException(ex);
                }
                // 如果运行完成,则保存结果
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            // 如果当前任务由于取消而被中断(只有取消操作才会使得任务执行状态变更为INTERRUPTING或INTERRUPTED)
            if (s >= INTERRUPTING)
                // 处理因取消而产生的中断
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        // 将当前状态设置为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 将执行结果存储到成员变量outcome
            outcome = v;
            // 将当前状态设置为NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            // 执行任务完成处理,即唤醒等待当前任务结果的线程
            finishCompletion();
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        // 将当前状态设置为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 将异常结果存储到成员变量outcome
            outcome = t;
            // 将当前状态设置为EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            // 执行任务完成处理,即唤醒等待当前任务结果的线程
            finishCompletion();
        }
    }
}

FutureTask#run方法中,它会将传入的任务执行完成,并在完成后将其结果保存到成员变量中。具体步骤如下所示:

  1. 判断当前任务是否能被执行,如果可以则继续往下执行,否则直接返回。
    • 判断state状态是否为NEW,如果不是则直接返回
    • 通过CAS操作将运行线程runner设置为当前线程,如果更新失败则直接返回。
  2. 执行传入的异步任务,并保存其执行结果(通过调用成员变量callablecall方法)。
    • 如果执行成功,则将(正常)结果存储到成员变量outcome中,更新任务完成状态(NORMAL),通知等待结果的线程。
    • 如果执行失败,则将(异常)结果存储到成员变量outcome中,更新任务完成状态(EXCEPTIONAL),通知等待结果的线程。
  3. 处理执行过程中因为取消而产生的中断(因为只有取消操作才会使得任务执行状态变更为INTERRUPTINGINTERRUPTED)。

等到执行结果被计算完成后,它就可以通过get方法获取执行结果,或者通知因调用get方法而进入阻塞的线程(在任务完成前调用get方法会进入阻塞状态)。其中,在FutureTask中是通过finishCompletion实现对阻塞线程发出唤醒通知的,即:

/**
 * Removes and signals all waiting threads, invokes done(), and
 * nulls out callable.
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // 将等待队列设置为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // 唤醒线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

finishCompletion方法会遍历成员变量waiters逐个唤醒阻塞等待中的线程,具体执行步骤如下所示:

  1. 逐个唤醒所有等待线程。
  2. 逐个移除所有等待线程。
  3. 执行done()方法(抽象方法)。
  4. 清空所要执行的任务(将成员变量callable设置为null)。

关于处理因取消而产生中断的handlePossibleCancellationInterrupt方法。

因为只有取消操作才会使得任务执行状态变更为INTERRUPTINGINTERRUPTED,所以handlePossibleCancellationInterrupt方法仅用于处理取消而产生的中断。

/**
 * Ensures that any interrupt from a possible cancel(true) is only
 * delivered to a task while in run or runAndReset.
 */
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

handlePossibleCancellationInterrupt方法只是简单地通过自旋等待INTERRUPTING状态转变为INTERRUPTED,并没有对中断做额外的处理,例如中断标识地擦除等。对此,在方法实现中也进行了相应的解释:不对中断标识进行擦除是因为它可以通过这种方式让执行的任务向调用者进行通信。但,这又与上文关于“FutureTask的更新”章节所描述的有所出入,最终在权衡之下要实现这样的效果可以通过在取消操作时通过参数控制其不发生中断,即cancel(false)

至此,对FutureTask任务的执行步骤分析完毕。

获取

而在任务执行完成后,我们可以调用FutureTaskget方法来获取最终执行结果。但是,如果在任务尚未完成或者正在执行时就调用了get方法获取结果,则会进入阻塞状态(此时会将阻塞线程加入到waiters队列中进行等待)。下面我们可以看到FutureTask#get方法的实现。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 表示还没计算完成
    if (s <= COMPLETING) {
        // 等待执行完成
        s = awaitDone(false, 0L);
    }
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null) {
        throw new NullPointerException();
    }
    int s = state;
    // 表示还没计算完成
    if (s <= COMPLETING) {
        // 等待执行完成
        s = awaitDone(true, unit.toNanos(timeout));
        if (s <= COMPLETING){
            throw new TimeoutException();
        }
    }
    return report(s);
}

get方法中,首先会通过state状态判断是否已经执行完成,如果没有则会执行awaitDone方法进入阻塞状态;如果已经执行完成则调用report方法返回最后的结果。

下面我们首先来看awaitDone方法是如何进行阻塞等待的。

/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {

        /** 判断当前节点是否有必要插入到等待队列中 **/

        // 如果当前线程发生中断,则从等待队列中移除,并抛出异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;

        // 如果执行完成,则直接返回
        if (s > COMPLETING) {
            if (q != null)
                // 将当前节点thread变量情况,为了让后续执行移除操作removeWaiter做
                q.thread = null;
            return s;
        }

        // 如果任务处于COMPLETING状态(正在完成中),表示任务即将完成,所以此处不进入沉睡,而是简单出让一下CPU
        else if (s == COMPLETING) { // cannot time out yet
            Thread.yield();
        }

        /** 线程节点插入等待队列操作 **/

        // 如果当前线程节点为空,则创建等待节点(若下次任务还处于未完成节点才插入到等待队列中)
        else if (q == null) {
            q = new WaitNode();  // 创建等待节点
        }

        // 如果当前节点线程不为空(上一次循环中创建了等待节点),则通过头插法将节点插入到等待队列中
        else if (!queued) {
            q.next = waiters;
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next, q);  // 把等待节点通过头插法插入到等待队列
        }

        /** 线程暂停操作,区分为有超时和无超时 **/
        
        // 存在超时策略的阻塞等待
        else if (timed) {
            nanos = deadline - System.nanoTime();
            // 如果阻塞超时,则会调用removeWaiter方法移除节点
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }

        // 无超时策略的阻塞等待
        else {
            LockSupport.park(this);
        }
    }
}

awaitDone方法会在一个无限循环中执行如下3个步骤,即:

  1. 判断当前节点是否有必要插入到等待队列中。
    • 如果当前线程发生中断,则从等待队列中移除,并抛出异常。
    • 如果当前任务执行完成,则直接返回结果。
    • 如果当前任务处于COMPLETING状态(正在完成中),则调用Thread.yield()方法让出CPU,而不进入等待状态(任务即将完成,稍后进入下次循环中)。
  2. 将线程构造为节点并插入到等待队列中(头插法)。
  3. 执行线程暂停操作,其中分为有超时策略与无超时策略。

其中,对于等待队列中的移除操作则是交由removeWaiter方法来处理的,具体实现逻辑如下所示:

/**
 * Tries to unlink a timed-out or interrupted wait node to avoid
 * accumulating garbage.  Internal nodes are simply unspliced
 * without CAS since it is harmless if they are traversed anyway
 * by releasers.  To avoid effects of unsplicing from already
 * removed nodes, the list is retraversed in case of an apparent
 * race.  This is slow when there are a lot of nodes, but we don't
 * expect lists to be long enough to outweigh higher-overhead
 * schemes.
 */
private void removeWaiter(WaitNode node) {
    if (node != null) {
        // 通过把thread置为空,表示当前节点将要移除,下面会通过是否为空来判断是否移除
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;

                /** 如果thread!=null,则表示当前节点q无需被移除(节点完成后会将thread设置为null) **/
                 
                if (q.thread != null)
                    pred = q;
                 
                /** 如果thread==null,则表示当前节点q需要被移除 **/

                // 如果前驱节点不等于null,则可以直接通过前驱节点指向其后继节点的方式来移除当前节点
                else if (pred != null) {
                    pred.next = s;
                    // 此处表示可能存在并发情况将q的前驱节点也标记为需要被移除的节点,所以需要重新遍历
                    // 因为此for循环是无法倒回去操作的,所以将通过retry使得遍历从头开始
                    if (pred.thread == null) // check for race
                        continue retry;
                }

                // 如果前驱节点等于null,则表示当前节点为头节点,因此这里直接通过CAS将其后继节点置换为头节点(waiters引用)
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                    continue retry;
            }
            break;
        }
    }
}

removeWaiter方法中,它会从头节点一直往后遍历,并移除其中属性threadnull的节点。在实现上,它是通过将前驱节点指向后继节点来实现移除操作的(如存在前驱节点),具体逻辑在这里就不详细分析了,有兴趣的可以阅读上述源码,笔者在相应关键处都添加了注释说明。

在等到结果计算完成后,在get方法中就会调用report方法对结果进行输出,即:

/**
 * Returns result or throws exception for completed task.
 *
 * @param s completed state value
 */
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 如果正常执行完成,则返回结果
    if (s == NORMAL)
        return (V)x;
    // 如果任务被取消,则抛出异常
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

report方法就是直接获取成员变量outcome,并进行指定的类型转换,然后返回结果。只不过它会作以下判断:

  • 如果任务执行状态为NORMAL(正常执行完成),则直接返回outcome
  • 如果任务执行状态为CANCELLED(取消执行),则抛出异常CancellationException
  • 如果任务执行状态为EXCEPTIONAL(发生异常)或者其他中断状态,则直接抛出outcome所存储的异常对象。

至此,对FutureTask任务的获取步骤分析完毕。

取消

另外,对于执行中的任务我们也可以通过cancel方法执行取消操作。

public boolean cancel(boolean mayInterruptIfRunning) {
    // 只有在任务执行状态为NEW时才能被取消,即如果处于其他状态则返回false
    if (state != NEW) {        
        return false;
    }
    // 通过参数mayInterruptIfRunning决定是取消任务还是中断任务 
    int updateState = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
    // 将任务执行状态更新为指定的状态,如果更新失败则返回false
    if(!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, updateState)) {
        return false;
    }
    
    try {    // in case call to interrupt throws exception
        // 通过mayInterruptIfRunning参数来决定是否中断任务
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    // 发出中断
                    t.interrupt();
            } finally { // final state
                // 将任务执行状态更新为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 执行任务完成处理,即唤醒等待当前任务结果的线程(取消后没有必要继续等待了)
        finishCompletion();
    }
    return true;
}

cancel方法中,我们可以通过mayInterruptIfRunning参数来决定是否中断正在执行的任务(处于NEW状态的任务),如果允许则会调用Thread#interrupt方法执行线程中断,然后将任务执行状态变更为INTERRUPTED,最后进行任务完成的处理(含唤醒等待中的线程);否则将会忽视运行中的任务(可能处于执行中的阶段),直接执行任务完成的处理(含唤醒等待中的线程)。

至此,对FutureTask的实现原理分析完毕。

总结

总的来说,FutureTask就是通过暂存数据的成员变量和追踪任务的状态机来实现Future的语义的,即通过把结果存储在成员变量outcome中,并通过判断状态state来有选择地进行返回。而对于Future,它也不只有FutureTask一个实现,但是它们的实现原理都是相似的,只要我们搞懂了其中一个(例如,FutureTask),其他的就能够轻易击破。

0

评论区