在 Java 并发编程中,FutureTask 是一个非常重要的类,它实现了 接口,提供了异步计算的能力。本文将从源码级别深度剖析 FutureTask 的设计原理、状态转换、核心方法实现,帮助你真正理解其工作原理并掌握最佳实践。
Future
一、FutureTask 概述
FutureTask 是 Java 并发包 中的一个类,它实现了
java.util.concurrent 接口(
RunnableFuture +
Runnable),可以作为任务提交给线程执行,并通过
Future 接口获取执行结果。
Future
核心特点
异步计算:可以将耗时操作提交到线程池执行,主线程继续执行其他任务结果获取:通过 方法阻塞等待结果,或通过
get() 带超时获取任务取消:通过
get(timeout) 方法尝试取消正在执行的任务状态管理:内部维护任务状态,确保结果只计算一次
cancel()
基本使用示例
// 创建 Callable 任务
Callable<String> task = () -> {
Thread.sleep(2000);
return "Hello, FutureTask";
};
// 包装为 FutureTask
FutureTask<String> futureTask = new FutureTask<>(task);
// 提交到线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(futureTask);
// 获取结果(阻塞等待)
String result = futureTask.get(); // 会等待 2 秒后返回
二、类图与继承关系
public class FutureTask<V> implements RunnableFuture<V> {
// 实现 RunnableFuture 接口,该接口继承了 Runnable 和 Future
// 使得 FutureTask 既可作为 Runnable 提交执行,又可作为 Future 获取结果
}
类图结构:
FutureTask
├── RunnableFuture (接口)
│ ├── Runnable
│ └── Future
└── Future<V>
关键接口说明:
:定义
Runnable 方法,用于执行任务
run():定义
Future、
get()、
cancel() 等方法,用于获取结果和管理任务状态
isDone():组合了
RunnableFuture 和
Runnable,是 FutureTask 的直接父接口
Future
三、关键属性解析
public class FutureTask<V> implements RunnableFuture<V> {
// 任务状态(volatile 保证可见性)
private volatile int state;
private static final int NEW = 0; // 新建
private static final int COMPLETING = 1; // 正在完成
private static final int NORMAL = 2; // 正常完成
private static final int EXCEPTIONAL = 3; // 异常完成
private static final int CANCELLED = 4; // 已取消
private static final int INTERRUPTING = 5; // 正在中断
private static final int INTERRUPTED = 6; // 已中断
// 任务的 Callable 对象
private Callable<V> callable;
// 任务结果或异常(非 volatile,通过 state 保证可见性)
private Object outcome;
// 执行任务的线程(volatile 保证可见性)
private volatile Thread runner;
// 等待线程的链表结构(用于阻塞等待)
private volatile WaitNode waiters;
}
关键属性详解
:任务状态,是 FutureTask 的核心状态机
state
仅在 、
set() 和
setException() 方法中被修改状态转换路径:
cancel()
:正常完成
NEW -> COMPLETING -> NORMAL:异常完成
NEW -> COMPLETING -> EXCEPTIONAL:被取消
NEW -> CANCELLED:被中断
NEW -> INTERRUPTING -> INTERRUPTED
:任务的 Callable 对象,执行完成后置为 null
callable
任务执行完后, 被置为 null,避免内存泄漏
callable
:任务结果或异常
outcome
通过 保证可见性,避免直接读取导致的可见性问题任务正常完成时存储结果,异常完成时存储异常
state
:执行任务的线程
runner
通过 CAS 操作设置,确保只有一个线程能执行任务任务执行完成后置为 null
:等待线程的链表结构
waiters
用于管理调用 方法等待结果的线程使用
get() 类实现链表
WaitNode
四、状态机解析
状态转换图
NEW
│
├─→ COMPLETING → NORMAL (正常完成)
│
├─→ COMPLETING → EXCEPTIONAL (异常完成)
│
├─→ CANCELLED (被取消)
│
└─→ INTERRUPTING → INTERRUPTED (被中断)
状态转换逻辑
新建状态 (NEW):任务创建后初始状态正在完成状态 (COMPLETING):任务执行完成,正在设置结果或异常正常完成状态 (NORMAL):任务成功执行,结果已设置异常完成状态 (EXCEPTIONAL):任务执行中抛出异常,异常已设置已取消状态 (CANCELLED):任务被取消,未执行正在中断状态 (INTERRUPTING):任务正在被中断已中断状态 (INTERRUPTED):任务已被中断
状态转换关键点
状态转换是原子的:通过 CAS 操作保证状态转换的原子性状态转换不可逆:一旦进入终止状态(NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED),状态不会再改变状态转换是单向的:从 NEW 状态开始,只能通过指定路径转换到终止状态
五、核心方法源码分析
1. 构造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // 状态初始化为 NEW
}
关键点:
检查 是否为 null初始化
callable 为
state任务创建后,状态为
NEW
NEW
2.
run() 方法
run()
public void run() {
// 1. 检查状态,如果不是 NEW,说明任务已执行或已取消,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
// 2. 执行任务
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
// 任务执行中抛出异常
result = null;
ran = false;
// 设置异常
setException(ex);
}
// 任务执行成功,设置结果
if (ran)
set(result);
}
} finally {
// 任务执行完成,重置 runner
runner = null;
// 状态变为 COMPLETING(已由 set() 设置)
int s = state;
// 如果是取消状态,尝试中断线程
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
关键点:
CAS 操作:通过 确保只有一个线程能执行任务任务执行:调用
UNSAFE.compareAndSwapObject 执行任务结果处理:成功时调用
callable.call(),异常时调用
set()状态管理:任务执行完成后,重置
setException(),并处理可能的中断
runner
3.
set() 方法
set()
protected void set(V v) {
if (state == NEW) {
// 通过 CAS 设置状态为 COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
// 通过 CAS 设置状态为 NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
// 唤醒等待线程
finishCompletion();
}
}
}
关键点:
状态转换:从 →
NEW →
COMPLETING原子操作:使用 CAS 确保状态转换的原子性结果存储:将结果存储在
NORMAL 中唤醒等待线程:通过
outcome 唤醒所有等待的线程
finishCompletion()
4.
setException() 方法
setException()
protected void setException(Throwable t) {
if (state == NEW) {
// 通过 CAS 设置状态为 COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
// 通过 CAS 设置状态为 EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
// 唤醒等待线程
finishCompletion();
}
}
}
关键点:
状态转换:从 →
NEW →
COMPLETING异常存储:将异常存储在
EXCEPTIONAL 中唤醒等待线程:通过
outcome 唤醒所有等待的线程
finishCompletion()
5.
get() 方法
get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果状态不是 NEW,说明任务已完成或已取消
if (s <= COMPLETING)
// 如果状态是 NEW 或 COMPLETING,阻塞等待
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// 与 get() 类似,但支持超时
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
关键点:
状态检查:如果状态已不是 NEW,直接返回结果阻塞等待:如果状态是 NEW 或 COMPLETING,调用 阻塞等待结果报告:通过
awaitDone() 方法处理结果或异常
report()
6.
awaitDone() 方法
awaitDone()
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode node = null;
// 自旋等待
for (;;) {
// 检查状态
int s = state;
if (s > COMPLETING)
return s;
// 如果是超时等待,检查是否超时
if (timed && nanos <= 0)
return state;
// 如果线程被中断,抛出 InterruptedException
if (Thread.interrupted()) {
removeWaiter(node);
throw new InterruptedException();
}
// 如果没有等待节点,创建一个新的等待节点
if (node == null)
node = new WaitNode();
// 将等待节点加入等待队列
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, node.next, node))
node = null;
else {
// 阻塞当前线程
LockSupport.park(this);
// 唤醒后检查状态
if (timed && nanos <= 0)
removeWaiter(node);
}
}
}
关键点:
自旋等待:通过循环检查任务状态等待队列:使用 链表实现等待队列阻塞机制:使用
WaitNode 阻塞线程中断处理:如果线程被中断,抛出
LockSupport.park()
InterruptedException
7.
finishCompletion() 方法
finishCompletion()
private void finishCompletion() {
// 从等待队列中移除所有节点
for (WaitNode w = waiters; w != null; ) {
WaitNode next = w.next;
w.next = null;
// 唤醒等待线程
LockSupport.unpark(w.thread);
w = next;
}
// 清空等待队列
waiters = null;
// 执行完成后的钩子方法
done();
// 重置 callable
callable = null; // 释放引用,避免内存泄漏
}
关键点:
唤醒等待线程:通过 唤醒所有等待的线程清理等待队列:将等待队列置为空释放资源:将
LockSupport.unpark() 置为 null,避免内存泄漏执行钩子:调用
callable 钩子方法,供子类扩展
done()
六、深度思考
1. 为什么 FutureTask 可以保证只执行一次任务?
答案:FutureTask 通过 属性和 CAS 操作来保证任务只执行一次。
runner
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
初始为 null通过 CAS 操作将
runner 从 null 设置为当前线程如果 CAS 失败(
runner 不为 null),说明已有线程在执行任务,直接返回
runner
思考:这种设计避免了任务重复执行,保证了任务执行的原子性和唯一性。
2. 为什么 FutureTask 使用
volatile 修饰
state 而不是
outcome?
volatile
state
outcome
答案: 是状态标志,需要保证可见性;
state 是存储结果或异常的变量,通过
outcome 保证可见性。
state
private volatile int state;
private Object outcome; // 非 volatile,通过 state 保证可见性
思考:
是状态机的核心,需要保证可见性
state 通过
outcome 保证可见性,不需要额外的 volatile 修饰这种设计减少了不必要的内存屏障,提高了性能
state
3. 为什么 FutureTask 使用链表实现等待队列?
答案:FutureTask 使用链表实现等待队列,可以高效地添加和移除等待线程。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
思考:
链表实现可以高效地添加和移除节点通过 指针指向链表头节点使用
waiters 和
LockSupport.park() 实现线程阻塞和唤醒
LockSupport.unpark()
4. 为什么 FutureTask 不直接使用
synchronized 而是使用 CAS?
synchronized
答案:FutureTask 使用 CAS 而不是 是为了提高性能和减少锁竞争。
synchronized
思考:
会导致线程阻塞,而 CAS 是无锁操作在高并发场景下,CAS 的性能优于
synchronizedFutureTask 的状态转换是简单的原子操作,适合 CAS
synchronized
5. 为什么 FutureTask 在
run() 方法中使用
finally 块重置
runner?
run()
finally
runner
答案:在 方法中使用
run() 块重置
finally 是为了确保即使任务执行中抛出异常,
runner 也能被正确重置。
runner
finally {
runner = null;
// ...
}
思考:
重置 确保后续任务可以被正确执行即使任务执行中抛出异常,
runner 也会被重置避免任务执行完成后
runner 仍指向已结束的线程
runner
七、最佳实践
1. 任务提交与结果获取
// 创建 Callable 任务
Callable<String> task = () -> {
Thread.sleep(2000);
return "Task completed";
};
// 创建 FutureTask
FutureTask<String> futureTask = new FutureTask<>(task);
// 提交到线程池
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(futureTask);
// 获取结果
try {
String result = futureTask.get(); // 阻塞等待
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 关闭线程池
executor.shutdown();
最佳实践:
使用 提交任务,避免直接创建线程使用
ExecutorService 处理
try-catch 和
InterruptedException确保线程池正确关闭
ExecutionException
2. 超时获取结果
try {
String result = futureTask.get(1, TimeUnit.SECONDS); // 最多等待 1 秒
System.out.println("Result: " + result);
} catch (TimeoutException e) {
System.out.println("Task timed out");
futureTask.cancel(true); // 取消任务
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
最佳实践:
使用带超时的 防止无限等待超时后尝试取消任务使用
get() 中断正在执行的任务
cancel(true)
3. 任务取消
// 取消任务
boolean isCancelled = futureTask.cancel(true); // 中断正在执行的任务
// 检查任务是否被取消
if (isCancelled) {
System.out.println("Task was cancelled");
}
最佳实践:
使用 中断正在执行的任务检查取消是否成功取消后不要继续使用 FutureTask
cancel(true)
4. 避免内存泄漏
// 在任务完成后,确保释放资源
try {
String result = futureTask.get();
// 使用结果
} finally {
// 清理资源
futureTask = null;
}
最佳实践:
在使用完 FutureTask 后,将其置为 null避免持有 FutureTask 的引用导致内存泄漏对于长时间运行的任务,考虑使用
WeakReference
八、常见问题与解答
Q1: FutureTask 的
get() 方法会阻塞当前线程吗?
get()
A: 是的, 方法会阻塞当前线程,直到任务完成或超时。
get()
Q2: FutureTask 可以多次调用
get() 吗?
get()
A: 可以,但每次调用 都会返回相同的结果(任务已完成)。
get()
Q3: 为什么 FutureTask 的
get() 方法会抛出
ExecutionException?
get()
ExecutionException
A: 如果任务执行中抛出异常, 方法会包装为
get() 抛出。
ExecutionException
Q4: FutureTask 的
cancel() 方法会立即取消任务吗?
cancel()
A: 不会立即取消, 会尝试中断正在执行的任务,但任务可能不会立即停止。
cancel(true)
Q5: FutureTask 与
CompletableFuture 有什么区别?
CompletableFuture
A:
是 JDK 1.5 引入的,基于
FutureTask 接口
Future 是 JDK 1.8 引入的,提供了更强大的异步编程能力
CompletableFuture 支持链式调用、组合、异常处理等
CompletableFuture
九、总结
FutureTask 是 Java 并发编程中非常重要的类,它实现了 接口,提供了异步计算的能力。通过深入源码分析,我们理解了其核心设计思想:
RunnableFuture
状态机管理:通过 字段管理任务状态,确保状态转换的原子性和正确性线程安全:通过 CAS 操作和
state 保证线程安全结果获取:通过
volatile 方法阻塞等待结果,支持超时和取消资源管理:通过
get() 清理资源,避免内存泄漏
finishCompletion()
在实际开发中,FutureTask 是实现异步计算的基础,适用于各种耗时操作的场景,如数据库查询、网络请求、文件处理等。
通过本文的深度解析,你已经掌握了 FutureTask 的核心原理和最佳实践,可以自信地在项目中使用它,构建高性能的并发应用。
💡 最后思考:FutureTask 的设计体现了 Java 并发编程的精髓——通过状态机和原子操作保证线程安全,通过阻塞和唤醒机制实现线程协作。理解这些设计思想,将帮助你更好地理解和使用其他并发类,如
、
ReentrantLock等。
CountDownLatch
附录:FutureTask 源码关键部分(JDK 1.8)
public class FutureTask<V> implements RunnableFuture<V> {
// 状态常量
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 任务的 Callable 对象
private Callable<V> callable;
// 任务结果或异常
private Object outcome;
// 执行任务的线程
private volatile Thread runner;
// 等待线程的链表结构
private volatile WaitNode waiters;
// Unsafe 实例
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
// 构造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// run() 方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// set() 方法
protected void set(V v) {
if (state == NEW) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}
}
// setException() 方法
protected void setException(Throwable t) {
if (state == NEW) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();
}
}
}
// get() 方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
// finishCompletion() 方法
private void finishCompletion() {
for (WaitNode w = waiters; w != null; ) {
WaitNode next = w.next;
w.next = null;
LockSupport.unpark(w.thread);
w = next;
}
waiters = null;
done();
callable = null; // 释放引用
}
// 其他方法...
}
附录:FutureTask 与 CompletableFuture 对比
| 特性 | FutureTask | CompletableFuture |
|---|---|---|
| JDK 版本 | JDK 1.5 | JDK 1.8 |
| 接口实现 | 实现 Future | 实现 Future |
| 链式调用 | 不支持 | 支持(thenApply, thenCompose) |
| 异步回调 | 不支持 | 支持(thenAccept, thenRun) |
| 异常处理 | 通过 get() 抛出 | 通过 exceptionally, handle |
| 组合能力 | 有限 | 强大(allOf, anyOf) |
| 使用场景 | 传统异步计算 | 现代异步编程 |
通过本文的学习,你已经深入理解了 FutureTask 的设计原理和实现细节,可以自信地在项目中使用它,构建高性能的并发应用。希望这篇文章对你有所帮助!


