前言
WebFlux 是 Spring 5 引入的响应式 Web 框架,基于 Reactor 实现非阻塞 IO。本文将介绍 Spring Boot 集成 WebFlux 的完整方案。
快速开始
1. 添加依赖
<!-- WebFlux starter. No <version> is declared here; presumably it is managed by the Spring Boot parent/BOM. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2. 基础配置
# Embedded server settings. The nesting matters: without indentation every key
# becomes a top-level property and Spring Boot would not bind server.port or server.netty.*.
server:
  port: 8080
  netty:
    # Connection timeout of the Netty channel.
    connection-timeout: 30s
    # Idle timeout of the Netty channel.
    idle-timeout: 60s
3. 创建 Controller
/**
 * Annotation-based REST endpoints for users, mounted under {@code /api/users}.
 *
 * <p>Every method hands the publisher produced by {@code UserService} straight back to the
 * framework; nothing is subscribed to or blocked on inside the controller.
 */
@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    /**
     * Constructor injection of the service dependency.
     *
     * @param userService service that all endpoints delegate to
     */
    public UserController(UserService userService) {
        this.userService = userService;
    }

    /**
     * Looks up a single user.
     *
     * @param id user id taken from the {@code {id}} path segment
     * @return the publisher returned by {@code UserService#findById}
     */
    @GetMapping("/{id}")
    public Mono<UserDTO> getUser(@PathVariable Long id) {
        Mono<UserDTO> user = userService.findById(id);
        return user;
    }

    /**
     * Lists users.
     *
     * @return the publisher returned by {@code UserService#findAll}
     */
    @GetMapping
    public Flux<UserDTO> getUsers() {
        Flux<UserDTO> users = userService.findAll();
        return users;
    }

    /**
     * Creates a user from the request body.
     *
     * @param dto request body, received as a deferred {@code Mono}
     * @return the publisher returned by {@code UserService#create}
     */
    @PostMapping
    public Mono<UserDTO> createUser(@RequestBody Mono<UserCreateDTO> dto) {
        Mono<UserDTO> created = userService.create(dto);
        return created;
    }
}
4. 创建 Service
@Service
@RequiredArgsConstructor
public class UserService {
private final UserRepository userRepository;
public Mono<UserDTO> findById(Long id) {
return userRepository.findById(id)
.map(this::convertToDTO)
.switchIfEmpty(Mono.error(
BusinessException.notFound("用户")
));
}
public Flux<UserDTO> findAll() {
return userRepository.findAll()
.map(this::convertToDTO);
}
public Mono<UserDTO> create(Mono<UserCreateDTO> dtoMono) {
return dtoMono
.map(this::convert)
.flatMap(userRepository::save)
.map(this::convertToDTO);
}
}
5. 创建 Repository
/**
 * Reactive repository for {@code User} entities keyed by {@code Long}.
 *
 * <p>The methods declared here have no implementation in this snippet; presumably the queries
 * are derived from the method names by Spring Data - confirm against the data module in use.
 */
@Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    /** Single-result lookup by email; by its name, matches on the {@code email} property. */
    Mono<User> findByEmail(String email);

    /** Multi-result lookup; by its name, matches usernames containing {@code keyword}. */
    Flux<User> findByUsernameContaining(String keyword);

    /** Multi-result lookup; by its name, matches ages between {@code min} and {@code max}. */
    Flux<User> findByAgeBetween(int min, int max);
}
Reactor 核心
1. Mono 和 Flux
/**
 * Minimal examples of creating the two Reactor publisher types.
 */
@Service
public class ReactorService {

    /**
     * Creates a {@code Mono}, a publisher of zero or one element, from a fixed value.
     *
     * <p>Alternative factories that could be returned instead:
     * <ul>
     *   <li>{@code Mono.empty()} - completes without emitting a value</li>
     *   <li>{@code Mono.justOrEmpty(Optional.of("value"))} - created from an {@code Optional}
     *       (Reactor has no {@code Mono.fromOptional})</li>
     *   <li>{@code Mono.error(new RuntimeException("error"))} - terminates with an error</li>
     * </ul>
     *
     * @return a {@code Mono} emitting {@code "value"}
     */
    public Mono<String> monoExample() {
        return Mono.just("value");
    }

    /**
     * Creates a {@code Flux}, a publisher of zero to N elements, from fixed values.
     *
     * <p>Alternative factories that could be returned instead:
     * <ul>
     *   <li>{@code Flux.fromArray(new String[]{"a", "b"})} - created from an array</li>
     *   <li>{@code Flux.fromIterable(List.of("a", "b"))} - created from a list</li>
     *   <li>{@code Flux.fromStream(() -> Stream.of("a", "b"))} - created from a stream; the
     *       supplier form gives each subscriber a fresh stream, since a {@code Stream} can
     *       only be consumed once</li>
     * </ul>
     *
     * @return a {@code Flux} emitting {@code "a"}, {@code "b"}, {@code "c"}
     */
    public Flux<String> fluxExample() {
        return Flux.just("a", "b", "c");
    }
}
2. 操作符
/**
 * Examples of commonly used Reactor operators.
 */
@Service
public class ReactorOperators {

    /**
     * Maps a user to its upper-cased username.
     *
     * <p>An empty source yields {@code "ANONYMOUS"}; an empty username is filtered out, so the
     * result completes empty in that case.
     *
     * <p>{@code switchIfEmpty(Mono.just("guest"))} is the publisher-based alternative to
     * {@code defaultIfEmpty}. Chaining it directly after {@code defaultIfEmpty} has no effect,
     * because the sequence can no longer be empty at that point, so only one of them is used.
     *
     * @param userMono source user
     * @return the upper-cased username, the default, or empty
     */
    public Mono<String> transform(Mono<User> userMono) {
        return userMono
                .map(User::getUsername)          // map entity -> username
                .defaultIfEmpty("anonymous")     // fallback value for an empty source
                .filter(name -> !name.isEmpty()) // drop empty names
                .map(String::toUpperCase);       // further chained mapping
    }

    /**
     * Merges two sequences, interleaving elements as they arrive.
     *
     * <p>Alternatives:
     * <ul>
     *   <li>{@code flux1.concatWith(flux2)} - all of {@code flux1} first, then {@code flux2}</li>
     *   <li>{@code flux1.zipWith(flux2, (a, b) -> a + b)} - pairwise combination; the
     *       one-argument {@code zipWith} produces tuples and would not match this return type</li>
     * </ul>
     *
     * @param flux1 first source
     * @param flux2 second source
     * @return the merged sequence
     */
    public Flux<String> merge(Flux<String> flux1, Flux<String> flux2) {
        return Flux.merge(flux1, flux2);
    }

    /**
     * Replaces any error with the value {@code "default"}.
     *
     * <p>Error operators apply in chain order and the first one that swallows the error wins,
     * so anything placed after {@code onErrorReturn} never sees an error. Pick one strategy:
     * <ul>
     *   <li>{@code onErrorReturn("default")} - static fallback value (used here)</li>
     *   <li>{@code onErrorResume(ex -> Mono.just("fallback"))} - fallback publisher</li>
     *   <li>{@code onErrorMap(ex -> new BusinessException(ex))} - translate the exception</li>
     * </ul>
     *
     * @param mono source
     * @return the source value, or {@code "default"} on error
     */
    public Mono<String> handleError(Mono<String> mono) {
        return mono.onErrorReturn("default");
    }

    /**
     * Retries a failing source up to 3 times with exponential backoff starting at 1 second.
     *
     * <p>{@code retry(3)} (immediate re-subscription) is the simpler alternative. The two must
     * not be stacked: every backoff attempt would re-run the inner retries, multiplying the
     * number of subscriptions to (1 + 3) * (1 + 3) = 16.
     *
     * @param mono source
     * @return the source with backoff retry applied
     */
    public Mono<String> retry(Mono<String> mono) {
        return mono.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
    }
}
3. 调度器
/**
 * Examples of moving work between Reactor schedulers.
 *
 * <p>NOTE(review): {@code blockingOperation()} is not part of this snippet; it is assumed to be
 * a method of this class that returns a {@code String} - confirm.
 */
@Service
public class SchedulerService {

    /**
     * Wraps a blocking call and runs it on the bounded elastic scheduler, which is intended
     * for blocking / IO-bound work.
     *
     * @return a {@code Mono} emitting the result of {@code blockingOperation()}
     */
    public Mono<String> subscribeOn() {
        return Mono.fromCallable(() -> blockingOperation())
                .subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * Doubles the numbers 1..10 on the parallel scheduler, hops to the single scheduler and
     * returns the first result as text.
     *
     * @return a {@code Mono} emitting the first doubled value as a string
     */
    public Mono<String> publishOn() {
        return Flux.range(1, 10)
                .publishOn(Schedulers.parallel()) // CPU-bound stage
                .map(i -> i * 2)
                .publishOn(Schedulers.single())   // sequential stage
                // The pipeline carries Integer values; convert so that the result matches
                // the declared Mono<String> return type.
                .map(String::valueOf)
                .next();
    }
}
函数式端点
1. 定义路由
/**
 * Functional routing table for the user endpoints.
 *
 * <p>NOTE(review): {@code handler::update} and {@code handler::delete} are referenced here, but
 * the {@code UserHandler} shown in this article only defines {@code getAll}, {@code getById}
 * and {@code create} - confirm that the remaining handlers exist.
 */
@Configuration
public class UserRouter {

    /**
     * Registers the five user routes in declaration order.
     *
     * @param handler handler whose methods serve the routes
     * @return the composed router function
     */
    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        final String collection = "/api/users";
        final String item = "/api/users/{id}";
        return RouterFunctions.route()
                .GET(collection, handler::getAll)
                .GET(item, handler::getById)
                .POST(collection, handler::create)
                .PUT(item, handler::update)
                .DELETE(item, handler::delete)
                .build();
    }
}
2. 创建 Handler
/**
 * Handler functions behind the functional user routes.
 *
 * <p>NOTE(review): the router in this article also references {@code handler::update} and
 * {@code handler::delete}; they are not defined here because the {@code UserService} shown
 * offers no matching operations - confirm and add them together with the service methods.
 */
@Component
@RequiredArgsConstructor
public class UserHandler {

    // Final field: it needs the Lombok-generated constructor above to be initialised.
    private final UserService userService;

    /**
     * Returns all users as a single JSON-style body.
     *
     * @param request incoming request (unused)
     * @return a 200 response whose body is the collected user list
     */
    public Mono<ServerResponse> getAll(ServerRequest request) {
        return userService.findAll()
                .collectList()
                .flatMap(users -> ServerResponse.ok().bodyValue(users));
    }

    /**
     * Returns the user identified by the {@code id} path variable.
     *
     * <p>NOTE(review): the {@code UserService#findById} shown in this article signals an error
     * instead of completing empty, so the 404 branch below is presumably only reached with a
     * service that completes empty - confirm.
     *
     * @param request incoming request carrying the {@code id} path variable
     * @return 200 with the user, 404 when the service completes empty, or 400 when the id is
     *         not a valid number
     */
    public Mono<ServerResponse> getById(ServerRequest request) {
        final long id;
        try {
            id = Long.parseLong(request.pathVariable("id"));
        } catch (NumberFormatException ex) {
            // The id comes from the URL; a malformed value is a client error, not a server fault.
            return ServerResponse.badRequest().build();
        }
        return userService.findById(id)
                .flatMap(user -> ServerResponse.ok().bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /**
     * Creates a user from the request body.
     *
     * @param request incoming request whose body is decoded as {@code UserCreateDTO}
     * @return 201 with the created user
     */
    public Mono<ServerResponse> create(ServerRequest request) {
        // UserService#create takes the deferred body itself (Mono<UserCreateDTO>), so the Mono
        // is passed through rather than flat-mapped into the service.
        return userService.create(request.bodyToMono(UserCreateDTO.class))
                .flatMap(user -> ServerResponse
                        .status(HttpStatus.CREATED)
                        .bodyValue(user));
    }
}
WebSocket
1. 添加依赖(与快速开始中的依赖相同,spring-boot-starter-webflux 已包含响应式 WebSocket 支持,无需额外引入)
<!-- Same WebFlux starter as in the quick start; no <version> is declared in this snippet. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2. 创建 WebSocket Handler
/**
 * WebSocket endpoint that logs incoming text messages, sends a heartbeat every second and
 * keeps track of connected sessions so that messages can be broadcast to all of them.
 *
 * <p>The class shares its simple name with the Spring interface it implements, which is why
 * the interface is referenced by its fully qualified name.
 */
@Slf4j
@Component
public class WebSocketHandler implements org.springframework.web.reactive.socket.WebSocketHandler {

    /** Currently connected sessions, keyed by session id. */
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    /**
     * Handles one WebSocket session for its whole lifetime.
     *
     * @param session the newly established session
     * @return a {@code Mono} that terminates when the combined receive/send handling ends
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Register the session for broadcasts.
        sessions.put(session.getId(), session);

        // Inbound: log every text payload.
        Mono<Void> receive = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(message -> log.info("收到消息:{}", message))
                .then();

        // Outbound: one heartbeat message per second.
        Mono<Void> send = session.send(
                Flux.interval(Duration.ofSeconds(1))
                        .map(seq -> session.textMessage("Heartbeat: " + seq)));

        return Mono.zip(receive, send)
                .then()
                // Unregister on completion, error or cancellation; otherwise closed sessions
                // would accumulate in the map and keep receiving broadcasts.
                .doFinally(signal -> sessions.remove(session.getId()));
    }

    /**
     * Sends a text message to every registered session (fire-and-forget).
     *
     * <p>NOTE(review): each session already has an active outbound heartbeat stream; confirm
     * that the underlying WebSocket runtime tolerates an additional concurrent {@code send}, or
     * route broadcasts through a per-session sink merged into the heartbeat stream.
     *
     * @param message text to send
     */
    public void broadcast(String message) {
        sessions.values().forEach(session ->
                session.send(Mono.just(session.textMessage(message)))
                        .subscribe(
                                unused -> { },
                                // Without an error consumer a failed send would only surface
                                // as a dropped-error log from Reactor.
                                ex -> log.warn("广播消息失败, session={}", session.getId(), ex)));
    }
}
3. 配置 WebSocket
/**
 * Maps the {@code /ws} path to the WebSocket handler.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Builds the URL handler mapping for WebSocket traffic.
     *
     * @param handler the WebSocket handler bean to expose
     * @return a mapping of {@code /ws} to {@code handler} with order 10
     */
    @Bean
    public SimpleUrlHandlerMapping webSocketMapping(WebSocketHandler handler) {
        final int order = 10;
        return new SimpleUrlHandlerMapping(Map.of("/ws", handler), order);
    }
}
4. 前端连接
// Browser-side client for the /ws endpoint.
const ws = new WebSocket('ws://localhost:8080/ws');

// Connection established: log it and send an initial greeting.
ws.addEventListener('open', () => {
  console.log('连接成功');
  ws.send('Hello');
});

// Log the payload of every message pushed by the server.
ws.addEventListener('message', (event) => {
  console.log('收到消息:', event.data);
});

// Log transport errors.
ws.addEventListener('error', (error) => {
  console.error('错误:', error);
});
性能优化
1. 连接池配置
spring:
  codec:
    # In-memory buffer limit for the HTTP codecs, in bytes (262144 = 256 KiB).
    # The property lives under spring.codec; spring.webflux has no max-in-memory-size key.
    max-in-memory-size: 262144
  r2dbc:
    pool:
      # Upper bound and initial number of pooled connections.
      max-size: 10
      initial-size: 5
      # How long a connection may stay idle / alive before it is discarded.
      max-idle-time: 30m
      max-life-time: 60m
2. 超时配置
/**
 * Provides a {@code WebClient} with explicit connect and response timeouts.
 */
@Configuration
public class WebClientConfig {

    /**
     * Builds a {@code WebClient} on top of a Reactor Netty {@code HttpClient} configured with a
     * 30 second response timeout and a 5000 ms connect timeout.
     *
     * @return the configured client
     */
    @Bean
    public WebClient webClient() {
        HttpClient httpClient = HttpClient.create()
                .responseTimeout(Duration.ofSeconds(30))
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
        return WebClient.builder()
                .clientConnector(connector)
                .build();
    }
}
3. 缓存配置
/**
 * Adds a {@code Cache-Control} header to GET responses.
 */
@Configuration
public class CacheConfig {

    /**
     * Creates a filter that sets {@code Cache-Control: max-age} of 10 minutes on the response
     * of every GET request before passing the exchange down the chain. Other methods pass
     * through untouched.
     *
     * @return the caching web filter
     */
    @Bean
    public WebFilter cacheWebFilter() {
        return (exchange, chain) -> {
            boolean cacheable = exchange.getRequest().getMethod() == HttpMethod.GET;
            if (!cacheable) {
                return chain.filter(exchange);
            }
            CacheControl tenMinutes = CacheControl.maxAge(Duration.ofMinutes(10));
            exchange.getResponse().getHeaders().setCacheControl(tenMinutes);
            return chain.filter(exchange);
        };
    }
}
4. 背压处理
/**
 * Examples of operators that bound demand, size or pace of a {@code Flux}.
 */
@Service
public class BackpressureService {

    /**
     * Applies {@code limitRate(10)}, which bounds the demand requested from upstream rather
     * than limiting by time.
     *
     * @param flux source sequence
     * @return the source with {@code limitRate(10)} applied
     */
    public Flux<String> rateLimit(Flux<String> flux) {
        final int prefetch = 10;
        return flux.limitRate(prefetch);
    }

    /**
     * Keeps only the first 100 elements.
     *
     * @param flux source sequence
     * @return the source with {@code take(100)} applied
     */
    public Flux<String> limitN(Flux<String> flux) {
        final long maxElements = 100;
        return flux.take(maxElements);
    }

    /**
     * Delays each element by 100 ms.
     *
     * @param flux source sequence
     * @return the source with {@code delayElements} of 100 ms applied
     */
    public Flux<String> throttle(Flux<String> flux) {
        final Duration gap = Duration.ofMillis(100);
        return flux.delayElements(gap);
    }
}
最佳实践
1. 避免阻塞
// ✅ Recommended
/**
 * Wraps a blocking call so that it runs on the bounded elastic scheduler.
 */
@Service
public class ReactiveService {

    /**
     * @return a {@code Mono} that invokes {@code blockingCall()} on
     *         {@code Schedulers.boundedElastic()} when subscribed
     */
    public Mono<String> getData() {
        Mono<String> blocking = Mono.fromCallable(() -> blockingCall());
        return blocking.subscribeOn(Schedulers.boundedElastic());
    }
}
// ❌ Not recommended
/**
 * Counter-example kept on purpose: the blocking call is wrapped in a {@code Mono} but no
 * scheduler is assigned.
 */
@Service
public class BlockingService {

    /**
     * @return a {@code Mono} that invokes {@code blockingCall()} on whichever thread subscribes
     */
    public Mono<String> getData() {
        // Blocks the reactive thread: there is no subscribeOn to move the call elsewhere.
        return Mono.fromCallable(() -> blockingCall());
    }
}
2. 错误处理
// ✅ Recommended
/**
 * Looks up a user, translating lookup failures and slow responses into business errors.
 *
 * @param id user id taken from the {@code {id}} path segment
 * @return the user; a {@code BusinessException} error signal when the lookup fails with
 *         {@code NotFoundException} or produces nothing within 5 seconds
 */
@GetMapping("/api/users/{id}")
public Mono<UserDTO> getUser(@PathVariable Long id) {
    return userService.findById(id)
            // onErrorMap expresses "translate this exception" directly.
            .onErrorMap(NotFoundException.class, ex -> new BusinessException("用户不存在"))
            // Mono has no onErrorTimeout operator; timeout(Duration, fallback) switches to the
            // fallback publisher when no signal arrives within the given duration.
            .timeout(Duration.ofSeconds(5),
                    Mono.error(() -> new BusinessException("超时")));
}
// ❌ Not recommended
/**
 * Counter-example kept on purpose: the service result is returned without any error handling.
 *
 * @param id user id taken from the {@code {id}} path segment
 * @return the publisher returned by the service, unchanged
 */
@GetMapping("/api/users/{id}")
public Mono<UserDTO> getUser(@PathVariable Long id) {
    // No error handling is applied here.
    Mono<UserDTO> user = userService.findById(id);
    return user;
}
3. 日志记录
/**
 * Provides a {@code WebClient} that logs every outgoing request and incoming response.
 */
@Slf4j // supplies the `log` field used by the filters below
@Configuration
public class WebClientConfig {

    /**
     * Builds a {@code WebClient} with the request and response logging filters installed.
     *
     * @return the configured client
     */
    @Bean
    public WebClient webClient() {
        return WebClient.builder()
                .filter(logRequest())
                .filter(logResponse())
                .build();
    }

    /**
     * @return a filter that logs the HTTP method and URL of each request and passes the
     *         request on unchanged
     */
    private ExchangeFilterFunction logRequest() {
        return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
            log.info("请求:{} {}", clientRequest.method(), clientRequest.url());
            return Mono.just(clientRequest);
        });
    }

    /**
     * @return a filter that logs the status code of each response and passes the response on
     *         unchanged
     */
    private ExchangeFilterFunction logResponse() {
        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
            log.info("响应:{}", clientResponse.statusCode());
            return Mono.just(clientResponse);
        });
    }
}
4. 测试
@WebFluxTest(UserController.class)
class UserControllerTest {
@Autowired
private WebTestClient webTestClient;
@MockBean
private UserService userService;
@Test
void testGetUser() {
UserDTO user = new UserDTO(1L, "test");
when(userService.findById(1L))
.thenReturn(Mono.just(user));
webTestClient.get()
.uri("/api/users/1")
.exchange()
.expectStatus().isOk()
.expectBody(UserDTO.class)
.isEqualTo(user);
}
}
总结
WebFlux 要点:
- ✅ Reactor - Mono、Flux、操作符
- ✅ Controller - 响应式端点
- ✅ 函数式 - RouterFunction、Handler
- ✅ WebSocket - 实时通信
- ✅ 性能优化 - 连接池、超时、背压
- ✅ 最佳实践 - 避免阻塞、错误处理
WebFlux 适合 I/O 密集型的高并发场景,前提是整条调用链保持非阻塞;无法避免的阻塞调用应按上文"避免阻塞"的做法切换到 boundedElastic 调度器执行。