Skip to content
清晨的一缕阳光
返回

Spring Boot 异步编程实战

前言

异步编程是提升应用吞吐量的重要手段。Spring Boot 提供了@Async、CompletableFuture 等多种异步编程方式。本文将介绍 Spring Boot 异步编程的完整方案。

@Async 基础

1. 启用异步

@SpringBootApplication
@EnableAsync
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

2. 基础使用

@Service
public class AsyncService {
    
    /**
     * 异步方法
     */
    @Async
    public void asyncMethod() {
        log.info("异步执行:{}", Thread.currentThread().getName());
        // 业务逻辑
    }
    
    /**
     * 带返回值的异步方法
     */
    @Async
    public CompletableFuture<String> asyncWithReturn() {
        log.info("异步执行:{}", Thread.currentThread().getName());
        return CompletableFuture.completedFuture("result");
    }
    
    /**
     * 带参数的异步方法
     */
    @Async
    public CompletableFuture<UserDTO> asyncWithParam(Long userId) {
        log.info("异步执行:{}", Thread.currentThread().getName());
        User user = userRepository.findById(userId).orElse(null);
        return CompletableFuture.completedFuture(convertToDTO(user));
    }
}

3. 调用异步方法

@Service
@RequiredArgsConstructor
public class BusinessService {
    
    private final AsyncService asyncService;
    
    public void process() {
        // 异步调用(注意:同类调用无效)
        asyncService.asyncMethod();
        
        // 获取返回值
        CompletableFuture<String> future = asyncService.asyncWithReturn();
        String result = future.join(); // 阻塞等待
        
        // 非阻塞处理
        asyncService.asyncWithReturn()
            .thenAccept(result -> log.info("结果:{}", result))
            .exceptionally(ex -> {
                log.error("异常", ex);
                return null;
            });
    }
}

线程池配置

1. 默认配置

spring:
  task:
    execution:
      pool:
        core-size: 8
        max-size: 100
        queue-capacity: 100
        keep-alive: 10s
      thread-name-prefix: async-

2. 自定义线程池

@Configuration
@EnableAsync
public class AsyncConfig {
    
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
    
    @Bean(name = "ioExecutor")
    public Executor ioExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(200);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("io-");
        executor.initialize();
        return executor;
    }
    
    @Bean(name = "cpuExecutor")
    public Executor cpuExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        int cores = Runtime.getRuntime().availableProcessors();
        executor.setCorePoolSize(cores);
        executor.setMaxPoolSize(cores * 2);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("cpu-");
        executor.initialize();
        return executor;
    }
}

3. 使用指定线程池

@Service
public class AsyncService {
    
    /**
     * 使用自定义线程池
     */
    @Async("ioExecutor")
    public CompletableFuture<String> ioTask() {
        return CompletableFuture.completedFuture("io result");
    }
    
    @Async("cpuExecutor")
    public CompletableFuture<String> cpuTask() {
        return CompletableFuture.completedFuture("cpu result");
    }
}

CompletableFuture

1. 基础使用

@Service
public class CompletableFutureService {
    
    /**
     * 创建 CompletableFuture
     */
    public CompletableFuture<String> supplyAsync() {
        return CompletableFuture.supplyAsync(() -> {
            return "result";
        });
    }
    
    /**
     * 链式调用
     */
    public CompletableFuture<String> chain() {
        return CompletableFuture.supplyAsync(() -> "Hello")
            .thenApply(s -> s + " World")
            .thenApply(String::toUpperCase)
            .thenAccept(System.out::println)
            .thenRun(() -> System.out.println("Done"));
    }
    
    /**
     * 异常处理
     */
    public CompletableFuture<String> handleException() {
        return CompletableFuture.supplyAsync(() -> {
            if (true) throw new RuntimeException("error");
            return "result";
        })
        .exceptionally(ex -> "default value")
        .handle((result, ex) -> {
            if (ex != null) {
                log.error("异常", ex);
                return "error handler";
            }
            return result;
        });
    }
}

2. 组合多个 Future

@Service
public class CompletableFutureService {
    
    /**
     * 等待所有完成
     */
    public CompletableFuture<Void> allOf(
        CompletableFuture<String> future1,
        CompletableFuture<String> future2
    ) {
        return CompletableFuture.allOf(future1, future2)
            .thenRun(() -> {
                String result1 = future1.join();
                String result2 = future2.join();
                log.info("全部完成:{}, {}", result1, result2);
            });
    }
    
    /**
     * 任一完成即可
     */
    public CompletableFuture<Object> anyOf(
        CompletableFuture<String> future1,
        CompletableFuture<String> future2
    ) {
        return CompletableFuture.anyOf(future1, future2)
            .thenApply(result -> {
                log.info("任一完成:{}", result);
                return result;
            });
    }
    
    /**
     * 组合结果
     */
    public CompletableFuture<String> thenCombine(
        CompletableFuture<String> future1,
        CompletableFuture<String> future2
    ) {
        return future1.thenCombine(future2, (r1, r2) -> {
            return r1 + " + " + r2;
        });
    }
    
    /**
     * 顺序执行
     */
    public CompletableFuture<String> thenCompose(
        CompletableFuture<String> future1
    ) {
        return future1.thenCompose(result -> {
            return CompletableFuture.supplyAsync(() -> {
                return result + " composed";
            });
        });
    }
}

3. 超时处理

@Service
public class CompletableFutureService {
    
    /**
     * 超时处理
     */
    public CompletableFuture<String> withTimeout() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "result";
        })
        .orTimeout(1, TimeUnit.SECONDS)
        .exceptionally(ex -> {
            if (ex instanceof TimeoutException) {
                return "timeout";
            }
            return "error";
        });
    }
    
    /**
     * 超时备用
     */
    public CompletableFuture<String> completeOnTimeout() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "result";
        })
        .completeOnTimeout("default", 1, TimeUnit.SECONDS);
    }
}

实际应用场景

1. 并行调用外部服务

@Service
@RequiredArgsConstructor
public class UserService {
    
    private final ExternalService externalService;
    
    /**
     * 串行调用(慢)
     */
    public UserDTO getUserSerial(Long userId) {
        User user = userRepository.findById(userId).orElse(null);
        UserInfo info1 = externalService.getInfo1(userId);
        UserInfo info2 = externalService.getInfo2(userId);
        UserInfo info3 = externalService.getInfo3(userId);
        
        return merge(user, info1, info2, info3);
    }
    
    /**
     * 并行调用(快)
     */
    public CompletableFuture<UserDTO> getUserParallel(Long userId) {
        CompletableFuture<User> userFuture = 
            CompletableFuture.supplyAsync(() -> 
                userRepository.findById(userId).orElse(null)
            );
        
        CompletableFuture<UserInfo> info1Future = 
            CompletableFuture.supplyAsync(() -> 
                externalService.getInfo1(userId)
            );
        
        CompletableFuture<UserInfo> info2Future = 
            CompletableFuture.supplyAsync(() -> 
                externalService.getInfo2(userId)
            );
        
        CompletableFuture<UserInfo> info3Future = 
            CompletableFuture.supplyAsync(() -> 
                externalService.getInfo3(userId)
            );
        
        return CompletableFuture.allOf(
                userFuture, info1Future, info2Future, info3Future
            )
            .thenApply(v -> merge(
                userFuture.join(),
                info1Future.join(),
                info2Future.join(),
                info3Future.join()
            ));
    }
}

2. 批量处理

@Service
public class BatchService {
    
    @Autowired
    private Executor taskExecutor;
    
    /**
     * 批量异步处理
     */
    public CompletableFuture<List<Result>> batchProcess(List<Long> ids) {
        List<CompletableFuture<Result>> futures = ids.stream()
            .map(id -> CompletableFuture.supplyAsync(() -> {
                return processItem(id);
            }, taskExecutor))
            .collect(Collectors.toList());
        
        return CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0])
            )
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
    }
    
    private Result processItem(Long id) {
        // 处理单个项目
        return new Result(id, "processed");
    }
}

3. 流水线处理

@Service
public class PipelineService {
    
    /**
     * 数据流水线
     */
    public CompletableFuture<FinalResult> pipeline(InputData input) {
        return CompletableFuture.supplyAsync(() -> {
                // 步骤 1: 数据验证
                return validate(input);
            })
            .thenApplyAsync(validated -> {
                // 步骤 2: 数据转换
                return transform(validated);
            })
            .thenApplyAsync(transformed -> {
                // 步骤 3: 业务处理
                return process(transformed);
            })
            .thenApplyAsync(processed -> {
                // 步骤 4: 结果封装
                return envelope(processed);
            })
            .exceptionally(ex -> {
                log.error("流水线处理失败", ex);
                return FinalResult.error(ex.getMessage());
            });
    }
}

响应式编程

1. WebFlux 异步

@RestController
@RequestMapping("/api/users")
public class UserController {
    
    private final UserRepository userRepository;
    
    /**
     * 响应式查询
     */
    @GetMapping
    public Flux<UserDTO> getUsers() {
        return userRepository.findAll()
            .map(this::convertToDTO);
    }
    
    /**
     * 响应式详情
     */
    @GetMapping("/{id}")
    public Mono<UserDTO> getUser(@PathVariable Long id) {
        return userRepository.findById(id)
            .map(this::convertToDTO);
    }
    
    /**
     * 响应式创建
     */
    @PostMapping
    public Mono<UserDTO> createUser(@RequestBody Mono<UserCreateDTO> dto) {
        return dto.map(this::convert)
            .flatMap(userRepository::save)
            .map(this::convertToDTO);
    }
}

2. 异步转换

@Service
public class ReactiveService {
    
    /**
     * Mono 转 CompletableFuture
     */
    public CompletableFuture<String> monoToFuture(Mono<String> mono) {
        return mono.toFuture();
    }
    
    /**
     * CompletableFuture 转 Mono
     */
    public Mono<String> futureToMono(CompletableFuture<String> future) {
        return Mono.fromFuture(future);
    }
    
    /**
     * Flux 转 CompletableFuture
     */
    public CompletableFuture<List<String>> fluxToFuture(Flux<String> flux) {
        return flux.collectList().toFuture();
    }
}

最佳实践

1. 避免阻塞

// ✅ 推荐
@Async
public CompletableFuture<String> asyncMethod() {
    return CompletableFuture.supplyAsync(() -> {
        return doSomething();
    });
}

// ❌ 不推荐
@Async
public String asyncMethod() {
    return doSomething(); // 没有真正异步
}

2. 异常处理

// ✅ 推荐
@Async
public CompletableFuture<String> asyncMethod() {
    return CompletableFuture.supplyAsync(() -> {
        return doSomething();
    })
    .exceptionally(ex -> {
        log.error("异步执行失败", ex);
        return "default";
    });
}

// ❌ 不推荐
@Async
public CompletableFuture<String> asyncMethod() {
    return CompletableFuture.supplyAsync(() -> {
        return doSomething();
    });
    // 没有异常处理
}

3. 线程隔离

@Configuration
public class AsyncConfig {
    
    @Bean("ioExecutor")
    public Executor ioExecutor() {
        // IO 密集型使用虚拟线程
        return Executors.newVirtualThreadPerTaskExecutor();
    }
    
    @Bean("cpuExecutor")
    public Executor cpuExecutor() {
        // CPU 密集型使用固定线程池
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(cores);
    }
}

4. 监控异步任务

@Component
public class AsyncMonitor implements AsyncUncaughtExceptionHandler {
    
    private final MeterRegistry meterRegistry;
    
    @Override
    public void handleUncaughtException(
        Throwable ex,
        Method method,
        Object... params
    ) {
        meterRegistry.counter("async.error", 
            "method", method.getName()
        ).increment();
        
        log.error("异步任务异常:{}", method.getName(), ex);
    }
}

总结

异步编程要点:

异步编程是提升应用性能的关键技术。


分享这篇文章到:

上一篇文章
事件驱动架构
下一篇文章
代理模式实战详解