Skip to content
清晨的一缕阳光
返回

CompletableFuture 异步编程实战

CompletableFuture 异步编程实战

CompletableFuture 是 Java 8 引入的异步编程工具,支持函数式编排、异常处理和组合操作,大幅提升并发性能。

一、为什么需要异步编程

1.1 同步编程问题

// 串行执行,总耗时 = 各任务之和
Result r1 = task1(); // 100ms
Result r2 = task2(); // 100ms
Result r3 = task3(); // 100ms
// 总耗时:300ms

1.2 异步编程优势

// 并行执行,总耗时 = 最慢任务
CompletableFuture<Result> f1 = supplyAsync(() -> task1());
CompletableFuture<Result> f2 = supplyAsync(() -> task2());
CompletableFuture<Result> f3 = supplyAsync(() -> task3());

CompletableFuture.allOf(f1, f2, f3).join();
// 总耗时:~100ms

二、核心概念

2.1 Future 的局限

// ❌ 阻塞获取结果
Future<Result> future = executor.submit(task);
Result result = future.get(); // 阻塞

// ❌ 无法手动完成
// ❌ 无法链式调用
// ❌ 无法组合多个 Future

2.2 CompletableFuture 特性

CompletableFuture
├── 手动完成(complete)
├── 链式调用(thenApply)
├── 组合操作(thenCombine)
├── 异常处理(exceptionally)
└── 异步编排(thenCompose)

三、创建 CompletableFuture

3.1 异步执行

// 使用 ForkJoinPool.commonPool()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello";
});

// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello";
}, executor);

3.2 手动完成

CompletableFuture<String> future = new CompletableFuture<>();

// 正常完成
future.complete("Result");

// 异常完成
future.completeExceptionally(new RuntimeException("Error"));

// 获取结果
String result = future.get();

四、结果转换

4.1 thenApply(同步转换)

CompletableFuture<Integer> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenApply(s -> s.length());

// 结果:5

4.2 thenApplyAsync(异步转换)

CompletableFuture<Integer> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenApplyAsync(s -> s.length()); // 异步执行

4.3 thenAccept(消费结果)

CompletableFuture<Void> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenAccept(s -> System.out.println(s));

4.4 thenRun(不关心结果)

CompletableFuture<Void> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenRun(() -> System.out.println("Done"));

五、任务编排

5.1 thenCompose(依赖编排)

// 任务 2 依赖任务 1 的结果
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> getUserId())
    .thenCompose(userId -> getUserInfo(userId));

// 等价于
getUserId().thenApplyAsync(id -> getUserInfo(id)).join();

5.2 thenCombine(合并结果)

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> result = f1.thenCombine(f2, (s1, s2) -> s1 + " " + s2);
// 结果:"Hello World"

5.3 allOf(等待所有完成)

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> task1());
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> task2());
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> task3());

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join(); // 等待所有完成

// 获取所有结果
List<String> results = Stream.of(f1, f2, f3)
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

5.4 anyOf(任一完成)

CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
Object result = any.join(); // 返回最先完成的结果

六、异常处理

6.1 exceptionally(异常兜底)

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        throw new RuntimeException("Error");
    })
    .exceptionally(ex -> "Default Value");

// 结果:"Default Value"

6.2 handle(无论异常)

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        throw new RuntimeException("Error");
    })
    .handle((result, ex) -> {
        if (ex != null) {
            return "Error: " + ex.getMessage();
        }
        return result;
    });

6.3 whenComplete(回调)

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "Result")
    .whenComplete((result, ex) -> {
        if (ex == null) {
            System.out.println("Success: " + result);
        } else {
            System.out.println("Error: " + ex.getMessage());
        }
    });

七、实战场景

7.1 并行调用多个服务

public UserProfile getUserProfile(Long userId) {
    CompletableFuture<User> userFuture = CompletableFuture
        .supplyAsync(() -> userService.getUser(userId));
    
    CompletableFuture<Order> orderFuture = CompletableFuture
        .supplyAsync(() -> orderService.getRecentOrder(userId));
    
    CompletableFuture<Points> pointsFuture = CompletableFuture
        .supplyAsync(() -> pointsService.getPoints(userId));
    
    // 等待所有完成
    CompletableFuture.allOf(userFuture, orderFuture, pointsFuture).join();
    
    // 组装结果
    return UserProfile.builder()
        .user(userFuture.join())
        .order(orderFuture.join())
        .points(pointsFuture.join())
        .build();
}

7.2 超时控制

public <T> T getWithTimeout(CompletableFuture<T> future, long timeout) {
    try {
        return future.get(timeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        future.cancel(true);
        throw new RuntimeException("Timeout", e);
    } catch (Exception e) {
        throw new RuntimeException("Error", e);
    }
}

// 使用
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> slowTask());

String result = getWithTimeout(future, 1000);

7.3 链式编排

CompletableFuture<String> result = CompletableFuture
    .supplyAsync(() -> fetchUserId())
    .thenCompose(userId -> fetchUserInfo(userId))
    .thenApply(user -> user.getName())
    .thenApply(name -> name.toUpperCase())
    .exceptionally(ex -> "Default User")
    .whenComplete((name, ex) -> System.out.println("Done: " + name));

7.4 批量处理

List<Long> userIds = Arrays.asList(1L, 2L, 3L, 4L, 5L);

List<CompletableFuture<User>> futures = userIds.stream()
    .map(id -> CompletableFuture.supplyAsync(() -> userService.getUser(id)))
    .collect(Collectors.toList());

// 等待所有完成
CompletableFuture<Void> all = CompletableFuture.allOf(
    futures.toArray(CompletableFuture[]::new)
);

// 收集结果
List<User> users = all.thenApply(v -> 
    futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList())
).join();

八、最佳实践

8.1 使用自定义线程池

// ❌ 默认 ForkJoinPool(可能线程不足)
CompletableFuture.supplyAsync(() -> task());

// ✅ 自定义线程池
ExecutorService executor = new ThreadPoolExecutor(
    10, 50, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    new ThreadFactoryBuilder().setNameFormat("async-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

CompletableFuture.supplyAsync(() -> task(), executor);

8.2 避免阻塞

// ❌ 阻塞等待
result = future.get();

// ✅ 链式调用
future.thenAccept(result -> {
    // 处理结果
});

8.3 异常必须处理

// ❌ 不处理异常(可能吞掉)
CompletableFuture.supplyAsync(() -> task());

// ✅ 处理异常
CompletableFuture.supplyAsync(() -> task())
    .exceptionally(ex -> {
        log.error("Error", ex);
        return defaultValue;
    });

8.4 设置超时

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> task())
    .orTimeout(5, TimeUnit.SECONDS); // JDK 9+

九、总结

CompletableFuture 核心方法:

方法作用是否异步
supplyAsync异步执行
thenApply转换结果
thenAccept消费结果
thenCompose依赖编排
thenCombine合并结果
allOf等待所有
anyOf任一完成
exceptionally异常兜底
handle无论异常

CompletableFuture 是异步编程利器,掌握它能大幅提升系统并发性能。


分享这篇文章到:

上一篇文章
Java 内存模型 (JMM) 详解
下一篇文章
ReentrantLock 原理与实战