CompletableFuture — 异步编排
CompletableFuture 是 Java 8 引入的异步编程利器,支持链式调用、组合多个异步任务、异常处理,是现代 Java 异步编程的标准方式。
基础用法
java
// 创建异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 在 ForkJoinPool.commonPool() 中执行
return fetchDataFromRemote();
});
// 指定线程池(生产环境推荐)
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(
() -> fetchDataFromRemote(), executor);
// 无返回值的异步任务
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> {
sendEmail("user@example.com");
}, executor);
// 获取结果
String result = future.get(); // 阻塞等待
String result2 = future.get(5, TimeUnit.SECONDS); // 超时等待
String result3 = future.getNow("default"); // 立即获取,未完成返回默认值
String result4 = future.join(); // 类似 get,但抛 unchecked 异常链式操作
java
CompletableFuture<UserVO> pipeline = CompletableFuture
// 1. 异步获取用户 ID
.supplyAsync(() -> getUserIdFromToken(token), executor)
// 2. 用 ID 查询用户(thenApply:同步转换,在同一线程)
.thenApply(userId -> userRepository.findById(userId))
// 3. 异步查询用户订单(thenCompose:异步转换,避免嵌套)
.thenCompose(user -> CompletableFuture.supplyAsync(
() -> orderRepository.findByUserId(user.getId()), executor)
.thenApply(orders -> new UserVO(user, orders)))
// 4. 处理结果(thenAccept:消费结果,无返回值)
.thenAccept(userVO -> cache.put(token, userVO))
// 5. 异常处理
.exceptionally(ex -> {
log.error("获取用户信息失败", ex);
return null;
});组合多个 Future
java
// allOf — 等待所有完成
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> fetchUser());
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> fetchOrder());
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> fetchProduct());
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join(); // 等待全部完成
// 收集所有结果
List<String> results = Stream.of(f1, f2, f3)
.map(CompletableFuture::join)
.collect(Collectors.toList());
// anyOf — 任意一个完成即返回
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
Object firstResult = any.join();
// thenCombine — 合并两个 Future 的结果
CompletableFuture<UserVO> combined = f1.thenCombine(f2,
(user, order) -> new UserVO(user, order));实战:并发查询聚合
java
@Service
public class UserProfileService {
public UserProfileVO getProfile(Long userId) {
// 并发查询三个数据源
CompletableFuture<User> userFuture =
CompletableFuture.supplyAsync(() -> userRepo.findById(userId), executor);
CompletableFuture<List<Order>> ordersFuture =
CompletableFuture.supplyAsync(() -> orderRepo.findByUserId(userId), executor);
CompletableFuture<UserStats> statsFuture =
CompletableFuture.supplyAsync(() -> statsService.getUserStats(userId), executor);
// 等待全部完成并组合
return CompletableFuture.allOf(userFuture, ordersFuture, statsFuture)
.thenApply(v -> {
User user = userFuture.join();
List<Order> orders = ordersFuture.join();
UserStats stats = statsFuture.join();
return new UserProfileVO(user, orders, stats);
})
.exceptionally(ex -> {
log.error("获取用户档案失败, userId={}", userId, ex);
throw new ServiceException("获取用户档案失败");
})
.join();
}
}超时处理(Java 9+)
java
// orTimeout — 超时后抛出 TimeoutException
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> slowOperation())
.orTimeout(3, TimeUnit.SECONDS);
// completeOnTimeout — 超时后返回默认值
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> slowOperation())
.completeOnTimeout("default", 3, TimeUnit.SECONDS);异常处理
java
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("随机失败");
return "success";
})
// exceptionally — 异常时返回默认值
.exceptionally(ex -> {
log.error("操作失败", ex);
return "fallback";
});
// handle — 无论成功失败都处理(类似 finally)
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> riskyOperation())
.handle((result, ex) -> {
if (ex != null) {
log.error("失败", ex);
return "error";
}
return result.toUpperCase();
});
// whenComplete — 处理完成事件,不改变结果
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> "result")
.whenComplete((result, ex) -> {
if (ex != null) {
metrics.recordFailure();
} else {
metrics.recordSuccess();
}
// 不改变 future 的结果
});注意事项
java
// ❌ 不要使用 ForkJoinPool.commonPool() 执行 I/O 操作
// commonPool 线程数 = CPU 核数 - 1,I/O 阻塞会耗尽线程
CompletableFuture.supplyAsync(() -> httpClient.get(url)); // 危险!
// ✅ 使用专用线程池
ExecutorService ioExecutor = Executors.newFixedThreadPool(50);
CompletableFuture.supplyAsync(() -> httpClient.get(url), ioExecutor);
// ❌ 不要在 thenApply 中执行耗时操作
future.thenApply(result -> {
Thread.sleep(1000); // 阻塞线程!
return process(result);
});
// ✅ 耗时操作用 thenApplyAsync
future.thenApplyAsync(result -> {
Thread.sleep(1000);
return process(result);
}, executor);CompletableFuture vs 响应式
- CompletableFuture:适合有限数量的异步任务组合,API 简单直观
- Project Reactor:适合流式数据处理、背压控制、大量并发
- Java 21 Virtual Threads:可以用同步写法实现高并发,是 CompletableFuture 的有力替代