CompletableFuture 异步编程实战
CompletableFuture 是 Java 8 引入的异步编程工具,支持函数式编排、异常处理和组合操作,大幅提升并发性能。
一、为什么需要异步编程
1.1 同步编程问题
// 串行执行,总耗时 = 各任务之和
Result r1 = task1(); // 100ms
Result r2 = task2(); // 100ms
Result r3 = task3(); // 100ms
// 总耗时:300ms
1.2 异步编程优势
// 并行执行,总耗时 = 最慢任务
CompletableFuture<Result> f1 = supplyAsync(() -> task1());
CompletableFuture<Result> f2 = supplyAsync(() -> task2());
CompletableFuture<Result> f3 = supplyAsync(() -> task3());
CompletableFuture.allOf(f1, f2, f3).join();
// 总耗时:~100ms
二、核心概念
2.1 Future 的局限
// ❌ 阻塞获取结果
Future<Result> future = executor.submit(task);
Result result = future.get(); // 阻塞
// ❌ 无法手动完成
// ❌ 无法链式调用
// ❌ 无法组合多个 Future
2.2 CompletableFuture 特性
CompletableFuture
├── 手动完成(complete)
├── 链式调用(thenApply)
├── 组合操作(thenCombine)
├── 异常处理(exceptionally)
└── 异步编排(thenCompose)
三、创建 CompletableFuture
3.1 异步执行
// 使用 ForkJoinPool.commonPool()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);
3.2 手动完成
CompletableFuture<String> future = new CompletableFuture<>();
// 正常完成
future.complete("Result");
// 异常完成
future.completeExceptionally(new RuntimeException("Error"));
// 获取结果
String result = future.get();
四、结果转换
4.1 thenApply(同步转换)
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s.length());
// 结果:5
4.2 thenApplyAsync(异步转换)
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApplyAsync(s -> s.length()); // 异步执行
4.3 thenAccept(消费结果)
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenAccept(s -> System.out.println(s));
4.4 thenRun(不关心结果)
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("Done"));
五、任务编排
5.1 thenCompose(依赖编排)
// 任务 2 依赖任务 1 的结果
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> getUserId())
.thenCompose(userId -> getUserInfo(userId));
// 等价于
getUserId().thenApplyAsync(id -> getUserInfo(id)).join();
5.2 thenCombine(合并结果)
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> result = f1.thenCombine(f2, (s1, s2) -> s1 + " " + s2);
// 结果:"Hello World"
5.3 allOf(等待所有完成)
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> task1());
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> task2());
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> task3());
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join(); // 等待所有完成
// 获取所有结果
List<String> results = Stream.of(f1, f2, f3)
.map(CompletableFuture::join)
.collect(Collectors.toList());
5.4 anyOf(任一完成)
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
Object result = any.join(); // 返回最先完成的结果
六、异常处理
6.1 exceptionally(异常兜底)
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
throw new RuntimeException("Error");
})
.exceptionally(ex -> "Default Value");
// 结果:"Default Value"
6.2 handle(无论异常)
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
throw new RuntimeException("Error");
})
.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return result;
});
6.3 whenComplete(回调)
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Result")
.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Success: " + result);
} else {
System.out.println("Error: " + ex.getMessage());
}
});
七、实战场景
7.1 并行调用多个服务
public UserProfile getUserProfile(Long userId) {
CompletableFuture<User> userFuture = CompletableFuture
.supplyAsync(() -> userService.getUser(userId));
CompletableFuture<Order> orderFuture = CompletableFuture
.supplyAsync(() -> orderService.getRecentOrder(userId));
CompletableFuture<Points> pointsFuture = CompletableFuture
.supplyAsync(() -> pointsService.getPoints(userId));
// 等待所有完成
CompletableFuture.allOf(userFuture, orderFuture, pointsFuture).join();
// 组装结果
return UserProfile.builder()
.user(userFuture.join())
.order(orderFuture.join())
.points(pointsFuture.join())
.build();
}
7.2 超时控制
public <T> T getWithTimeout(CompletableFuture<T> future, long timeout) {
try {
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
future.cancel(true);
throw new RuntimeException("Timeout", e);
} catch (Exception e) {
throw new RuntimeException("Error", e);
}
}
// 使用
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> slowTask());
String result = getWithTimeout(future, 1000);
7.3 链式编排
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> fetchUserId())
.thenCompose(userId -> fetchUserInfo(userId))
.thenApply(user -> user.getName())
.thenApply(name -> name.toUpperCase())
.exceptionally(ex -> "Default User")
.whenComplete((name, ex) -> System.out.println("Done: " + name));
7.4 批量处理
List<Long> userIds = Arrays.asList(1L, 2L, 3L, 4L, 5L);
List<CompletableFuture<User>> futures = userIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> userService.getUser(id)))
.collect(Collectors.toList());
// 等待所有完成
CompletableFuture<Void> all = CompletableFuture.allOf(
futures.toArray(CompletableFuture[]::new)
);
// 收集结果
List<User> users = all.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
).join();
八、最佳实践
8.1 使用自定义线程池
// ❌ 默认 ForkJoinPool(可能线程不足)
CompletableFuture.supplyAsync(() -> task());
// ✅ 自定义线程池
ExecutorService executor = new ThreadPoolExecutor(
10, 50, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("async-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
CompletableFuture.supplyAsync(() -> task(), executor);
8.2 避免阻塞
// ❌ 阻塞等待
result = future.get();
// ✅ 链式调用
future.thenAccept(result -> {
// 处理结果
});
8.3 异常必须处理
// ❌ 不处理异常(可能吞掉)
CompletableFuture.supplyAsync(() -> task());
// ✅ 处理异常
CompletableFuture.supplyAsync(() -> task())
.exceptionally(ex -> {
log.error("Error", ex);
return defaultValue;
});
8.4 设置超时
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> task())
.orTimeout(5, TimeUnit.SECONDS); // JDK 9+
九、总结
CompletableFuture 核心方法:
| 方法 | 作用 | 是否异步 |
|---|---|---|
| supplyAsync | 异步执行 | ✅ |
| thenApply | 转换结果 | ❌ |
| thenAccept | 消费结果 | ❌ |
| thenCompose | 依赖编排 | ✅ |
| thenCombine | 合并结果 | ✅ |
| allOf | 等待所有 | ❌ |
| anyOf | 任一完成 | ❌ |
| exceptionally | 异常兜底 | ❌ |
| handle | 无论异常 | ❌ |
CompletableFuture 是异步编程利器,掌握它能大幅提升系统并发性能。