JUC 工具第五弹之 Exchanger 详解

在 Java 并发编程中,java.util.concurrent.Exchanger 是 JUC 包提供的一个相对“低调”但功能独特的线程同步工具。

什么是 Exchanger?

Exchanger是 Java 5 引入的并发工具类,它的核心作用是:允许两个线程在一个同步点交换彼此的数据。

想象这样一个场景:

线程 A 持有数据 X,线程 B 持有数据 Y。当它们都到达某个约定地点时,彼此交换手中的数据——A 拿到 Y,B 拿到 X。

这正是Exchanger要解决的问题。它不像BlockingQueue那样单向传递数据,而是实现双向对等交换。

类定义

public class Exchanger<V>

泛型V表明要交换的数据类型。

线程安全,可被多个线程共享使用(但逻辑上应仅用于两个线程配对)。

核心 API 详解

Exchanger提供两个主要方法:

1. V exchange(V x)

  • 功能:阻塞当前线程,直到另一个线程也调用exchange(),然后交换数据。
  • 参数:当前线程要交出的对象。
  • 返回值:从对方线程接收到的对象。
  • 异常:若线程在等待过程中被中断,抛出InterruptedException。

2. V exchange(V x, long timeout, TimeUnit unit)

  • 功能:带超时机制的交换。
  • 参数:
    • x:要交换的数据;
    • timeout:最大等待时间;
    • unit:时间单位(如 TimeUnit.SECONDS)。
  • 异常:
    • InterruptedException:线程被中断;
    • TimeoutException:超时未等到配对线程。

⚠️ 注意:一旦超时或中断,该线程不会参与后续交换,也不会“占用”配对槽位。

核心原理

虽然我们一般只需关注 API 使用,但理解其底层机制有助于写出更健壮的代码。

基本流程

  1. 线程 A 调用exchange(objA)→ 进入等待状态,暂存objA。
  2. 线程 B 调用exchange(objB)→ 发现已有等待线程,立即唤醒 A。
  3. A 和 B 相互交换对象:A 得到objB,B 得到objA。
  4. 两线程继续执行。

底层实现(JDK 8+)

  • 初期采用slot 模式:一个共享槽位存储等待线程及其数据。
  • 高并发时自动切换为arena 模式:多个槽位减少竞争(类似ConcurrentHashMap的分段思想)。
  • 基于sun.misc.Unsafe的 CAS 操作实现无锁同步。
  • 等待策略:先自旋(spin),再阻塞(park),兼顾低延迟与低 CPU 占用。

这种设计使得Exchanger在双线程场景下性能极高。

实战案例

1.基础交换

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BasicExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        pool.submit(() -> {
            try {
                String myData = "Hello from Producer";
                System.out.println("Producer 准备交换: " + myDate);
                String received = exchanger.exchange(myData);
                System.out.println("Producer 收到: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.submit(() -> {
            try {
                String myData = "Hi from Consumer";
                System.out.println("Consumer 准备交换: " + myData);
                String received = exchanger.exchange(myData);
                System.out.println("Consumer 收到: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.shutdown();
    }
}

输出(顺序可能不同)

Producer 准备交换: Hello from Producer
Consumer 准备交换: Hi from Consumer
Producer 收到: Hi from Consumer
Consumer 收到: Hello from Producer

2.带超时处理

// 模拟一个可能失败的交换
new Thread(() -> {
    try {
        String result = exchanger.exchange("Data", 1, TimeUnit.SECONDS);
        System.out.println("成功交换,收到: " + result);
    } catch (TimeoutException e) {
        System.out.println("❌ 超时!没有伙伴线程");
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

// 故意不启动第二个线程 → 触发超时

经典场景应用

1.双缓冲(Double Buffering)

在高性能 I/O 或图形渲染中,常使用双缓冲避免数据竞争:

  • 生产者线程:写入缓冲区 A;
  • 消费者线程:读取缓冲区 B;
  • 当生产完成,双方通过Exchanger交换缓冲区引用,无需复制数据。
// 缓冲区类
class Buffer {
    final byte[] data = new byte[1024];
    int size;
}

Exchanger<Buffer> exchanger = new Exchanger<>();
Buffer buffer1 = new Buffer();
Buffer buffer2 = new Buffer();

// 生产者
while (running) {
    fillData(buffer1); // 填充数据
    buffer1 = exchanger.exchange(buffer1); // 交出满的,拿回空的
}

// 消费者
while (running) {
    process(buffer2); // 处理数据
    buffer2 = exchanger.exchange(buffer2); // 交出空的,拿回满的
}

✅ 优势:零拷贝、无锁、高吞吐。

2.结果校验

两个线程用不同算法计算同一任务,交换结果进行比对:

Exchanger<Integer> exchanger = new Exchanger<>();

// 算法 A
int resultA = computeMethodA();
int other = exchanger.exchange(resultA);
assert resultA == other : "计算结果不一致!";

// 算法 B
int resultB = computeMethodB();
int other = exchanger.exchange(resultB);
assert resultB == other;

3.协作式流水线

线程 A 处理前半段数据,线程 B 处理后半段,中间通过Exchanger传递中间结果。

使用注意事项

1. 严格限制为两个线程

虽然Exchanger内部支持多线程(按 FIFO 配对),但业务逻辑应确保每次只有两个线程参与交换。否则容易出现:

  • 第三个线程无限等待;
  • 逻辑错乱(如 A 与 C 交换,而非预期的 A 与 B)。

2. 避免重复使用导致死锁

若多次使用同一个Exchanger,必须保证每次调用都是成对出现。例如:

// ❌ 危险:如果某次只有一方调用,另一方 crash,则永久阻塞
for (int i = 0; i < 10; i++) {
    data = exchanger.exchange(data);
}

提议:在循环中加入超时或健康检查。

3. 正确处理中断与超时

务必捕获异常并恢复中断状态:

try {
    data = exchanger.exchange(data, 5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    log.warn("交换超时");
    return; // 退出,不要继续等待
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // 恢复中断
    return;
}

4. 内存可见性已保证

Exchanger内部建立了happens-before关系,交换后的数据对双方线程立即可见,无需额外使用volatile。

与其他同步工具对比

工具

适用线程数

数据流向

典型用途

Exchanger

2

双向交换

双缓冲、结果校验

SynchronousQueue

≥2

单向传递

线程池任务移交

CyclicBarrier

≥2

无数据交换

多线程协同启动/阶段同步

CountDownLatch

1:N

无数据

主线程等待多个子任务完成

记住:Exchanger 是唯一支持“双向数据交换”的并发工具

总结

Exchanger虽然使用场景相对小众,但在特定领域(如高性能数据处理、双缓冲架构)中表现出色。它的设计简洁而优雅,体现了 Java 并发包“为特定问题提供精准工具”的哲学。

关键点:

两线程,一同步,交换数据无锁通;双缓冲,校验用,超时中断要善终。

当你遇到需要两个线程“互换东西”的需求时,不妨试试Exchanger——它可能正是你缺失的那一块拼图。

© 版权声明

相关文章

暂无评论

none
暂无评论...