概述
创建线程通过有两者方式:一种是继承Thread,一种是实现Runnable接口。但是着两种方式都存在着一个不足,即执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果。
自从Java1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
Callable接口
Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,这个方法叫做call()。该接口还是一个泛型接口,返回的结果类型就为传过来的类型。
public interface Callable<V> { |
Callable需要结合ExecutorService来使用,在ExecutorService接口中声明了若干个submit方法的重载版本。从下面的方法可以知道,Runnable任务也可以进行提交执行。
// task表示需要执行的任务 |
Future接口
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果或者被取消。
Future类位于java.util.concurrent包下,它是一个接口:
public interface Future<V> { |
在Future接口中声明了5个方法,下面依次解释每个方法的作用:
- cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true;
- isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true;
- isDone方法表示任务是否已经完成,若任务完成,则返回true;
- get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
- get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
也就是说Future提供了三种功能:1. 判断任务是否完成;2. 能够中断任务;3. 能够获取任务执行结果。
FutureTask类
上图中是FutureTack继承关系图,最上层的FuctionalInterface是一个注解,标识了拥有该注解的接口支持函数式编程。FutureTask类实现了RunnableFuture接口:
public class FutureTask<V> implements RunnableFuture<V> |
我们看一下RunnableFuture接口的实现:
public interface RunnableFuture<V> extends Runnable, Future<V> { |
可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
事实上,FutureTask是Future接口的一个唯一实现类。
下面对FutureTask的源码进行具体分析:
源码分析
-
属性
// 状态机:存在以下7中状态
private volatile int state;
// 任务新建
private static final int NEW = 0;
// 任务完成中
private static final int COMPLETING = 1;
// 任务正常完成
private static final int NORMAL = 2;
// 任务异常
private static final int EXCEPTIONAL = 3;
// 任务取消
private static final int CANCELLED = 4;
// 任务中断中
private static final int INTERRUPTING = 5;
// 任务已中断
private static final int INTERRUPTED = 6;
// 可能状态转换:
// NEW -> COMPLETING -> NORMAL
// NEW -> COMPLETING -> EXCEPTIONAL
// NEW -> CANCELLED
// NEW -> INTERRUPTING -> INTERRUPTED
// 支持结果返回的任务
private Callable<V> callable;
// 任务执行结果:包含正常和异常的结果,通过get方法获取
private Object outcome;
// 任务执行线程
private volatile Thread runner;
// 栈结构的等待队列,该节点是栈中的最顶层节点
private volatile WaitNode waiters;上述的WaitNode 结构定义如下:
// 使用一个简单的链表节点来记录等待结果的线程
// 链表的具体实现为 Treiber Stack
// 注: Treiber Stack 为一个无锁并发栈, 这意味着先进来的等待线程
// 会最后获得结果.
// Treiber Stack 实现思路为CAS+不断重试.
// 参考:https://www.cnblogs.com/micrari/p/7719408.html
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread();
}
} -
构造器
FutureTask提供了2个构造器:
// 直接传入callable任务
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
// 初始状态为NEW
this.state = NEW;
}
// 传入runnable任务及结果变量
public FutureTask(Runnable runnable, V result) {
// 将Runnable 任务转换为Callable任务
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}FutureTask提供了两个构造方法,支持runnable和callable两种任务,但其实最终都是转换为callable任务。
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
// 利用适配器模式进行转换
return new RunnableAdapter<T>(task, result);
}
// RunnableAdapter实现了Callable接口, 以实现适配器模式
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}runnable转为callable的方法,其实就是通过RunnableAdapter适配器,RunnableAdapter本身是实现了callable接口,然后在call方法中,实际执行的是runnable的run方法,另外将传入的结果参数原封不动的作为结果返回。
-
run方法
run方法是FutureTask任务实际执行体,它主要完成包装的callable的call方法执行,并将执行结果保存到outcome中,同时捕获了call方法执行出现的异常,并保存异常信息,而不是直接抛出。另外,run方法存在的另一个意义就是通过它对状态机进行了维护,比如NEW-COMPLETEING-NORMAL 或 NEW-COMPLETEING-EXCEPTIONAL,保证了任务的处理流程。run方法一开始通过CAS更新runner为当前线程,从而避免了多线程下run被执行多次的调用,若runner线程不为null,则CAS失败。public void run() {
// 状态机不为NEW表示执行完成或任务被取消了,直接返回
// 状态机为NEW,同时将runner设置为当前线程,保证同一时刻只有一个线程执行run方法,如果设置失败也直接返回
// 只有runnner 为null时才会进行设置
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 取出任务检测不为空 且 再次检查状态为NEW
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 任务执行抛出异常时,保存异常信息,而不直接抛出
setException(ex);
}
if (ran)
// 执行成功则保存结果
set(result);
}
} finally {
// 在此之前执行器必须不能null,
// 与上面的CAS配合以此避免多线程并发调用run()的情况
runner = null;
// 任务取消之后必须重新获取state的状态,防止错过处理中断请求
int s = state;
// 如果被置为了中断状态则进行中断的处理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
} -
get方法
get方法有两种,分别是一直阻塞和超时阻塞获取;get方法本意是直接获取任务执行结果,但是任务没执行完成时,会将当前线程进行阻塞等待,直到任务执行完成时才会唤醒。// 阻塞获取
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 任务非最终完成状态前通过awaitDone方法进行阻塞等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
// 超时阻塞获取
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 阻塞获取并达到阻塞超时时间后抛出超时异常
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
} -
awaitDone方法
调用awaitDone方法将请求线程进行阻塞。请求线程阻塞时,会创建一个waiter节点,然后加入到阻塞等待的栈中。当任务执行完成时或设置了阻塞超时时间的线程超时时,会将该线程从阻塞栈中移除,移除的方法很复杂,充分考虑了多线程并发的情况。// 线程阻塞等待方法
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
// 若timed 为true,则计算阻塞超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
// 默认不阻塞
boolean queued = false;
for (;;) {
// 如果阻塞线程被中断则将当前线程从阻塞队列中移除
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
// 任务已经完成时直接返回结果
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
//如果任务执行完成,但还差最后一步最终完成,则让出CPU给任务执行线程继续执行
Thread.yield();
else if (q == null)
// 新进来的线程添加等待节点
q = new WaitNode();
else if (!queued)
// 上一步节点创建完,还没将其添加到waiters栈中,
// 因此在下一个循环就会执行此处进行入栈操作,并将当前线程的等待节点置于栈顶
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
else if (timed) {
// 如果设置了阻塞超时时间,则进行检查是否达到阻塞超时时间,
// 达到了则删除当前线程的等待节点并退出循环返回,否则继续阻塞
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞线程
LockSupport.parkNanos(this, nanos);
}else
// 非超时阻塞
LockSupport.park(this);
}
} -
report方法
report方法会根据任务状返回结果。// 获取任务结果方法:正常执行则直接返回结果,否则抛出异常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
} -
cancel方法
任务取消时会先检查是否允许取消,当任务已经完成或者正在完成(正常执行并继续处理结果 或 执行异常处理异常结果)时不允许取消。
cancel方法有个boolean入参,若为false,则只唤醒所有等待的线程,不中断正在执行的任务线程。若为true则直接中断任务执行线程,同时修改状态机为INTERRUPTED。public boolean cancel(boolean mayInterruptIfRunning) {
// 不允许取消的情况:状态机不是NEW 或CAS更新状态机失败
if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// 如果要求中断执行中的任务,则直接中断任务执行线程,并更新状态机为最终状态INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 移除和唤醒所有线程, 并调用done方法,并设置cableable为null
finishCompletion();
}
return true;
} -
其他方法
setException方法主要用于任务执行异常对处理,主要完成异常信息保存到outcom结果、状态机从NEW到EXCEPTIONAL的变化更新,以及唤醒阻塞在waiters队列中请求get的所有线程。// 任务执行异常处理
protected void setException(Throwable t) {
// 将状态机由NEW更新为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将异常信息保存到输出结果中
outcome = t;
// 更新状态机为处理异常的最终状态-EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 通用的完成操作,主要作用就是唤醒阻塞在waiters队列中请求get的线程
finishCompletion();
}
}set方法任务正常处理和异常处理流程基本一样,不一样的是状态的变化为NEW-COMPLETEING-NORMAL。
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}handlePossibleCancellationInterrupt()方法处理可能的取消中断,其实它的作用就是—当发起中断的线程A将状态机更新为INTERRUPTING,还没继续中断任务线程前,CPU切换到任务执行线程B了,此时线程B执行本方法让出CPU,让发起中断的线程A能继续处理中断B的操作。
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}removeWaiter方法主要的功能就是移除等待节点,当线程被中断或超时时会调用该方法来移除等待节点。
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) {
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
red.next = s;
if (pred.thread == null)
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;
}
break;
}
}
}finishCompletion方法会移除和唤醒所有等待线程,并调用done方法,并将callable设置为null。
// 移除和唤醒所有等待线程, 并调用done方法, 并将callable设置为null
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
使用实例
-
使用Callable + Future获取执行结果
public class FutureExample {
// 利用callable创建任务
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程正在进行计算");
Thread.sleep(3000);
int sum = 0;
for (int i = 0; i < 1000; i++) {
sum += i;
}
return sum;
}
}
public static void main(String[] args) {
// 创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
Task task = new Task();
// Future 对执行结果进行操作, submit方法底层也会将task包装成一个FutureTask对象
Future<Integer> future = executorService.submit(task);
executorService.shutdown();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程正在执行任务");
try {
System.out.println("Task 运行结果" + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("主线程执行任务完毕");
}
}
2. 使用Callable + FutureTask获取执行结果
public class FutureTaskExample {
// 利用callable创建任务
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程正在进行计算");
Thread.sleep(3000);
int sum = 0;
for (int i = 0; i < 1000; i++) {
sum += i;
}
return sum;
}
}
public static void main(String[] args) {
// 方式一
/*ExecutorService executorService = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
executorService.submit(futureTask);
executorService.shutdown();*/
// 方式二
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
// 因为futureTask实现了RunnableFuture接口, 而
RunnableFuture接口继承了Runnable
Thread thread = new Thread(futureTask);
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程正在执行任务");
try {
System.out.println("Task 运行结果" + futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("主线程执行任务完毕");
}
}
总结
- FutureTask实现了Runnable接口,因此可以作为一个线程执行任务处理,比如在线程池中submit方法就是用FutureTask类包装了一个runnable或callable任务;
- FutureTask内部有个状态机,用于记录任务的处理状态,比如有三种最终状态:正常完成、执行异常、任务取消;
- 通过get方法阻塞获取任务执行结果,同时内部维护了一个阻塞等待栈,用于多线程并发调用get方法时,同时将这些线程阻塞并保存它们的阻塞信息,以便在任务执行完成后进行唤醒;
- 支持任务的取消操作,但是前提是任务还没完全执行成功的情况下才允许取消,取消分为两种:只唤醒阻塞等待结果的线程、唤醒线程同时强制中断任务执行线程。
参考资料
[1] https://www.cnblogs.com/dolphin0520/p/3949310.html
[2] https://juejin.im/post/6844904181824749582