TOC
同步 VS. 异步
依赖倒置;压栈操作;通知回调
为什么异步模型比同步模型性能更高?这是一定的么?
过于绝对的答案,通常是不够准确的。这个也不例外。最容易想到的是由于异步模型涉及到线程切换的开销,可能会导致性能下降。但是除此之外,还有许多原因会导致异步模型性能下降。下面展开说说。
CompletableFuture
这是 JUC 包对异步编程最核心的一个类。它同时实现了Future<T>
和CompletionStage<T>
两个接口。意味着它可以异步执行并获取结果,如V get()
、boolean isDone()
这些 Future 中定义的方法都有所实现。更关键的是后者CompletionStage
接口,可以把它翻译为完成阶段,它代表一系列操作的中间状态,包含了当前阶段生成的中间结果。基于一个完成阶段的结果,可以进入下一个阶段进行计算;也可以结合多个完成阶段的结果,来作为进入下一个阶段计算的入参;或者直接结束。
这样说你可能有点绕,别着急,先看一个最简单的例子:
public static void main(String[] args) {
CompletableFuture<Integer> countStage = CompletableFuture.supplyAsync(() -> {
// 清点库存...
return 100; // 返回商品的数量
});
CompletableFuture<Integer> calculateStage = countStage.thenApply(count -> {
// 查询商品的价格...
Integer price = 20;
return count * price; // 返回商品的总价
}, Executors.newFixedThreadPool(10)); // 使用线程池执行
CompletableFuture<Void> reportStage = calculateStage.thenAccept(total -> {
System.out.println("total: " + total); // 输出商品的总价
});
}
在这里设计了一个商品报价系统,似乎多此一举地分了 3 个步骤。但这种回调思想是非常重要的第一步。有几个要点:
- 首先每个步骤都马上返回了一个
Future
对象,不会阻塞主线程,即不影响其他步骤的启动。 - 每个步骤内部定义的逻辑都是异步执行的,甚至可以指定线程池来执行(如calculateStage)。
- 每个步骤都可以通过
thenApply
、thenAccept
等方法来定义下一个阶段的操作,形成一个链式调用。 - 最终的结果可以通过
thenAccept
方法来异步获取,或者通过get()
方法来阻塞等待结果。
CompletableFuture, A Future with CompletionStage
传统的 Future(如 java.util.concurrent.Future)本质上是一个异步结果的占位符,底层实现通常依赖于阻塞等待(如通过 LockSupport.park/unpark 或 Object.wait/notify),其核心是主线程通过 get() 方法阻塞直到结果可用。这种方式的主要局限在于:一旦调用 get(),当前线程就会被挂起,直到任务完成,无法灵活地进行任务编排和组合。
而 CompletableFuture 则实现了 Future 和 CompletionStage 两个接口。它不仅能像传统 Future 一样通过 get() 阻塞获取结果,更重要的是支持基于回调的异步任务编排(CompletionStage),即通过 thenApply、thenAccept、thenCombine 等方法注册回调,任务完成后自动触发后续操作。其底层实现是通过回调链表(Completion 链)和非阻塞的状态流转(CAS 操作 result 字段),实现了无需阻塞主线程即可完成复杂的异步流程。
链式调用与任务编排
链式调用的好处在于可以将多个异步任务串联起来,形成清晰的任务流。除了文章开头的简单链式模式,还能实现复杂的逻辑依赖关系,比如:
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> f3 = f1.thenCombine(f2, Integer::sum); // 基于f1和f2的结果进行组合
f3.thenAccept(System.out::println); // 输出 3
除了thenCombine
,还可以使用allOf
、anyOf
等高级编排方式:
CompletableFuture<Integer> f4 = CompletableFuture.supplyAsync(() -> 4);
CompletableFuture<Integer> f5 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Void> all = CompletableFuture.allOf(f4, f5); // 等待所有任务完成
all.thenRun(() -> {
// 等待所有任务完成
System.out.println("all done");
});
anyOf
则只要有一个任务完成就会触发:
CompletableFuture<Object> any = CompletableFuture.anyOf(f4, f5); // 等待任意一个任务完成
any.thenAccept(result -> System.out.println("first done: " + result));
此外,CompletableFuture 还提供了更多灵活的任务组合函数:
thenCompose(Function)
: 用于将两个异步操作串联起来,前一个阶段的结果作为后一个阶段的输入,适合“扁平化”异 asynchronous嵌套。CompletableFuture<String> f6 = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<String> f7 = f6.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " world")); f7.thenAccept(System.out::println); // 输出 hello world
whenComplete(BiConsumer)
: 在任务完成时(无论正常还是异常)执行指定操作,但不会改变最终结果。CompletableFuture<Integer> f9 = CompletableFuture.supplyAsync(() -> 42); f9.whenComplete((result, ex) -> System.out.println("完成: " + result));
exceptionally(Function)
: 只处理异常情况,发生异常时返回一个默认值。CompletableFuture<Integer> f10 = CompletableFuture.supplyAsync(() -> 1 / 0) .exceptionally(ex -> 0); f10.thenAccept(System.out::println); // 输出 0
这些组合函数让异步任务的编排更加灵活和健壮,能够应对更复杂的业务场景。
边界场景控制
超时控制:
可以通过orTimeout
和completeOnTimeout
方法设置超时:
orTimeout(long timeout, TimeUnit unit)
:超时后抛出异常。completeOnTimeout(T value, long timeout, TimeUnit unit)
:超时后返回默认值。
取消任务:
通过cancel(true)
方法可以尝试取消任务,但如果任务已经开始执行,线程未必会立即停止。例如:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println("任务被中断");
}
});
future.cancel(true);
System.out.println("cancel called");
即使调用了cancel
,如果任务内部没有处理中断,线程可能不会立刻停止。
最常见的异步编程场景:异步IO操作
在实际开发中,异步编程最常见的应用场景就是IO操作,尤其是网络请求。很多同学在初学CompletableFuture时,容易犯一个常见的错误:只是把原本的同步IO操作放进supplyAsync
里,表面上看起来是异步了,實際上只是把阻塞操作丢给了线程池,本质上并没有提升系统的并发能力。
下面先看一个“伪异步”的错误用法:
// 错误示范:将同步HTTP请求包装进CompletableFuture,实际还是阻塞线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 这里以HttpURLConnection为例,实际是同步阻塞IO
URL url = new URL("https://www.example.com");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(conn.getInputStream()))) {
return reader.readLine();
}
} catch (IOException e) {
return null;
}
});
future.thenAccept(System.out::println);
这种写法虽然用上了CompletableFuture,但本质上还是同步IO,只是把阻塞转移到了线程池。如果线程池资源耗尽,依然会成为系统瓶颈。
正确的做法应该是利用真正的异步IO能力,让底层IO操作本身就是非阻塞的。Java 11引入的HttpClient
就支持异步HTTP请求(或者使用第三方包AsyncHttpClient),配合CompletableFuture可以实现纯正的异步编程:
// 正确示范:使用HttpClient的sendAsync实现真正的异步HTTP请求
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://www.example.com"))
.build();
CompletableFuture<HttpResponse<String>> responseFuture =
client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
responseFuture.thenAccept(response -> {
System.out.println(response.body());
});
这里的sendAsync
方法底层就是异步IO,不会阻塞线程池中的线程。这样才能真正发挥CompletableFuture异步编排的优势,实现高并发、高吞吐的IO处理。
值得注意的是,使用这种“纯异步”方式后,等待IO事件的工作就不再由Java线程本身负责,而是交给了更底层的JDK甚至操作系统来处理。以Java 11的HttpClient
为例,其底层实现基于NIO(非阻塞IO),通过Selector机制注册感兴趣的IO事件(如连接就绪、数据可读等),由操作系统内核负责监听这些事件。当事件发生时,操作系统会通过回调或事件通知的方式唤醒JVM中的Selector线程,进而触发CompletableFuture的回调链。
这种机制的核心思想是“事件驱动”,即线程不再主动等待某个IO操作完成,而是注册一个事件监听,等操作系统检测到事件发生后再通知应用层处理。这样可以极大提升资源利用率,避免大量线程因阻塞IO而浪费CPU和内存。
涉及到底层的技术方案包括:
- JDK的NIO(基于epoll/kqueue/select等操作系统多路复用机制)
- Netty等高性能网络框架的事件循环模型
因此,只有真正利用这些底层异步能力,才能实现高效的异步编程模型。CompletableFuture只是上层的编排工具,底层的高性能依赖于JDK和操作系统的事件驱动与异步IO支持。
CompletableFuture的实现原理
CompletableFuture
的核心实现依赖于回调链表结构。它的主要成员变量和核心方法如下:
volatile Object result
:保存异步计算的结果或异常。volatile Completion stack
:回调链表的栈顶,保存所有待触发的回调节点。Executor executor
:用于异步执行回调的线程池。
每次调用thenApply
、thenAccept
等方法,都会向stack
链表中添加一个Completion
节点。任务完成时,会遍历链表,依次触发所有回调。
核心流程如下:
// 1. 任务定义
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
// 计算逻辑
return value;
});
// 2. 注册回调
future.thenApply(...); // 向stack链表添加Completion节点
// 3. 任务完成时
private void completeValue(Object value) {
if (UNSAFE.compareAndSwapObject(this, RESULT, null, value)) {
postComplete(); // 触发所有回调
}
}
private void postComplete() {
Completion h;
while ((h = stack) != null || (h = next) != null) {
h.tryFire(NESTED); // 依次触发回调
// ...existing code...
}
}
总结
本篇主要介绍了Java并发包中异步编程的核心工具——CompletableFuture,分析了其与传统Future的实现区别,详细讲解了链式任务编排、边界场景控制(如超时与取消),并介绍了在最常见的IO场景之一——HTTP请求中的典型错误用法和正确用法。最后,还简要介绍了CompletableFuture的底层实现原理。