J.U.C之Callable, Future, FutureTask

概述

创建线程通过有两者方式:一种是继承Thread,一种是实现Runnable接口。但是着两种方式都存在着一个不足,即执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果。
自从Java1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Callable接口

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,这个方法叫做call()。该接口还是一个泛型接口,返回的结果类型就为传过来的类型。

public interface Callable<V> {
 
// 返回计算结果或者抛出一个错误
V call() throws Exception;
}

Callable需要结合ExecutorService来使用,在ExecutorService接口中声明了若干个submit方法的重载版本。从下面的方法可以知道,Runnable任务也可以进行提交执行。

// task表示需要执行的任务
// <T> 返回结果的类型
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

Future接口

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果或者被取消。
Future类位于java.util.concurrent包下,它是一个接口:

public interface Future<V> {
// 取消任务,参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否取消
boolean isCancelled();
// 任务是否完成
boolean isDone();
// 这个方法会产生阻塞,会一直等到任务执行完毕才返回
V get() throws InterruptedException, ExecutionException;
// 获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException

在Future接口中声明了5个方法,下面依次解释每个方法的作用:

  1. cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true;
  2. isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true;
  3. isDone方法表示任务是否已经完成,若任务完成,则返回true;
  4. get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  5. get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
    也就是说Future提供了三种功能:1. 判断任务是否完成;2. 能够中断任务;3. 能够获取任务执行结果。

FutureTask类

上图中是FutureTack继承关系图,最上层的FuctionalInterface是一个注解,标识了拥有该注解的接口支持函数式编程。FutureTask类实现了RunnableFuture接口:

public class FutureTask<V> implements RunnableFuture<V>

我们看一下RunnableFuture接口的实现:

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
事实上,FutureTask是Future接口的一个唯一实现类。

下面对FutureTask的源码进行具体分析:

源码分析

  1. 属性

    // 状态机:存在以下7中状态
    private volatile int state;
    // 任务新建
    private static final int NEW = 0;
    // 任务完成中
    private static final int COMPLETING = 1;
    // 任务正常完成
    private static final int NORMAL = 2;
    // 任务异常
    private static final int EXCEPTIONAL = 3;
    // 任务取消
    private static final int CANCELLED = 4;
    // 任务中断中
    private static final int INTERRUPTING = 5;
    // 任务已中断
    private static final int INTERRUPTED = 6;

    // 可能状态转换:
    // NEW -> COMPLETING -> NORMAL
    // NEW -> COMPLETING -> EXCEPTIONAL
    // NEW -> CANCELLED
    // NEW -> INTERRUPTING -> INTERRUPTED

    // 支持结果返回的任务
    private Callable<V> callable;

    // 任务执行结果:包含正常和异常的结果,通过get方法获取
    private Object outcome;

    // 任务执行线程
    private volatile Thread runner;

    // 栈结构的等待队列,该节点是栈中的最顶层节点
    private volatile WaitNode waiters;

    上述的WaitNode 结构定义如下:

    // 使用一个简单的链表节点来记录等待结果的线程 
    // 链表的具体实现为 Treiber Stack
    // 注: Treiber Stack 为一个无锁并发栈, 这意味着先进来的等待线程
    // 会最后获得结果.
    // Treiber Stack 实现思路为CAS+不断重试.
    // 参考:https://www.cnblogs.com/micrari/p/7719408.html
    static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread();
    }
    }
  2. 构造器

    FutureTask提供了2个构造器:

    // 直接传入callable任务
    public FutureTask(Callable<V> callable) {
    if (callable == null)
    throw new NullPointerException();
    this.callable = callable;
    // 初始状态为NEW
    this.state = NEW;
    }

    // 传入runnable任务及结果变量
    public FutureTask(Runnable runnable, V result) {
    // 将Runnable 任务转换为Callable任务
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; // ensure visibility of callable
    }

    FutureTask提供了两个构造方法,支持runnable和callable两种任务,但其实最终都是转换为callable任务。

    public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
    throw new NullPointerException();
    // 利用适配器模式进行转换
    return new RunnableAdapter<T>(task, result);
    }

    // RunnableAdapter实现了Callable接口, 以实现适配器模式
    static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
    this.task = task;
    this.result = result;
    }
    public T call() {
    task.run();
    return result;
    }
    }

    runnable转为callable的方法,其实就是通过RunnableAdapter适配器,RunnableAdapter本身是实现了callable接口,然后在call方法中,实际执行的是runnable的run方法,另外将传入的结果参数原封不动的作为结果返回。

  3. run方法
    run方法是FutureTask任务实际执行体,它主要完成包装的callable的call方法执行,并将执行结果保存到outcome中,同时捕获了call方法执行出现的异常,并保存异常信息,而不是直接抛出。另外,run方法存在的另一个意义就是通过它对状态机进行了维护,比如NEW-COMPLETEING-NORMAL 或 NEW-COMPLETEING-EXCEPTIONAL,保证了任务的处理流程。run方法一开始通过CAS更新runner为当前线程,从而避免了多线程下run被执行多次的调用,若runner线程不为null,则CAS失败。

    public void run() {
    // 状态机不为NEW表示执行完成或任务被取消了,直接返回
    // 状态机为NEW,同时将runner设置为当前线程,保证同一时刻只有一个线程执行run方法,如果设置失败也直接返回
    // 只有runnner 为null时才会进行设置
    if (state != NEW ||
    !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
    return;
    try {
    Callable<V> c = callable;
    // 取出任务检测不为空 且 再次检查状态为NEW
    if (c != null && state == NEW) {
    V result;
    boolean ran;
    try {
    // 执行任务
    result = c.call();
    ran = true;
    } catch (Throwable ex) {
    result = null;
    ran = false;
    // 任务执行抛出异常时,保存异常信息,而不直接抛出
    setException(ex);
    }
    if (ran)
    // 执行成功则保存结果
    set(result);
    }
    } finally {
    // 在此之前执行器必须不能null,
    // 与上面的CAS配合以此避免多线程并发调用run()的情况
    runner = null;
    // 任务取消之后必须重新获取state的状态,防止错过处理中断请求
    int s = state;
    // 如果被置为了中断状态则进行中断的处理
    if (s >= INTERRUPTING)
    handlePossibleCancellationInterrupt(s);
    }
    }
  4. get方法
    get方法有两种,分别是一直阻塞和超时阻塞获取;get方法本意是直接获取任务执行结果,但是任务没执行完成时,会将当前线程进行阻塞等待,直到任务执行完成时才会唤醒。

    // 阻塞获取
    public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 任务非最终完成状态前通过awaitDone方法进行阻塞等待
    if (s <= COMPLETING)
    s = awaitDone(false, 0L);
    return report(s);
    }

    // 超时阻塞获取
    public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
    throw new NullPointerException();
    int s = state;
    // 阻塞获取并达到阻塞超时时间后抛出超时异常
    if (s <= COMPLETING &&
    (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
    throw new TimeoutException();
    return report(s);
    }
  5. awaitDone方法
    调用awaitDone方法将请求线程进行阻塞。请求线程阻塞时,会创建一个waiter节点,然后加入到阻塞等待的栈中。当任务执行完成时或设置了阻塞超时时间的线程超时时,会将该线程从阻塞栈中移除,移除的方法很复杂,充分考虑了多线程并发的情况。

    // 线程阻塞等待方法
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    // 若timed 为true,则计算阻塞超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    // 默认不阻塞
    boolean queued = false;
    for (;;) {
    // 如果阻塞线程被中断则将当前线程从阻塞队列中移除
    if (Thread.interrupted()) {
    removeWaiter(q);
    throw new InterruptedException();
    }

    int s = state;
    if (s > COMPLETING) {
    // 任务已经完成时直接返回结果
    if (q != null)
    q.thread = null;
    return s;
    }
    else if (s == COMPLETING)
    //如果任务执行完成,但还差最后一步最终完成,则让出CPU给任务执行线程继续执行
    Thread.yield();
    else if (q == null)
    // 新进来的线程添加等待节点
    q = new WaitNode();
    else if (!queued)
    // 上一步节点创建完,还没将其添加到waiters栈中,
    // 因此在下一个循环就会执行此处进行入栈操作,并将当前线程的等待节点置于栈顶
    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
    else if (timed) {
    // 如果设置了阻塞超时时间,则进行检查是否达到阻塞超时时间,
    // 达到了则删除当前线程的等待节点并退出循环返回,否则继续阻塞
    nanos = deadline - System.nanoTime();
    if (nanos <= 0L) {
    removeWaiter(q);
    return state;
    }
    // 阻塞线程
    LockSupport.parkNanos(this, nanos);
    }else
    // 非超时阻塞
    LockSupport.park(this);
    }
    }
  6. report方法
    report方法会根据任务状返回结果。

    // 获取任务结果方法:正常执行则直接返回结果,否则抛出异常
    private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
    return (V)x;
    if (s >= CANCELLED)
    throw new CancellationException();
    throw new ExecutionException((Throwable)x);
    }
  7. cancel方法
    任务取消时会先检查是否允许取消,当任务已经完成或者正在完成(正常执行并继续处理结果 或 执行异常处理异常结果)时不允许取消。
    cancel方法有个boolean入参,若为false,则只唤醒所有等待的线程,不中断正在执行的任务线程。若为true则直接中断任务执行线程,同时修改状态机为INTERRUPTED。

    public boolean cancel(boolean mayInterruptIfRunning) {
    // 不允许取消的情况:状态机不是NEW 或CAS更新状态机失败
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    return false;
    try {
    // 如果要求中断执行中的任务,则直接中断任务执行线程,并更新状态机为最终状态INTERRUPTED
    if (mayInterruptIfRunning) {
    try {
    Thread t = runner;
    if (t != null)
    t.interrupt();
    } finally {
    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
    }
    }
    } finally {
    // 移除和唤醒所有线程, 并调用done方法,并设置cableable为null
    finishCompletion();
    }
    return true;
    }
  8. 其他方法
    setException方法主要用于任务执行异常对处理,主要完成异常信息保存到outcom结果、状态机从NEW到EXCEPTIONAL的变化更新,以及唤醒阻塞在waiters队列中请求get的所有线程。

    // 任务执行异常处理
    protected void setException(Throwable t) {
    // 将状态机由NEW更新为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    // 将异常信息保存到输出结果中
    outcome = t;
    // 更新状态机为处理异常的最终状态-EXCEPTIONAL
    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    // 通用的完成操作,主要作用就是唤醒阻塞在waiters队列中请求get的线程
    finishCompletion();
    }
    }

    set方法任务正常处理和异常处理流程基本一样,不一样的是状态的变化为NEW-COMPLETEING-NORMAL。

    protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion();
    }
    }

    handlePossibleCancellationInterrupt()方法处理可能的取消中断,其实它的作用就是—当发起中断的线程A将状态机更新为INTERRUPTING,还没继续中断任务线程前,CPU切换到任务执行线程B了,此时线程B执行本方法让出CPU,让发起中断的线程A能继续处理中断B的操作。

    private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
    while (state == INTERRUPTING)
    Thread.yield();
    }

    removeWaiter方法主要的功能就是移除等待节点,当线程被中断或超时时会调用该方法来移除等待节点。

    private void removeWaiter(WaitNode node) {
    if (node != null) {
    node.thread = null;
    retry:
    for (;;) {
    for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
    s = q.next;
    if (q.thread != null)
    pred = q;
    else if (pred != null) {
    red.next = s;
    if (pred.thread == null)
    continue retry;
    }
    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
    continue retry;
    }
    break;
    }
    }
    }

    finishCompletion方法会移除和唤醒所有等待线程,并调用done方法,并将callable设置为null。

    // 移除和唤醒所有等待线程, 并调用done方法, 并将callable设置为null
    private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
    if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
    for (;;) {
    Thread t = q.thread;
    if (t != null) {
    q.thread = null;
    LockSupport.unpark(t);
    }
    WaitNode next = q.next;
    if (next == null)
    break;
    q.next = null; // unlink to help gc
    q = next;
    }
    break;
    }
    }

    done();
    callable = null; // to reduce footprint
    }

使用实例

  1. 使用Callable + Future获取执行结果

    public class FutureExample {
    // 利用callable创建任务
    static class Task implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
    System.out.println("子线程正在进行计算");
    Thread.sleep(3000);
    int sum = 0;
    for (int i = 0; i < 1000; i++) {
    sum += i;
    }
    return sum;
    }
    }

    public static void main(String[] args) {
    // 创建线程池
    ExecutorService executorService = Executors.newCachedThreadPool();
    Task task = new Task();
    // Future 对执行结果进行操作, submit方法底层也会将task包装成一个FutureTask对象
    Future<Integer> future = executorService.submit(task);
    executorService.shutdown();

    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    System.out.println("主线程正在执行任务");

    try {
    System.out.println("Task 运行结果" + future.get());
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }

    System.out.println("主线程执行任务完毕");

    }
    }

2. 使用Callable + FutureTask获取执行结果

public class FutureTaskExample {

// 利用callable创建任务
static class Task implements Callable<Integer> {

@Override
public Integer call() throws Exception {
System.out.println("子线程正在进行计算");
Thread.sleep(3000);
int sum = 0;
for (int i = 0; i < 1000; i++) {
sum += i;
}
return sum;
}
}

public static void main(String[] args) {
// 方式一
/*ExecutorService executorService = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
executorService.submit(futureTask);
executorService.shutdown();*/

// 方式二
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
// 因为futureTask实现了RunnableFuture接口, 而
RunnableFuture接口继承了Runnable
Thread thread = new Thread(futureTask);
thread.start();

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("主线程正在执行任务");

try {
System.out.println("Task 运行结果" + futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

System.out.println("主线程执行任务完毕");

}
}

总结

  1. FutureTask实现了Runnable接口,因此可以作为一个线程执行任务处理,比如在线程池中submit方法就是用FutureTask类包装了一个runnable或callable任务;
  2. FutureTask内部有个状态机,用于记录任务的处理状态,比如有三种最终状态:正常完成、执行异常、任务取消;
  3. 通过get方法阻塞获取任务执行结果,同时内部维护了一个阻塞等待栈,用于多线程并发调用get方法时,同时将这些线程阻塞并保存它们的阻塞信息,以便在任务执行完成后进行唤醒;
  4. 支持任务的取消操作,但是前提是任务还没完全执行成功的情况下才允许取消,取消分为两种:只唤醒阻塞等待结果的线程、唤醒线程同时强制中断任务执行线程。

参考资料

[1] https://www.cnblogs.com/dolphin0520/p/3949310.html
[2] https://juejin.im/post/6844904181824749582

Author: HB
Link: http://www.huangbin.fun/J-U-C之Callable-Future-FutureTask.html
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.