Skip to content

CompletableFuture — 异步编排

CompletableFuture 是 Java 8 引入的异步编程利器,支持链式调用、组合多个异步任务、异常处理,是现代 Java 异步编程的标准方式。

基础用法

java
// 创建异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 在 ForkJoinPool.commonPool() 中执行
    return fetchDataFromRemote();
});

// 指定线程池(生产环境推荐)
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(
    () -> fetchDataFromRemote(), executor);

// 无返回值的异步任务
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> {
    sendEmail("user@example.com");
}, executor);

// 获取结果
String result = future.get();                          // 阻塞等待
String result2 = future.get(5, TimeUnit.SECONDS);     // 超时等待
String result3 = future.getNow("default");             // 立即获取,未完成返回默认值
String result4 = future.join();                        // 类似 get,但抛 unchecked 异常

链式操作

java
CompletableFuture<UserVO> pipeline = CompletableFuture
    // 1. 异步获取用户 ID
    .supplyAsync(() -> getUserIdFromToken(token), executor)

    // 2. 用 ID 查询用户(thenApply:同步转换,在同一线程)
    .thenApply(userId -> userRepository.findById(userId))

    // 3. 异步查询用户订单(thenCompose:异步转换,避免嵌套)
    .thenCompose(user -> CompletableFuture.supplyAsync(
        () -> orderRepository.findByUserId(user.getId()), executor)
        .thenApply(orders -> new UserVO(user, orders)))

    // 4. 处理结果(thenAccept:消费结果,无返回值)
    .thenAccept(userVO -> cache.put(token, userVO))

    // 5. 异常处理
    .exceptionally(ex -> {
        log.error("获取用户信息失败", ex);
        return null;
    });

组合多个 Future

java
// allOf — 等待所有完成
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> fetchUser());
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> fetchOrder());
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> fetchProduct());

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();  // 等待全部完成

// 收集所有结果
List<String> results = Stream.of(f1, f2, f3)
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

// anyOf — 任意一个完成即返回
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
Object firstResult = any.join();

// thenCombine — 合并两个 Future 的结果
CompletableFuture<UserVO> combined = f1.thenCombine(f2,
    (user, order) -> new UserVO(user, order));

实战:并发查询聚合

java
@Service
public class UserProfileService {

    public UserProfileVO getProfile(Long userId) {
        // 并发查询三个数据源
        CompletableFuture<User> userFuture =
            CompletableFuture.supplyAsync(() -> userRepo.findById(userId), executor);

        CompletableFuture<List<Order>> ordersFuture =
            CompletableFuture.supplyAsync(() -> orderRepo.findByUserId(userId), executor);

        CompletableFuture<UserStats> statsFuture =
            CompletableFuture.supplyAsync(() -> statsService.getUserStats(userId), executor);

        // 等待全部完成并组合
        return CompletableFuture.allOf(userFuture, ordersFuture, statsFuture)
            .thenApply(v -> {
                User user = userFuture.join();
                List<Order> orders = ordersFuture.join();
                UserStats stats = statsFuture.join();
                return new UserProfileVO(user, orders, stats);
            })
            .exceptionally(ex -> {
                log.error("获取用户档案失败, userId={}", userId, ex);
                throw new ServiceException("获取用户档案失败");
            })
            .join();
    }
}

超时处理(Java 9+)

java
// orTimeout — 超时后抛出 TimeoutException
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> slowOperation())
    .orTimeout(3, TimeUnit.SECONDS);

// completeOnTimeout — 超时后返回默认值
CompletableFuture<String> future2 = CompletableFuture
    .supplyAsync(() -> slowOperation())
    .completeOnTimeout("default", 3, TimeUnit.SECONDS);

异常处理

java
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() > 0.5) throw new RuntimeException("随机失败");
        return "success";
    })

    // exceptionally — 异常时返回默认值
    .exceptionally(ex -> {
        log.error("操作失败", ex);
        return "fallback";
    });

// handle — 无论成功失败都处理(类似 finally)
CompletableFuture<String> future2 = CompletableFuture
    .supplyAsync(() -> riskyOperation())
    .handle((result, ex) -> {
        if (ex != null) {
            log.error("失败", ex);
            return "error";
        }
        return result.toUpperCase();
    });

// whenComplete — 处理完成事件,不改变结果
CompletableFuture<String> future3 = CompletableFuture
    .supplyAsync(() -> "result")
    .whenComplete((result, ex) -> {
        if (ex != null) {
            metrics.recordFailure();
        } else {
            metrics.recordSuccess();
        }
        // 不改变 future 的结果
    });

注意事项

java
// ❌ 不要使用 ForkJoinPool.commonPool() 执行 I/O 操作
// commonPool 线程数 = CPU 核数 - 1,I/O 阻塞会耗尽线程
CompletableFuture.supplyAsync(() -> httpClient.get(url));  // 危险!

// ✅ 使用专用线程池
ExecutorService ioExecutor = Executors.newFixedThreadPool(50);
CompletableFuture.supplyAsync(() -> httpClient.get(url), ioExecutor);

// ❌ 不要在 thenApply 中执行耗时操作
future.thenApply(result -> {
    Thread.sleep(1000);  // 阻塞线程!
    return process(result);
});

// ✅ 耗时操作用 thenApplyAsync
future.thenApplyAsync(result -> {
    Thread.sleep(1000);
    return process(result);
}, executor);

CompletableFuture vs 响应式

  • CompletableFuture:适合有限数量的异步任务组合,API 简单直观
  • Project Reactor:适合流式数据处理、背压控制、大量并发
  • Java 21 Virtual Threads:可以用同步写法实现高并发,是 CompletableFuture 的有力替代

系统学习 Java 生态,深入底层架构