深入Java并发包-(5)-异步编程

Wednesday, February 21, 2024

TOC

同步 VS. 异步

依赖倒置;压栈操作;通知回调

为什么异步模型比同步模型性能更高?这是一定的么?

过于绝对的答案,通常是不够准确的。这个也不例外。最容易想到的是由于异步模型涉及到线程切换的开销,可能会导致性能下降。但是除此之外,还有许多原因会导致异步模型性能下降。下面展开说说。

CompletableFuture

这是 JUC 包对异步编程最核心的一个类。它同时实现了Future<T>CompletionStage<T>两个接口。意味着它可以异步执行并获取结果,如V get()boolean isDone()这些 Future 中定义的方法都有所实现。更关键的是后者CompletionStage接口,可以把它翻译为完成阶段,它代表一系列操作的中间状态,包含了当前阶段生成的中间结果。基于一个完成阶段的结果,可以进入下一个阶段进行计算;也可以结合多个完成阶段的结果,来作为进入下一个阶段计算的入参;或者直接结束。 这样说你可能有点绕,别着急,先看一个最简单的例子:

public static void main(String[] args) {
	CompletableFuture<Integer> countStage = CompletableFuture.supplyAsync(() -> {
		// 清点库存...
		return 100; // 返回商品的数量
	});
	CompletableFuture<Integer> calculateStage = countStage.thenApply(count -> {
		// 查询商品的价格...
		Integer price = 20;
		return count * price; // 返回商品的总价
	}, Executors.newFixedThreadPool(10)); // 使用线程池执行
	CompletableFuture<Void> reportStage =  calculateStage.thenAccept(total -> {
		System.out.println("total: " + total); // 输出商品的总价
	});
}

在这里设计了一个商品报价系统,似乎多此一举地分了 3 个步骤。但这种回调思想是非常重要的第一步。有几个要点:

  1. 首先每个步骤都马上返回了一个Future对象,不会阻塞主线程,即不影响其他步骤的启动。
  2. 每个步骤内部定义的逻辑都是异步执行的,甚至可以指定线程池来执行(如calculateStage)。
  3. 每个步骤都可以通过thenApplythenAccept等方法来定义下一个阶段的操作,形成一个链式调用。
  4. 最终的结果可以通过thenAccept方法来异步获取,或者通过get()方法来阻塞等待结果。

CompletableFuture, A Future with CompletionStage

传统的 Future(如 java.util.concurrent.Future)本质上是一个异步结果的占位符,底层实现通常依赖于阻塞等待(如通过 LockSupport.park/unpark 或 Object.wait/notify),其核心是主线程通过 get() 方法阻塞直到结果可用。这种方式的主要局限在于:一旦调用 get(),当前线程就会被挂起,直到任务完成,无法灵活地进行任务编排和组合。

而 CompletableFuture 则实现了 Future 和 CompletionStage 两个接口。它不仅能像传统 Future 一样通过 get() 阻塞获取结果,更重要的是支持基于回调的异步任务编排(CompletionStage),即通过 thenApply、thenAccept、thenCombine 等方法注册回调,任务完成后自动触发后续操作。其底层实现是通过回调链表(Completion 链)和非阻塞的状态流转(CAS 操作 result 字段),实现了无需阻塞主线程即可完成复杂的异步流程。

链式调用与任务编排

链式调用的好处在于可以将多个异步任务串联起来,形成清晰的任务流。除了文章开头的简单链式模式,还能实现复杂的逻辑依赖关系,比如:

CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> f3 = f1.thenCombine(f2, Integer::sum); // 基于f1和f2的结果进行组合
f3.thenAccept(System.out::println); // 输出 3

除了thenCombine,还可以使用allOfanyOf等高级编排方式:

CompletableFuture<Integer> f4 = CompletableFuture.supplyAsync(() -> 4);
CompletableFuture<Integer> f5 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Void> all = CompletableFuture.allOf(f4, f5); // 等待所有任务完成
all.thenRun(() -> {
    // 等待所有任务完成
    System.out.println("all done");
});

anyOf则只要有一个任务完成就会触发:

CompletableFuture<Object> any = CompletableFuture.anyOf(f4, f5); // 等待任意一个任务完成
any.thenAccept(result -> System.out.println("first done: " + result));

此外,CompletableFuture 还提供了更多灵活的任务组合函数:

  • thenCompose(Function): 用于将两个异步操作串联起来,前一个阶段的结果作为后一个阶段的输入,适合“扁平化”异 asynchronous嵌套。
    CompletableFuture<String> f6 = CompletableFuture.supplyAsync(() -> "hello");
    CompletableFuture<String> f7 = f6.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " world"));
    f7.thenAccept(System.out::println); // 输出 hello world
    
  • whenComplete(BiConsumer): 在任务完成时(无论正常还是异常)执行指定操作,但不会改变最终结果。
    CompletableFuture<Integer> f9 = CompletableFuture.supplyAsync(() -> 42);
    f9.whenComplete((result, ex) -> System.out.println("完成: " + result));
    
  • exceptionally(Function): 只处理异常情况,发生异常时返回一个默认值。
    CompletableFuture<Integer> f10 = CompletableFuture.supplyAsync(() -> 1 / 0)
        .exceptionally(ex -> 0);
    f10.thenAccept(System.out::println); // 输出 0
    

这些组合函数让异步任务的编排更加灵活和健壮,能够应对更复杂的业务场景。

边界场景控制

超时控制:
可以通过orTimeoutcompleteOnTimeout方法设置超时:

  • orTimeout(long timeout, TimeUnit unit):超时后抛出异常。
  • completeOnTimeout(T value, long timeout, TimeUnit unit):超时后返回默认值。

取消任务:
通过cancel(true)方法可以尝试取消任务,但如果任务已经开始执行,线程未必会立即停止。例如:

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        System.out.println("任务被中断");
    }
});
future.cancel(true);
System.out.println("cancel called");

即使调用了cancel,如果任务内部没有处理中断,线程可能不会立刻停止。

最常见的异步编程场景:异步IO操作

在实际开发中,异步编程最常见的应用场景就是IO操作,尤其是网络请求。很多同学在初学CompletableFuture时,容易犯一个常见的错误:只是把原本的同步IO操作放进supplyAsync里,表面上看起来是异步了,實際上只是把阻塞操作丢给了线程池,本质上并没有提升系统的并发能力。

下面先看一个“伪异步”的错误用法:

// 错误示范:将同步HTTP请求包装进CompletableFuture,实际还是阻塞线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        // 这里以HttpURLConnection为例,实际是同步阻塞IO
        URL url = new URL("https://www.example.com");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            return reader.readLine();
        }
    } catch (IOException e) {
        return null;
    }
});
future.thenAccept(System.out::println);

这种写法虽然用上了CompletableFuture,但本质上还是同步IO,只是把阻塞转移到了线程池。如果线程池资源耗尽,依然会成为系统瓶颈。

正确的做法应该是利用真正的异步IO能力,让底层IO操作本身就是非阻塞的。Java 11引入的HttpClient就支持异步HTTP请求(或者使用第三方包AsyncHttpClient),配合CompletableFuture可以实现纯正的异步编程:

// 正确示范:使用HttpClient的sendAsync实现真正的异步HTTP请求
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("https://www.example.com"))
        .build();
CompletableFuture<HttpResponse<String>> responseFuture =
        client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
responseFuture.thenAccept(response -> {
    System.out.println(response.body());
});

这里的sendAsync方法底层就是异步IO,不会阻塞线程池中的线程。这样才能真正发挥CompletableFuture异步编排的优势,实现高并发、高吞吐的IO处理。

值得注意的是,使用这种“纯异步”方式后,等待IO事件的工作就不再由Java线程本身负责,而是交给了更底层的JDK甚至操作系统来处理。以Java 11的HttpClient为例,其底层实现基于NIO(非阻塞IO),通过Selector机制注册感兴趣的IO事件(如连接就绪、数据可读等),由操作系统内核负责监听这些事件。当事件发生时,操作系统会通过回调或事件通知的方式唤醒JVM中的Selector线程,进而触发CompletableFuture的回调链。

这种机制的核心思想是“事件驱动”,即线程不再主动等待某个IO操作完成,而是注册一个事件监听,等操作系统检测到事件发生后再通知应用层处理。这样可以极大提升资源利用率,避免大量线程因阻塞IO而浪费CPU和内存。

涉及到底层的技术方案包括:

  • JDK的NIO(基于epoll/kqueue/select等操作系统多路复用机制)
  • Netty等高性能网络框架的事件循环模型

因此,只有真正利用这些底层异步能力,才能实现高效的异步编程模型。CompletableFuture只是上层的编排工具,底层的高性能依赖于JDK和操作系统的事件驱动与异步IO支持。

CompletableFuture的实现原理

CompletableFuture的核心实现依赖于回调链表结构。它的主要成员变量和核心方法如下:

  • volatile Object result:保存异步计算的结果或异常。
  • volatile Completion stack:回调链表的栈顶,保存所有待触发的回调节点。
  • Executor executor:用于异步执行回调的线程池。

每次调用thenApplythenAccept等方法,都会向stack链表中添加一个Completion节点。任务完成时,会遍历链表,依次触发所有回调。

核心流程如下:

// 1. 任务定义
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
    // 计算逻辑
    return value;
});

// 2. 注册回调
future.thenApply(...); // 向stack链表添加Completion节点

// 3. 任务完成时
private void completeValue(Object value) {
    if (UNSAFE.compareAndSwapObject(this, RESULT, null, value)) {
        postComplete(); // 触发所有回调
    }
}

private void postComplete() {
    Completion h;
    while ((h = stack) != null || (h = next) != null) {
        h.tryFire(NESTED); // 依次触发回调
        // ...existing code...
    }
}

总结

本篇主要介绍了Java并发包中异步编程的核心工具——CompletableFuture,分析了其与传统Future的实现区别,详细讲解了链式任务编排、边界场景控制(如超时与取消),并介绍了在最常见的IO场景之一——HTTP请求中的典型错误用法和正确用法。最后,还简要介绍了CompletableFuture的底层实现原理。