前言
异步编程是提升应用吞吐量的重要手段。Spring Boot 提供了@Async、CompletableFuture 等多种异步编程方式。本文将介绍 Spring Boot 异步编程的完整方案。
@Async 基础
1. 启用异步
@SpringBootApplication
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
2. 基础使用
@Service
public class AsyncService {
/**
* 异步方法
*/
@Async
public void asyncMethod() {
log.info("异步执行:{}", Thread.currentThread().getName());
// 业务逻辑
}
/**
* 带返回值的异步方法
*/
@Async
public CompletableFuture<String> asyncWithReturn() {
log.info("异步执行:{}", Thread.currentThread().getName());
return CompletableFuture.completedFuture("result");
}
/**
* 带参数的异步方法
*/
@Async
public CompletableFuture<UserDTO> asyncWithParam(Long userId) {
log.info("异步执行:{}", Thread.currentThread().getName());
User user = userRepository.findById(userId).orElse(null);
return CompletableFuture.completedFuture(convertToDTO(user));
}
}
3. 调用异步方法
@Service
@RequiredArgsConstructor
public class BusinessService {
private final AsyncService asyncService;
public void process() {
// 异步调用(注意:同类调用无效)
asyncService.asyncMethod();
// 获取返回值
CompletableFuture<String> future = asyncService.asyncWithReturn();
String result = future.join(); // 阻塞等待
// 非阻塞处理
asyncService.asyncWithReturn()
.thenAccept(result -> log.info("结果:{}", result))
.exceptionally(ex -> {
log.error("异常", ex);
return null;
});
}
}
线程池配置
1. 默认配置
spring:
task:
execution:
pool:
core-size: 8
max-size: 100
queue-capacity: 100
keep-alive: 10s
thread-name-prefix: async-
2. 自定义线程池
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
@Bean(name = "ioExecutor")
public Executor ioExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(200);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("io-");
executor.initialize();
return executor;
}
@Bean(name = "cpuExecutor")
public Executor cpuExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int cores = Runtime.getRuntime().availableProcessors();
executor.setCorePoolSize(cores);
executor.setMaxPoolSize(cores * 2);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("cpu-");
executor.initialize();
return executor;
}
}
3. 使用指定线程池
@Service
public class AsyncService {
/**
* 使用自定义线程池
*/
@Async("ioExecutor")
public CompletableFuture<String> ioTask() {
return CompletableFuture.completedFuture("io result");
}
@Async("cpuExecutor")
public CompletableFuture<String> cpuTask() {
return CompletableFuture.completedFuture("cpu result");
}
}
CompletableFuture
1. 基础使用
@Service
public class CompletableFutureService {
/**
* 创建 CompletableFuture
*/
public CompletableFuture<String> supplyAsync() {
return CompletableFuture.supplyAsync(() -> {
return "result";
});
}
/**
* 链式调用
*/
public CompletableFuture<String> chain() {
return CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase)
.thenAccept(System.out::println)
.thenRun(() -> System.out.println("Done"));
}
/**
* 异常处理
*/
public CompletableFuture<String> handleException() {
return CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("error");
return "result";
})
.exceptionally(ex -> "default value")
.handle((result, ex) -> {
if (ex != null) {
log.error("异常", ex);
return "error handler";
}
return result;
});
}
}
2. 组合多个 Future
@Service
public class CompletableFutureService {
/**
* 等待所有完成
*/
public CompletableFuture<Void> allOf(
CompletableFuture<String> future1,
CompletableFuture<String> future2
) {
return CompletableFuture.allOf(future1, future2)
.thenRun(() -> {
String result1 = future1.join();
String result2 = future2.join();
log.info("全部完成:{}, {}", result1, result2);
});
}
/**
* 任一完成即可
*/
public CompletableFuture<Object> anyOf(
CompletableFuture<String> future1,
CompletableFuture<String> future2
) {
return CompletableFuture.anyOf(future1, future2)
.thenApply(result -> {
log.info("任一完成:{}", result);
return result;
});
}
/**
* 组合结果
*/
public CompletableFuture<String> thenCombine(
CompletableFuture<String> future1,
CompletableFuture<String> future2
) {
return future1.thenCombine(future2, (r1, r2) -> {
return r1 + " + " + r2;
});
}
/**
* 顺序执行
*/
public CompletableFuture<String> thenCompose(
CompletableFuture<String> future1
) {
return future1.thenCompose(result -> {
return CompletableFuture.supplyAsync(() -> {
return result + " composed";
});
});
}
}
3. 超时处理
@Service
public class CompletableFutureService {
/**
* 超时处理
*/
public CompletableFuture<String> withTimeout() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "result";
})
.orTimeout(1, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
return "timeout";
}
return "error";
});
}
/**
* 超时备用
*/
public CompletableFuture<String> completeOnTimeout() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "result";
})
.completeOnTimeout("default", 1, TimeUnit.SECONDS);
}
}
实际应用场景
1. 并行调用外部服务
@Service
@RequiredArgsConstructor
public class UserService {
private final ExternalService externalService;
/**
* 串行调用(慢)
*/
public UserDTO getUserSerial(Long userId) {
User user = userRepository.findById(userId).orElse(null);
UserInfo info1 = externalService.getInfo1(userId);
UserInfo info2 = externalService.getInfo2(userId);
UserInfo info3 = externalService.getInfo3(userId);
return merge(user, info1, info2, info3);
}
/**
* 并行调用(快)
*/
public CompletableFuture<UserDTO> getUserParallel(Long userId) {
CompletableFuture<User> userFuture =
CompletableFuture.supplyAsync(() ->
userRepository.findById(userId).orElse(null)
);
CompletableFuture<UserInfo> info1Future =
CompletableFuture.supplyAsync(() ->
externalService.getInfo1(userId)
);
CompletableFuture<UserInfo> info2Future =
CompletableFuture.supplyAsync(() ->
externalService.getInfo2(userId)
);
CompletableFuture<UserInfo> info3Future =
CompletableFuture.supplyAsync(() ->
externalService.getInfo3(userId)
);
return CompletableFuture.allOf(
userFuture, info1Future, info2Future, info3Future
)
.thenApply(v -> merge(
userFuture.join(),
info1Future.join(),
info2Future.join(),
info3Future.join()
));
}
}
2. 批量处理
@Service
public class BatchService {
@Autowired
private Executor taskExecutor;
/**
* 批量异步处理
*/
public CompletableFuture<List<Result>> batchProcess(List<Long> ids) {
List<CompletableFuture<Result>> futures = ids.stream()
.map(id -> CompletableFuture.supplyAsync(() -> {
return processItem(id);
}, taskExecutor))
.collect(Collectors.toList());
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
)
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
private Result processItem(Long id) {
// 处理单个项目
return new Result(id, "processed");
}
}
3. 流水线处理
@Service
public class PipelineService {
/**
* 数据流水线
*/
public CompletableFuture<FinalResult> pipeline(InputData input) {
return CompletableFuture.supplyAsync(() -> {
// 步骤 1: 数据验证
return validate(input);
})
.thenApplyAsync(validated -> {
// 步骤 2: 数据转换
return transform(validated);
})
.thenApplyAsync(transformed -> {
// 步骤 3: 业务处理
return process(transformed);
})
.thenApplyAsync(processed -> {
// 步骤 4: 结果封装
return envelope(processed);
})
.exceptionally(ex -> {
log.error("流水线处理失败", ex);
return FinalResult.error(ex.getMessage());
});
}
}
响应式编程
1. WebFlux 异步
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserRepository userRepository;
/**
* 响应式查询
*/
@GetMapping
public Flux<UserDTO> getUsers() {
return userRepository.findAll()
.map(this::convertToDTO);
}
/**
* 响应式详情
*/
@GetMapping("/{id}")
public Mono<UserDTO> getUser(@PathVariable Long id) {
return userRepository.findById(id)
.map(this::convertToDTO);
}
/**
* 响应式创建
*/
@PostMapping
public Mono<UserDTO> createUser(@RequestBody Mono<UserCreateDTO> dto) {
return dto.map(this::convert)
.flatMap(userRepository::save)
.map(this::convertToDTO);
}
}
2. 异步转换
@Service
public class ReactiveService {
/**
* Mono 转 CompletableFuture
*/
public CompletableFuture<String> monoToFuture(Mono<String> mono) {
return mono.toFuture();
}
/**
* CompletableFuture 转 Mono
*/
public Mono<String> futureToMono(CompletableFuture<String> future) {
return Mono.fromFuture(future);
}
/**
* Flux 转 CompletableFuture
*/
public CompletableFuture<List<String>> fluxToFuture(Flux<String> flux) {
return flux.collectList().toFuture();
}
}
最佳实践
1. 避免阻塞
// ✅ 推荐
@Async
public CompletableFuture<String> asyncMethod() {
return CompletableFuture.supplyAsync(() -> {
return doSomething();
});
}
// ❌ 不推荐
@Async
public String asyncMethod() {
return doSomething(); // 没有真正异步
}
2. 异常处理
// ✅ 推荐
@Async
public CompletableFuture<String> asyncMethod() {
return CompletableFuture.supplyAsync(() -> {
return doSomething();
})
.exceptionally(ex -> {
log.error("异步执行失败", ex);
return "default";
});
}
// ❌ 不推荐
@Async
public CompletableFuture<String> asyncMethod() {
return CompletableFuture.supplyAsync(() -> {
return doSomething();
});
// 没有异常处理
}
3. 线程隔离
@Configuration
public class AsyncConfig {
@Bean("ioExecutor")
public Executor ioExecutor() {
// IO 密集型使用虚拟线程
return Executors.newVirtualThreadPerTaskExecutor();
}
@Bean("cpuExecutor")
public Executor cpuExecutor() {
// CPU 密集型使用固定线程池
int cores = Runtime.getRuntime().availableProcessors();
return Executors.newFixedThreadPool(cores);
}
}
4. 监控异步任务
@Component
public class AsyncMonitor implements AsyncUncaughtExceptionHandler {
private final MeterRegistry meterRegistry;
@Override
public void handleUncaughtException(
Throwable ex,
Method method,
Object... params
) {
meterRegistry.counter("async.error",
"method", method.getName()
).increment();
log.error("异步任务异常:{}", method.getName(), ex);
}
}
总结
异步编程要点:
- ✅ @Async - 声明式异步
- ✅ 线程池 - 合理配置、隔离使用
- ✅ CompletableFuture - 组合、链式调用
- ✅ 响应式 - WebFlux、Reactive
- ✅ 最佳实践 - 避免阻塞、异常处理
异步编程是提升应用性能的关键技术。