一文详解 FutureTask:从源码到深度思考的全方位解析

内容分享6小时前发布
0 0 0

在 Java 并发编程中,FutureTask 是一个非常重要的类,它实现了
Future
接口,提供了异步计算的能力。本文将从源码级别深度剖析 FutureTask 的设计原理、状态转换、核心方法实现,帮助你真正理解其工作原理并掌握最佳实践。


一、FutureTask 概述

FutureTask 是 Java 并发包
java.util.concurrent
中的一个类,它实现了
RunnableFuture
接口(
Runnable
+
Future
),可以作为任务提交给线程执行,并通过
Future
接口获取执行结果。

核心特点

异步计算:可以将耗时操作提交到线程池执行,主线程继续执行其他任务结果获取:通过
get()
方法阻塞等待结果,或通过
get(timeout)
带超时获取任务取消:通过
cancel()
方法尝试取消正在执行的任务状态管理:内部维护任务状态,确保结果只计算一次

基本使用示例


// 创建 Callable 任务
Callable<String> task = () -> {
    Thread.sleep(2000);
    return "Hello, FutureTask";
};

// 包装为 FutureTask
FutureTask<String> futureTask = new FutureTask<>(task);

// 提交到线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(futureTask);

// 获取结果(阻塞等待)
String result = futureTask.get(); // 会等待 2 秒后返回

二、类图与继承关系


public class FutureTask<V> implements RunnableFuture<V> {
    // 实现 RunnableFuture 接口,该接口继承了 Runnable 和 Future
    // 使得 FutureTask 既可作为 Runnable 提交执行,又可作为 Future 获取结果
}

类图结构


FutureTask
├── RunnableFuture (接口)
│   ├── Runnable
│   └── Future
└── Future<V>

关键接口说明


Runnable
:定义
run()
方法,用于执行任务
Future
:定义
get()

cancel()

isDone()
等方法,用于获取结果和管理任务状态
RunnableFuture
:组合了
Runnable

Future
,是 FutureTask 的直接父接口


三、关键属性解析


public class FutureTask<V> implements RunnableFuture<V> {
    // 任务状态(volatile 保证可见性)
    private volatile int state;
    private static final int NEW          = 0;           // 新建
    private static final int COMPLETING   = 1;           // 正在完成
    private static final int NORMAL       = 2;           // 正常完成
    private static final int EXCEPTIONAL  = 3;           // 异常完成
    private static final int CANCELLED    = 4;           // 已取消
    private static final int INTERRUPTING = 5;           // 正在中断
    private static final int INTERRUPTED  = 6;           // 已中断
    
    // 任务的 Callable 对象
    private Callable<V> callable;
    
    // 任务结果或异常(非 volatile,通过 state 保证可见性)
    private Object outcome;
    
    // 执行任务的线程(volatile 保证可见性)
    private volatile Thread runner;
    
    // 等待线程的链表结构(用于阻塞等待)
    private volatile WaitNode waiters;
}

关键属性详解


state
:任务状态,是 FutureTask 的核心状态机

仅在
set()

setException()

cancel()
方法中被修改状态转换路径:

NEW -> COMPLETING -> NORMAL
:正常完成
NEW -> COMPLETING -> EXCEPTIONAL
:异常完成
NEW -> CANCELLED
:被取消
NEW -> INTERRUPTING -> INTERRUPTED
:被中断


callable
:任务的 Callable 对象,执行完成后置为 null

任务执行完后,
callable
被置为 null,避免内存泄漏


outcome
:任务结果或异常

通过
state
保证可见性,避免直接读取导致的可见性问题任务正常完成时存储结果,异常完成时存储异常


runner
:执行任务的线程

通过 CAS 操作设置,确保只有一个线程能执行任务任务执行完成后置为 null


waiters
:等待线程的链表结构

用于管理调用
get()
方法等待结果的线程使用
WaitNode
类实现链表


四、状态机解析

状态转换图


NEW
│
├─→ COMPLETING → NORMAL (正常完成)
│
├─→ COMPLETING → EXCEPTIONAL (异常完成)
│
├─→ CANCELLED (被取消)
│
└─→ INTERRUPTING → INTERRUPTED (被中断)

状态转换逻辑

新建状态 (NEW):任务创建后初始状态正在完成状态 (COMPLETING):任务执行完成,正在设置结果或异常正常完成状态 (NORMAL):任务成功执行,结果已设置异常完成状态 (EXCEPTIONAL):任务执行中抛出异常,异常已设置已取消状态 (CANCELLED):任务被取消,未执行正在中断状态 (INTERRUPTING):任务正在被中断已中断状态 (INTERRUPTED):任务已被中断

状态转换关键点

状态转换是原子的:通过 CAS 操作保证状态转换的原子性状态转换不可逆:一旦进入终止状态(NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED),状态不会再改变状态转换是单向的:从 NEW 状态开始,只能通过指定路径转换到终止状态


五、核心方法源码分析

1. 构造方法


public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW; // 状态初始化为 NEW
}

关键点

检查
callable
是否为 null初始化
state

NEW
任务创建后,状态为
NEW

2.
run()
方法


public void run() {
    // 1. 检查状态,如果不是 NEW,说明任务已执行或已取消,直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    
    // 2. 执行任务
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 执行任务
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                // 任务执行中抛出异常
                result = null;
                ran = false;
                // 设置异常
                setException(ex);
            }
            // 任务执行成功,设置结果
            if (ran)
                set(result);
        }
    } finally {
        // 任务执行完成,重置 runner
        runner = null;
        // 状态变为 COMPLETING(已由 set() 设置)
        int s = state;
        // 如果是取消状态,尝试中断线程
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

关键点

CAS 操作:通过
UNSAFE.compareAndSwapObject
确保只有一个线程能执行任务任务执行:调用
callable.call()
执行任务结果处理:成功时调用
set()
,异常时调用
setException()
状态管理:任务执行完成后,重置
runner
,并处理可能的中断

3.
set()
方法


protected void set(V v) {
    if (state == NEW) {
        // 通过 CAS 设置状态为 COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            // 通过 CAS 设置状态为 NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            // 唤醒等待线程
            finishCompletion();
        }
    }
}

关键点

状态转换:从
NEW

COMPLETING

NORMAL
原子操作:使用 CAS 确保状态转换的原子性结果存储:将结果存储在
outcome
唤醒等待线程:通过
finishCompletion()
唤醒所有等待的线程

4.
setException()
方法


protected void setException(Throwable t) {
    if (state == NEW) {
        // 通过 CAS 设置状态为 COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            // 通过 CAS 设置状态为 EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
            // 唤醒等待线程
            finishCompletion();
        }
    }
}

关键点

状态转换:从
NEW

COMPLETING

EXCEPTIONAL
异常存储:将异常存储在
outcome
唤醒等待线程:通过
finishCompletion()
唤醒所有等待的线程

5.
get()
方法


public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果状态不是 NEW,说明任务已完成或已取消
    if (s <= COMPLETING)
        // 如果状态是 NEW 或 COMPLETING,阻塞等待
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    // 与 get() 类似,但支持超时
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

关键点

状态检查:如果状态已不是 NEW,直接返回结果阻塞等待:如果状态是 NEW 或 COMPLETING,调用
awaitDone()
阻塞等待结果报告:通过
report()
方法处理结果或异常

6.
awaitDone()
方法


private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode node = null;
    // 自旋等待
    for (;;) {
        // 检查状态
        int s = state;
        if (s > COMPLETING)
            return s;
        // 如果是超时等待,检查是否超时
        if (timed && nanos <= 0)
            return state;
        // 如果线程被中断,抛出 InterruptedException
        if (Thread.interrupted()) {
            removeWaiter(node);
            throw new InterruptedException();
        }
        // 如果没有等待节点,创建一个新的等待节点
        if (node == null)
            node = new WaitNode();
        // 将等待节点加入等待队列
        else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, node.next, node))
            node = null;
        else {
            // 阻塞当前线程
            LockSupport.park(this);
            // 唤醒后检查状态
            if (timed && nanos <= 0)
                removeWaiter(node);
        }
    }
}

关键点

自旋等待:通过循环检查任务状态等待队列:使用
WaitNode
链表实现等待队列阻塞机制:使用
LockSupport.park()
阻塞线程中断处理:如果线程被中断,抛出
InterruptedException

7.
finishCompletion()
方法


private void finishCompletion() {
    // 从等待队列中移除所有节点
    for (WaitNode w = waiters; w != null; ) {
        WaitNode next = w.next;
        w.next = null;
        // 唤醒等待线程
        LockSupport.unpark(w.thread);
        w = next;
    }
    // 清空等待队列
    waiters = null;
    
    // 执行完成后的钩子方法
    done();
    
    // 重置 callable
    callable = null; // 释放引用,避免内存泄漏
}

关键点

唤醒等待线程:通过
LockSupport.unpark()
唤醒所有等待的线程清理等待队列:将等待队列置为空释放资源:将
callable
置为 null,避免内存泄漏执行钩子:调用
done()
钩子方法,供子类扩展


六、深度思考

1. 为什么 FutureTask 可以保证只执行一次任务?

答案:FutureTask 通过
runner
属性和 CAS 操作来保证任务只执行一次。


if (state != NEW ||
    !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
    return;


runner
初始为 null通过 CAS 操作将
runner
从 null 设置为当前线程如果 CAS 失败(
runner
不为 null),说明已有线程在执行任务,直接返回

思考:这种设计避免了任务重复执行,保证了任务执行的原子性和唯一性。

2. 为什么 FutureTask 使用
volatile
修饰
state
而不是
outcome

答案
state
是状态标志,需要保证可见性;
outcome
是存储结果或异常的变量,通过
state
保证可见性。


private volatile int state;
private Object outcome; // 非 volatile,通过 state 保证可见性

思考


state
是状态机的核心,需要保证可见性
outcome
通过
state
保证可见性,不需要额外的 volatile 修饰这种设计减少了不必要的内存屏障,提高了性能

3. 为什么 FutureTask 使用链表实现等待队列?

答案:FutureTask 使用链表实现等待队列,可以高效地添加和移除等待线程。


static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

思考

链表实现可以高效地添加和移除节点通过
waiters
指针指向链表头节点使用
LockSupport.park()

LockSupport.unpark()
实现线程阻塞和唤醒

4. 为什么 FutureTask 不直接使用
synchronized
而是使用 CAS?

答案:FutureTask 使用 CAS 而不是
synchronized
是为了提高性能和减少锁竞争。

思考


synchronized
会导致线程阻塞,而 CAS 是无锁操作在高并发场景下,CAS 的性能优于
synchronized
FutureTask 的状态转换是简单的原子操作,适合 CAS

5. 为什么 FutureTask 在
run()
方法中使用
finally
块重置
runner

答案:在
run()
方法中使用
finally
块重置
runner
是为了确保即使任务执行中抛出异常,
runner
也能被正确重置。


finally {
    runner = null;
    // ...
}

思考

重置
runner
确保后续任务可以被正确执行即使任务执行中抛出异常,
runner
也会被重置避免任务执行完成后
runner
仍指向已结束的线程


七、最佳实践

1. 任务提交与结果获取


// 创建 Callable 任务
Callable<String> task = () -> {
    Thread.sleep(2000);
    return "Task completed";
};

// 创建 FutureTask
FutureTask<String> futureTask = new FutureTask<>(task);

// 提交到线程池
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(futureTask);

// 获取结果
try {
    String result = futureTask.get(); // 阻塞等待
    System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

// 关闭线程池
executor.shutdown();

最佳实践

使用
ExecutorService
提交任务,避免直接创建线程使用
try-catch
处理
InterruptedException

ExecutionException
确保线程池正确关闭

2. 超时获取结果


try {
    String result = futureTask.get(1, TimeUnit.SECONDS); // 最多等待 1 秒
    System.out.println("Result: " + result);
} catch (TimeoutException e) {
    System.out.println("Task timed out");
    futureTask.cancel(true); // 取消任务
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

最佳实践

使用带超时的
get()
防止无限等待超时后尝试取消任务使用
cancel(true)
中断正在执行的任务

3. 任务取消


// 取消任务
boolean isCancelled = futureTask.cancel(true); // 中断正在执行的任务

// 检查任务是否被取消
if (isCancelled) {
    System.out.println("Task was cancelled");
}

最佳实践

使用
cancel(true)
中断正在执行的任务检查取消是否成功取消后不要继续使用 FutureTask

4. 避免内存泄漏


// 在任务完成后,确保释放资源
try {
    String result = futureTask.get();
    // 使用结果
} finally {
    // 清理资源
    futureTask = null;
}

最佳实践

在使用完 FutureTask 后,将其置为 null避免持有 FutureTask 的引用导致内存泄漏对于长时间运行的任务,考虑使用
WeakReference


八、常见问题与解答

Q1: FutureTask 的
get()
方法会阻塞当前线程吗?

A: 是的,
get()
方法会阻塞当前线程,直到任务完成或超时。

Q2: FutureTask 可以多次调用
get()
吗?

A: 可以,但每次调用
get()
都会返回相同的结果(任务已完成)。

Q3: 为什么 FutureTask 的
get()
方法会抛出
ExecutionException

A: 如果任务执行中抛出异常,
get()
方法会包装为
ExecutionException
抛出。

Q4: FutureTask 的
cancel()
方法会立即取消任务吗?

A: 不会立即取消,
cancel(true)
会尝试中断正在执行的任务,但任务可能不会立即停止。

Q5: FutureTask 与
CompletableFuture
有什么区别?

A:


FutureTask
是 JDK 1.5 引入的,基于
Future
接口
CompletableFuture
是 JDK 1.8 引入的,提供了更强大的异步编程能力
CompletableFuture
支持链式调用、组合、异常处理等


九、总结

FutureTask 是 Java 并发编程中非常重要的类,它实现了
RunnableFuture
接口,提供了异步计算的能力。通过深入源码分析,我们理解了其核心设计思想:

状态机管理:通过
state
字段管理任务状态,确保状态转换的原子性和正确性线程安全:通过 CAS 操作和
volatile
保证线程安全结果获取:通过
get()
方法阻塞等待结果,支持超时和取消资源管理:通过
finishCompletion()
清理资源,避免内存泄漏

在实际开发中,FutureTask 是实现异步计算的基础,适用于各种耗时操作的场景,如数据库查询、网络请求、文件处理等。

通过本文的深度解析,你已经掌握了 FutureTask 的核心原理和最佳实践,可以自信地在项目中使用它,构建高性能的并发应用。

💡 最后思考:FutureTask 的设计体现了 Java 并发编程的精髓——通过状态机和原子操作保证线程安全,通过阻塞和唤醒机制实现线程协作。理解这些设计思想,将帮助你更好地理解和使用其他并发类,如
ReentrantLock

CountDownLatch
等。


附录:FutureTask 源码关键部分(JDK 1.8)


public class FutureTask<V> implements RunnableFuture<V> {
    // 状态常量
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    // 任务的 Callable 对象
    private Callable<V> callable;
    
    // 任务结果或异常
    private Object outcome;
    
    // 执行任务的线程
    private volatile Thread runner;
    
    // 等待线程的链表结构
    private volatile WaitNode waiters;
    
    // Unsafe 实例
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    // 构造方法
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW; // ensure visibility of callable
    }
    
    // run() 方法
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    // set() 方法
    protected void set(V v) {
        if (state == NEW) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
                finishCompletion();
            }
        }
    }
    
    // setException() 方法
    protected void setException(Throwable t) {
        if (state == NEW) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = t;
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
                finishCompletion();
            }
        }
    }
    
    // get() 方法
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    // finishCompletion() 方法
    private void finishCompletion() {
        for (WaitNode w = waiters; w != null; ) {
            WaitNode next = w.next;
            w.next = null;
            LockSupport.unpark(w.thread);
            w = next;
        }
        waiters = null;
        done();
        callable = null; // 释放引用
    }
    
    // 其他方法...
}

附录:FutureTask 与 CompletableFuture 对比

特性 FutureTask CompletableFuture
JDK 版本 JDK 1.5 JDK 1.8
接口实现 实现 Future 实现 Future
链式调用 不支持 支持(thenApply, thenCompose)
异步回调 不支持 支持(thenAccept, thenRun)
异常处理 通过 get() 抛出 通过 exceptionally, handle
组合能力 有限 强大(allOf, anyOf)
使用场景 传统异步计算 现代异步编程

通过本文的学习,你已经深入理解了 FutureTask 的设计原理和实现细节,可以自信地在项目中使用它,构建高性能的并发应用。希望这篇文章对你有所帮助!

© 版权声明

相关文章

暂无评论

none
暂无评论...