public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask
/*
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
public class FutureTask<V> implements RunnableFuture<V> {
public void run() {
// 只能一个线程进入
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
// 只能设置一次
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
// 状态<=COMPLETING,进入阻塞队列
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
// 进入等待队列
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
不使用lock的wait,因为只有一个线程设置结果,多线程获取结果,不存在竞争,所以只用把消费者线程park,并将线程挂在对队列上
FutureTask<Boolean> future = new FutureTask<>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return true;
}
});
//托管给线程池处理
Future<?> futureResult = Executors.newCachedThreadPool().submit(future);
//托管给单独线程处理
new Thread(future).start();
System.out.println(future.get());
关于cancel方法,这里要补充说几点: 首先有以下三种情况之一的,cancel操作一定是失败的:
其它情况下,cancel操作将返回true。值得注意的是,cancel操作返回true并不代表任务真的就是被取消了,这取决于发动cancel状态时,任务所处的状态:
mayInterruptIfRunning
参数了:
mayInterruptIfRunning
为true, 则当前在执行的任务会被中断mayInterruptIfRunning
为false, 则可以允许正在执行的任务继续运行,直到它执行完我们来看看FutureTask是怎么实现cancel方法的这几个规范的:
首先,对于“任务已经执行完成了或者任务已经被取消过了,则cancel操作一定是失败的(返回false)”这两条,是通过简单的判断state值是否为NEW
实现的,因为我们前面说过了,只要state不为NEW,说明任务已经执行完毕了。从代码中可以看出,只要state不为NEW,则直接返回false。
如果state还是NEW状态,我们再往下看:
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
这一段是根据mayInterruptIfRunning
的值将state的状态由NEW
设置成INTERRUPTING
或者CANCELLED
,当这一操作也成功之后,就可以执行后面的try语句了,但无论怎么,该方法最后都返回了true
。
我们再接着看try块干了点啥:
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
我们知道,runner
属性中存放的是当前正在执行任务的线程,因此,这个try块的目的就是中断当前正在执行任务的线程,最后将state的状态设为INTERRUPTED
,当然,中断操作完成后,还需要通过finishCompletion()
来唤醒所有在Treiber栈中等待的线程。
我们现在总结一下,cancel方法实际上完成以下两种状态转换之一:
NEW -> CANCELLED
(对应于mayInterruptIfRunning=false
)NEW -> INTERRUPTING -> INTERRUPTED
(对应于mayInterruptIfRunning=true
)对于第一条路径,虽说cancel方法最终返回了true,但它只是简单的把state状态设为CANCELLED,并不会中断线程的执行。但是这样带来的后果是,任务即使执行完毕了,也无法设置任务的执行结果,因为前面分析run方法的时候我们知道,设置任务结果有一个中间态,而这个中间态的设置,是以当前state状态为NEW为前提的。
对于第二条路径,则会中断执行任务的线程,我们在倒回上面的run方法看看:
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
虽然第二条路径中断了当前正在执行的线程,但是,响不响应这个中断是由执行任务的线程自己决定的,更具体的说,这取决于c.call()
方法内部是否对中断进行了响应,是否将中断异常抛出。
那call方法中是怎么处理中断的呢?从上面的代码中可以看出,catch语句处理了所有的Throwable
的异常,这自然也包括了中断异常。
然而,值得一提的是,即使这里进入了catch (Throwable ex){}
代码块,setException(ex)
的操作一定是失败的,因为在我们取消任务执行的线程中,我们已经先把state状态设为INTERRUPTING
了,而setException(ex)
的操作要求设置前线程的状态为NEW
。所以这里响应cancel方法所造成的中断最大的意义不是为了对中断进行处理,而是简单的停止任务线程的执行,节省CPU资源。
那读者可能会问了,既然这个setException(ex)
的操作一定是失败的,那放在这里有什么用呢?事实上,这个setException(ex)
是用来处理任务自己在正常执行过程中产生的异常的,在我们没有主动去cancel任务时,任务的state状态在执行过程中就会始终是NEW
,如果任务此时自己发生了异常,则这个异常就会被setException(ex)
方法成功的记录到outcome
中。
反正无论如何,run方法最终都会进入finally块,而这时候它会发现s >= INTERRUPTING
,如果检测发现s = INTERRUPTING
,说明cancel方法还没有执行到中断当前线程的地方,那就等待它将state状态设置成INTERRUPTED
。到这里,对cancel方法的分析就和上面对run方法的分析对接上了。
cancel方法到这里就分析完了,如果你一条条的去对照Future接口对于cancel方法的规范,它每一条都是实现了的,而它实现的核心机理,就是对state的当前状态的判断和设置。由此可见,state属性是贯穿整个FutureTask的最核心的属性。
https://segmentfault.com/a/1190000016572591