CompletableFuture异步编程
约 1590 字大约 5 分钟
javaasynccompletablefuture
2025-03-15
概述
CompletableFuture 是 Java 8 引入的异步编程工具类,实现了 Future 和 CompletionStage 接口。它支持链式调用、组合多个异步任务、灵活的异常处理,是Java中进行异步编程的首选方案。
与传统Future对比
// 传统 Future — 阻塞获取结果
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<String> future = executor.submit(() -> fetchData());
String result = future.get(); // 阻塞!无法组合、无回调
// CompletableFuture — 非阻塞、可组合
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> fetchData());
cf.thenApply(String::toUpperCase)
.thenAccept(System.out::println); // 全程非阻塞| 特性 | Future | CompletableFuture |
|---|---|---|
| 阻塞获取 | get() | get()(也支持) |
| 回调 | 不支持 | thenApply/thenAccept/... |
| 链式组合 | 不支持 | thenCompose/thenCombine/... |
| 异常处理 | 只能get()时catch | exceptionally/handle/whenComplete |
| 手动完成 | 不支持 | complete/completeExceptionally |
创建CompletableFuture
// 1. 有返回值的异步任务
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
return queryDatabase();
});
// 2. 无返回值的异步任务
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
sendNotification();
});
// 3. 指定线程池
ExecutorService pool = Executors.newFixedThreadPool(8);
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
return queryDatabase();
}, pool);
// 4. 直接创建已完成的Future
CompletableFuture<String> completed = CompletableFuture.completedFuture("done");
// 5. 手动完成
CompletableFuture<String> manual = new CompletableFuture<>();
// 某个时刻...
manual.complete("result"); // 正常完成
// 或
manual.completeExceptionally(new RuntimeException("fail")); // 异常完成链式操作总览
thenApply / thenCompose / thenCombine
thenApply — 同步转换(map)
CompletableFuture<String> nameFuture = CompletableFuture
.supplyAsync(() -> getUserId()) // 异步获取userId
.thenApply(id -> getUserName(id)) // 同步转换(在同一线程)
.thenApply(name -> "Hello, " + name); // 再次转换thenCompose — 异步扁平化(flatMap)
当转换函数本身返回 CompletableFuture 时,使用 thenCompose 避免嵌套。
// thenApply 会导致嵌套: CompletableFuture<CompletableFuture<String>>
// thenCompose 扁平化: CompletableFuture<String>
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> getUserId())
.thenCompose(id -> queryUserAsync(id)) // queryUserAsync返回CF<String>
.thenCompose(user -> queryOrderAsync(user));thenCombine — 合并两个独立任务
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> getUser());
CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> getOrder());
// 两个任务并行执行,都完成后合并结果
CompletableFuture<String> combined = userFuture.thenCombine(orderFuture,
(user, order) -> user + " ordered " + order);异常处理
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("Boom!");
return "OK";
});
// 1. exceptionally — 异常时提供降级值
CompletableFuture<String> withFallback = cf.exceptionally(ex -> {
log.warn("Failed: {}", ex.getMessage());
return "Fallback";
});
// 2. handle — 同时处理正常和异常(类似try-catch-finally)
CompletableFuture<String> handled = cf.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return "Success: " + result;
});
// 3. whenComplete — 窥探结果,不改变值(类似finally)
CompletableFuture<String> peeked = cf.whenComplete((result, ex) -> {
if (ex != null) log.error("Error occurred", ex);
else log.info("Result: {}", result);
});
// 原始值或异常会继续传递到下一个阶段allOf / anyOf
allOf — 等待所有任务完成
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> queryService1());
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> queryService2());
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> queryService3());
// allOf返回CompletableFuture<Void>,需要手动收集结果
CompletableFuture<Void> allDone = CompletableFuture.allOf(cf1, cf2, cf3);
// 收集所有结果
CompletableFuture<List<String>> allResults = allDone.thenApply(v ->
Stream.of(cf1, cf2, cf3)
.map(CompletableFuture::join) // join不抛checked异常
.collect(Collectors.toList())
);anyOf — 任一任务完成
// 哪个先完成用哪个(赛跑模式)
CompletableFuture<Object> fastest = CompletableFuture.anyOf(cf1, cf2, cf3);
fastest.thenAccept(result -> System.out.println("First result: " + result));线程池选择
// 默认线程池:ForkJoinPool.commonPool()
// 适合CPU密集型任务,线程数 = CPU核数 - 1
// I/O密集型任务应使用自定义线程池
ExecutorService ioPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2,
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "io-pool-" + counter.incrementAndGet());
t.setDaemon(true);
return t;
}
}
);
// 使用自定义线程池
CompletableFuture.supplyAsync(() -> callRemoteAPI(), ioPool);Async后缀规则:
thenApply()— 在完成的那个线程上执行(同步)thenApplyAsync()— 提交到默认线程池执行thenApplyAsync(fn, executor)— 提交到指定线程池执行
实战模式
模式1:并行调用多个服务后聚合
public UserProfile getUserProfile(String userId) {
CompletableFuture<UserInfo> userInfoCF =
CompletableFuture.supplyAsync(() -> userService.getInfo(userId), ioPool);
CompletableFuture<List<Order>> ordersCF =
CompletableFuture.supplyAsync(() -> orderService.getOrders(userId), ioPool);
CompletableFuture<Integer> pointsCF =
CompletableFuture.supplyAsync(() -> pointService.getPoints(userId), ioPool);
return CompletableFuture.allOf(userInfoCF, ordersCF, pointsCF)
.thenApply(v -> {
UserProfile profile = new UserProfile();
profile.setUserInfo(userInfoCF.join());
profile.setOrders(ordersCF.join());
profile.setPoints(pointsCF.join());
return profile;
})
.join(); // 最终阻塞获取
}模式2:超时控制
// Java 9+
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> slowOperation())
.orTimeout(3, TimeUnit.SECONDS) // 超时抛出TimeoutException
.exceptionally(ex -> "timeout fallback");
// Java 9+
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> slowOperation())
.completeOnTimeout("default", 3, TimeUnit.SECONDS); // 超时返回默认值模式3:重试
public static <T> CompletableFuture<T> retry(
Supplier<CompletableFuture<T>> supplier, int maxRetries) {
CompletableFuture<T> cf = supplier.get();
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionallyCompose(ex -> supplier.get()); // Java 12+
}
return cf;
}
// 使用
CompletableFuture<String> result = retry(
() -> CompletableFuture.supplyAsync(() -> callUnstableAPI()), 3);常见陷阱
1. 忘记处理异常
// 错误:异常被静默吞掉
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("error");
}).thenApply(r -> r); // 异常不会打印,不会传播
// 正确:始终添加异常处理
CompletableFuture.supplyAsync(() -> riskyOperation())
.exceptionally(ex -> {
log.error("Operation failed", ex);
return fallbackValue;
});2. 在默认线程池中执行阻塞操作
// 错误:阻塞操作耗尽ForkJoinPool.commonPool
CompletableFuture.supplyAsync(() -> {
return httpClient.get("http://slow-api.com"); // 阻塞I/O
}); // 使用commonPool
// 正确:为I/O操作使用独立线程池
CompletableFuture.supplyAsync(() -> {
return httpClient.get("http://slow-api.com");
}, ioPool);3. allOf中某个任务异常
// allOf的异常行为:只要有一个失败,allOf就失败
// 但其他任务仍会继续执行(不会被取消)
// 如果需要忽略部分失败:
List<CompletableFuture<String>> futures = services.stream()
.map(svc -> CompletableFuture.supplyAsync(() -> svc.call())
.exceptionally(ex -> null)) // 每个任务独立处理异常
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.toList());总结
CompletableFuture 是Java异步编程的核心工具。掌握 thenApply/thenCompose/thenCombine 三大组合操作,exceptionally/handle 异常处理,以及 allOf/anyOf 多任务编排,就能应对绝大多数异步编程场景。关键是选择合适的线程池并妥善处理异常,避免静默失败和线程饥饿。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于