Future
可用于表示异步执行的结果,那你知道Future
是怎么实现的吗?
Future
是什么
什么是Future
?官方文档是这样描述它的(JDK 1.8
)。
/**
* A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* {@code get} when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* {@code cancel} method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a {@code Future} for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form {@code Future<?>} and
* return {@code null} as a result of the underlying task.
*
* ...
*
*/
public interface Future<V> {
}
Future
可用于表示异步计算的结果,我们可以通过其中的一些方法来检查异步计算是否已经完成,如果已完成可调用get
方法获取结果(若尚未完成就调用了get
方法则会进入阻塞直至完成)。除此之外,我们还可以调用cancel
方法来取消任务的执行。下面我们来看看官方文档给出的Future
使用demo
。
public interface ArchiveSearcher { String search(String target); }
public class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(final String target) throws InterruptedException {
Future<String> future = executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}
});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) {
cleanup();
return;
}
}
}
在demo
中,我们可以看到它首先将查询数据这些耗时操作放到了异步线程池执行并返回Future
,接着执行其他的一些耗时的操作,最后在其他耗时操作执行完成后才通过Future
获取查询结果,以此来避免在进行IO
阻塞时CPU
处于空闲状态。同时,这也体现了Future
存在的意义。
另外,在Future
类中定义了一些方法,这里笔者总结了下来:
方法 | 说明 |
---|---|
cancel |
用于试图去取消任务的执行。如果任务已完成、已取消(或者其他不能被取消的原因)则调用失败;如果任务还没有开始,则调用成功,并且任务将永远不会执行;如果任务已经开始,则由参数mayInterruptIfRunning 决定正在执行任务的线程是否应该被中断。 |
isCancelled |
用于判断任务是否被取消。如果任务已取消,则返回true ,否则返回false 。 |
isDone |
用于判断任务是否已完成。如果任务已完成,则返回true ,否则返回false 。其中”已完成“包括正常终止、异常终止或被取消,这些情况都会返回true 。 |
get |
用于获取计算结果,在必要时需要等待运算完成。 |
get (timeout) |
用于获取计算结果,在必要时需要等待运算完成(有最大等待时间)。 |
到这里已经完成了对Future
接口的分析,但也只是Future
对其定义的解析和约束,具体还需要去往其实现子类中寻找答案。
Future
的实现
对于Future
接口,它最经典的实现即是FutureTask
,所以接下来我们将对FutureTask
进一步分析。
FutureTask
是什么
/**
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the {@code get}
* methods will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled (unless the computation is invoked using
* {@link #runAndReset}).
*
* <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
* {@link Runnable} object. Because {@code FutureTask} implements
* {@code Runnable}, a {@code FutureTask} can be submitted to an
* {@link Executor} for execution.
*
* ...
*
*/
public class FutureTask<V> implements RunnableFuture<V> {...}
FutureTask
继承自RunnableFuture
接口,并基于它所实现的一个可取消的异步计算结果对象。在FutureTask
的使用上,它可以被包装为Callback
或Runnable
对象,也正因如此FutureTask
可以被提交到Executor
执行。
而RunnableFuture
接口则继承了Runnable
和Future
两个接口,具体如下所示。
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
FutureTask
的使用
对于FutureTask
的显式使用也许在日常开发中很少会接触到,但对于它的间接使用却是十分频繁。例如,平时我们通过线程池去执行submit
时就会用到FutureTask
。
下面我们再借用上文所提及的demo
来展示FutureTask
的用法。
public interface ArchiveSearcher { String search(String target); }
public class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(final String target) throws InterruptedException {
Future<String> future = executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}
});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) {
cleanup();
return;
}
}
}
即,将任务提交到submit
方法中执行并获取其返回的结果FutureTask
,然后在后续执行其他耗时操作后再通过get
方法获取其执行结果。
其实,
submit
操作可以等价于将FutureTask
作为参数传入execute
方法中执行,如下所示:FutureTask<String> future = new FutureTask<String>(new Callable<String>() { public String call() { return searcher.search(target); }}); executor.execute(future);
FutureTask
的更新
另外,在JDK 1.8
上可以看到这么一段注释:
/*
* Revision notes: This differs from previous versions of this
* class that relied on AbstractQueuedSynchronizer, mainly to
* avoid surprising users about retaining interrupt status during
* cancellation races. Sync control in the current design relies
* on a "state" field updated via CAS to track completion, along
* with a simple Treiber stack to hold waiting threads.
*
* Style note: As usual, we bypass overhead of using
* AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
*/
这里主要对FutureTask
的版本更新进行说明,简单来说就是当前版本的实现与之前版本(JDK 1.8
前)有所不同。即,之前版本是通过AQS
实现的,但通过AQS
实现在取消期间保留了中断状态会让用户很惊讶,所以在JDK 1.8
就改成现在这样的实现。
关于
JDK 1.8
之前的实现原理可以阅读下面两个资料链接:
FutureTask
实现原理
状态机
FutureTask
对其任务执行结果定义了一个状态机,分别用数字0
到6
来表示。同时通过这种方式也让我们可以很方便的利用范围比较来进行对状态的判断(JDK
一贯的代码风格)。下面我们来看看这部分代码:
public class FutureTask<V> implements RunnableFuture<V> {
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
}
在代码注释上已经对整个状态机作了大体的说明,也列举出所有状态的转移逻辑,这里就不再一一分析了。下面笔者将所有的状态转移描绘为一张图,如下所示:
+--------------+ +---------------+
+----->+ INTERRUPTING +--------->+ INTERRUPTED |
| +--------------+ +---------------+
|
|
| +---------------+
+---------+ | +--->+ NORMAL |
init +----->+ NEW +---+ | +---------------+
+----+----+ | +--------------+ |
| +----->+ COMPLETING +-----+
| +--------------+ |
| | +---------------+
| +--->+ EXCEPTIONAL |
v +---------------+
+------+-------+
| CANCELLED |
+--------------+
在
FutureTask
中所实现的任务进度跟踪也是通过此状态机来实现的,下面我们可以看到isCancelled
方法与isDone
方法:public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; }
从代码中可以看到,对于检测任务是否被取消的
isCancelled
方法和检测任务是否已完成的isDone
方法都是通过对状态机进行判断来实现的。
构造
在了解完FutureTask
的状态机后,下面我们通过FutureTask
的使用进一步分析其实现原理,即:
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
这里有3
个submit
方法,它们的实现原理都是先通过将参数Runnable
或Callable
转化为RunnableFuture
(FutureTask
的父类),然后再将RunnableFuture
传入execute
方法执行(RunnableFuture
继承自Runnable
),最后把RunnableFuture
返回给调用者。
submit
方法本质上还是将Runnable
传入execute
方法中执行,只不过它通过FutureTask
进行了一定程度的封装使其可以将执行结果(含异常)暂存起来。
那么在submit
方法中是如何将Runnable
或Callable
转化为RunnableFuture
的呢?这里我们一起来看看实现转换的newTaskFor
方法:
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
在newTaskFor
方法中只是简单的将Runnable
或Callable
作为构造参数传入FutureTask
,然后将参数保存到成员变量中,即:
public class FutureTask<V> implements RunnableFuture<V> {
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
}
此时,FutureTask
就将传入的Runnable
或Callable
(提交的任务)存储到成员变量callable
中(如果是Runnable
则会通过Executors.callable
进行转换),并且将任务执行状态state
初始化为NEW
。
对于
FutureTask
中的4
个成员变量作用如下所示:
成员变量 描述 callable
表示我们需要执行的任务。 outcome
表示最终返回的执行结果,或者执行时抛出的异常。 runner
表示当前执行 callable
的线程。waiters
表示阻塞等待的线程(通过链式存储多个等待线程)。 另外,关于
Runnable
与Callable
的区别可以阅读:《Runnable与Callable的区别》
执行
在构造完FutureTask
后,我们就可以它传入execute
方法中执行任务了。而因为FutureTask
继承自Runnable
,所以在传入execute
方法执行过程中会调用其中的Runnable#run
方法,即:
public class FutureTask<V> implements RunnableFuture<V> {
public void run() {
/** 判断当前任务是否能执行,并且将运行线程设置为当前线程,如果失败则结束执行。 **/
// 如果计算结果的状态不等于NEW,则直接返回
if (state != NEW) {
return;
}
// 如果将当前执行线程设置为运行线程失败,则直接返回
if(!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) {
return;
}
/** 执行传入的异步任务,并保存其执行结果。 **/
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
// 用于表示是否运行完成
boolean ran;
try {
// 执行任务
result = c.call();
// 运行完成则设置为true
ran = true;
} catch (Throwable ex) {
result = null;
// 运行失败则设置为false
ran = false;
// 保存异常信息
setException(ex);
}
// 如果运行完成,则保存结果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 如果当前任务由于取消而被中断(只有取消操作才会使得任务执行状态变更为INTERRUPTING或INTERRUPTED)
if (s >= INTERRUPTING)
// 处理因取消而产生的中断
handlePossibleCancellationInterrupt(s);
}
}
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
// 将当前状态设置为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将执行结果存储到成员变量outcome
outcome = v;
// 将当前状态设置为NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 执行任务完成处理,即唤醒等待当前任务结果的线程
finishCompletion();
}
}
/**
* Causes this future to report an {@link ExecutionException}
* with the given throwable as its cause, unless this future has
* already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon failure of the computation.
*
* @param t the cause of failure
*/
protected void setException(Throwable t) {
// 将当前状态设置为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将异常结果存储到成员变量outcome
outcome = t;
// 将当前状态设置为EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 执行任务完成处理,即唤醒等待当前任务结果的线程
finishCompletion();
}
}
}
在FutureTask#run
方法中,它会将传入的任务执行完成,并在完成后将其结果保存到成员变量中。具体步骤如下所示:
- 判断当前任务是否能被执行,如果可以则继续往下执行,否则直接返回。
- 判断
state
状态是否为NEW
,如果不是则直接返回 - 通过
CAS
操作将运行线程runner
设置为当前线程,如果更新失败则直接返回。
- 判断
- 执行传入的异步任务,并保存其执行结果(通过调用成员变量
callable
的call
方法)。- 如果执行成功,则将(正常)结果存储到成员变量
outcome
中,更新任务完成状态(NORMAL
),通知等待结果的线程。 - 如果执行失败,则将(异常)结果存储到成员变量
outcome
中,更新任务完成状态(EXCEPTIONAL
),通知等待结果的线程。
- 如果执行成功,则将(正常)结果存储到成员变量
- 处理执行过程中因为取消而产生的中断(因为只有取消操作才会使得任务执行状态变更为
INTERRUPTING
或INTERRUPTED
)。
等到执行结果被计算完成后,它就可以通过get
方法获取执行结果,或者通知因调用get
方法而进入阻塞的线程(在任务完成前调用get
方法会进入阻塞状态)。其中,在FutureTask
中是通过finishCompletion
实现对阻塞线程发出唤醒通知的,即:
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 将等待队列设置为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
finishCompletion
方法会遍历成员变量waiters
逐个唤醒阻塞等待中的线程,具体执行步骤如下所示:
- 逐个唤醒所有等待线程。
- 逐个移除所有等待线程。
- 执行
done()
方法(抽象方法)。 - 清空所要执行的任务(将成员变量
callable
设置为null
)。
关于处理因取消而产生中断的
handlePossibleCancellationInterrupt
方法。因为只有取消操作才会使得任务执行状态变更为
INTERRUPTING
或INTERRUPTED
,所以handlePossibleCancellationInterrupt
方法仅用于处理取消而产生的中断。/** * Ensures that any interrupt from a possible cancel(true) is only * delivered to a task while in run or runAndReset. */ private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
handlePossibleCancellationInterrupt
方法只是简单地通过自旋等待INTERRUPTING
状态转变为INTERRUPTED
,并没有对中断做额外的处理,例如中断标识地擦除等。对此,在方法实现中也进行了相应的解释:不对中断标识进行擦除是因为它可以通过这种方式让执行的任务向调用者进行通信。但,这又与上文关于“FutureTask
的更新”章节所描述的有所出入,最终在权衡之下要实现这样的效果可以通过在取消操作时通过参数控制其不发生中断,即cancel(false)
。
至此,对FutureTask
任务的执行步骤分析完毕。
获取
而在任务执行完成后,我们可以调用FutureTask
的get
方法来获取最终执行结果。但是,如果在任务尚未完成或者正在执行时就调用了get
方法获取结果,则会进入阻塞状态(此时会将阻塞线程加入到waiters
队列中进行等待)。下面我们可以看到FutureTask#get
方法的实现。
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 表示还没计算完成
if (s <= COMPLETING) {
// 等待执行完成
s = awaitDone(false, 0L);
}
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null) {
throw new NullPointerException();
}
int s = state;
// 表示还没计算完成
if (s <= COMPLETING) {
// 等待执行完成
s = awaitDone(true, unit.toNanos(timeout));
if (s <= COMPLETING){
throw new TimeoutException();
}
}
return report(s);
}
在get
方法中,首先会通过state
状态判断是否已经执行完成,如果没有则会执行awaitDone
方法进入阻塞状态;如果已经执行完成则调用report
方法返回最后的结果。
下面我们首先来看awaitDone
方法是如何进行阻塞等待的。
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
/** 判断当前节点是否有必要插入到等待队列中 **/
// 如果当前线程发生中断,则从等待队列中移除,并抛出异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果执行完成,则直接返回
if (s > COMPLETING) {
if (q != null)
// 将当前节点thread变量情况,为了让后续执行移除操作removeWaiter做
q.thread = null;
return s;
}
// 如果任务处于COMPLETING状态(正在完成中),表示任务即将完成,所以此处不进入沉睡,而是简单出让一下CPU
else if (s == COMPLETING) { // cannot time out yet
Thread.yield();
}
/** 线程节点插入等待队列操作 **/
// 如果当前线程节点为空,则创建等待节点(若下次任务还处于未完成节点才插入到等待队列中)
else if (q == null) {
q = new WaitNode(); // 创建等待节点
}
// 如果当前节点线程不为空(上一次循环中创建了等待节点),则通过头插法将节点插入到等待队列中
else if (!queued) {
q.next = waiters;
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next, q); // 把等待节点通过头插法插入到等待队列
}
/** 线程暂停操作,区分为有超时和无超时 **/
// 存在超时策略的阻塞等待
else if (timed) {
nanos = deadline - System.nanoTime();
// 如果阻塞超时,则会调用removeWaiter方法移除节点
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
// 无超时策略的阻塞等待
else {
LockSupport.park(this);
}
}
}
awaitDone
方法会在一个无限循环中执行如下3
个步骤,即:
- 判断当前节点是否有必要插入到等待队列中。
- 如果当前线程发生中断,则从等待队列中移除,并抛出异常。
- 如果当前任务执行完成,则直接返回结果。
- 如果当前任务处于
COMPLETING
状态(正在完成中),则调用Thread.yield()
方法让出CPU
,而不进入等待状态(任务即将完成,稍后进入下次循环中)。
- 将线程构造为节点并插入到等待队列中(头插法)。
- 执行线程暂停操作,其中分为有超时策略与无超时策略。
其中,对于等待队列中的移除操作则是交由
removeWaiter
方法来处理的,具体实现逻辑如下所示:/** * Tries to unlink a timed-out or interrupted wait node to avoid * accumulating garbage. Internal nodes are simply unspliced * without CAS since it is harmless if they are traversed anyway * by releasers. To avoid effects of unsplicing from already * removed nodes, the list is retraversed in case of an apparent * race. This is slow when there are a lot of nodes, but we don't * expect lists to be long enough to outweigh higher-overhead * schemes. */ private void removeWaiter(WaitNode node) { if (node != null) { // 通过把thread置为空,表示当前节点将要移除,下面会通过是否为空来判断是否移除 node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; /** 如果thread!=null,则表示当前节点q无需被移除(节点完成后会将thread设置为null) **/ if (q.thread != null) pred = q; /** 如果thread==null,则表示当前节点q需要被移除 **/ // 如果前驱节点不等于null,则可以直接通过前驱节点指向其后继节点的方式来移除当前节点 else if (pred != null) { pred.next = s; // 此处表示可能存在并发情况将q的前驱节点也标记为需要被移除的节点,所以需要重新遍历 // 因为此for循环是无法倒回去操作的,所以将通过retry使得遍历从头开始 if (pred.thread == null) // check for race continue retry; } // 如果前驱节点等于null,则表示当前节点为头节点,因此这里直接通过CAS将其后继节点置换为头节点(waiters引用) else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
在
removeWaiter
方法中,它会从头节点一直往后遍历,并移除其中属性thread
为null
的节点。在实现上,它是通过将前驱节点指向后继节点来实现移除操作的(如存在前驱节点),具体逻辑在这里就不详细分析了,有兴趣的可以阅读上述源码,笔者在相应关键处都添加了注释说明。
在等到结果计算完成后,在get
方法中就会调用report
方法对结果进行输出,即:
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
// 如果正常执行完成,则返回结果
if (s == NORMAL)
return (V)x;
// 如果任务被取消,则抛出异常
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
report
方法就是直接获取成员变量outcome
,并进行指定的类型转换,然后返回结果。只不过它会作以下判断:
- 如果任务执行状态为
NORMAL
(正常执行完成),则直接返回outcome
。 - 如果任务执行状态为
CANCELLED
(取消执行),则抛出异常CancellationException
。 - 如果任务执行状态为
EXCEPTIONAL
(发生异常)或者其他中断状态,则直接抛出outcome
所存储的异常对象。
至此,对FutureTask
任务的获取步骤分析完毕。
取消
另外,对于执行中的任务我们也可以通过cancel
方法执行取消操作。
public boolean cancel(boolean mayInterruptIfRunning) {
// 只有在任务执行状态为NEW时才能被取消,即如果处于其他状态则返回false
if (state != NEW) {
return false;
}
// 通过参数mayInterruptIfRunning决定是取消任务还是中断任务
int updateState = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
// 将任务执行状态更新为指定的状态,如果更新失败则返回false
if(!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, updateState)) {
return false;
}
try { // in case call to interrupt throws exception
// 通过mayInterruptIfRunning参数来决定是否中断任务
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// 发出中断
t.interrupt();
} finally { // final state
// 将任务执行状态更新为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 执行任务完成处理,即唤醒等待当前任务结果的线程(取消后没有必要继续等待了)
finishCompletion();
}
return true;
}
在cancel
方法中,我们可以通过mayInterruptIfRunning
参数来决定是否中断正在执行的任务(处于NEW
状态的任务),如果允许则会调用Thread#interrupt
方法执行线程中断,然后将任务执行状态变更为INTERRUPTED
,最后进行任务完成的处理(含唤醒等待中的线程);否则将会忽视运行中的任务(可能处于执行中的阶段),直接执行任务完成的处理(含唤醒等待中的线程)。
至此,对FutureTask
的实现原理分析完毕。
总结
总的来说,FutureTask
就是通过暂存数据的成员变量和追踪任务的状态机来实现Future
的语义的,即通过把结果存储在成员变量outcome
中,并通过判断状态state
来有选择地进行返回。而对于Future
,它也不只有FutureTask
一个实现,但是它们的实现原理都是相似的,只要我们搞懂了其中一个(例如,FutureTask
),其他的就能够轻易击破。
评论区