Skip to content
清晨的一缕阳光
返回

Spring Boot WebFlux 响应式编程

前言

WebFlux 是 Spring 5 引入的响应式 Web 框架,基于 Reactor 实现非阻塞 IO。本文将介绍 Spring Boot 集成 WebFlux 的完整方案。

快速开始

1. 添加依赖

<!-- Spring Boot WebFlux starter. No <version> is declared here, so it is presumably
     managed by the Spring Boot parent/BOM of the enclosing pom.xml (confirm there). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

2. 基础配置

# Embedded server settings.
server:
  # Port the HTTP server listens on.
  port: 8080
  netty:
    # Connection timeout of the Netty channel.
    connection-timeout: 30s
    # Idle timeout of the Netty channel.
    idle-timeout: 60s

3. 创建 Controller

/**
 * Annotation-based reactive REST endpoints under {@code /api/users}.
 *
 * <p>Every handler delegates directly to {@link UserService} and returns the
 * reactive type it receives.
 */
@RequiredArgsConstructor
@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    /**
     * Looks up a single user.
     *
     * @param id the user id taken from the path
     * @return a {@code Mono} produced by {@code UserService.findById}
     */
    @GetMapping("/{id}")
    public Mono<UserDTO> getUser(@PathVariable Long id) {
        Mono<UserDTO> found = userService.findById(id);
        return found;
    }

    /**
     * Lists users.
     *
     * @return a {@code Flux} produced by {@code UserService.findAll}
     */
    @GetMapping
    public Flux<UserDTO> getUsers() {
        Flux<UserDTO> all = userService.findAll();
        return all;
    }

    /**
     * Creates a user from the request body.
     *
     * @param dto the request body, handed to the service as a {@code Mono}
     * @return a {@code Mono} produced by {@code UserService.create}
     */
    @PostMapping
    public Mono<UserDTO> createUser(@RequestBody Mono<UserCreateDTO> dto) {
        Mono<UserDTO> created = userService.create(dto);
        return created;
    }
}

4. 创建 Service

@Service
@RequiredArgsConstructor
public class UserService {
    
    private final UserRepository userRepository;
    
    public Mono<UserDTO> findById(Long id) {
        return userRepository.findById(id)
            .map(this::convertToDTO)
            .switchIfEmpty(Mono.error(
                BusinessException.notFound("用户")
            ));
    }
    
    public Flux<UserDTO> findAll() {
        return userRepository.findAll()
            .map(this::convertToDTO);
    }
    
    public Mono<UserDTO> create(Mono<UserCreateDTO> dtoMono) {
        return dtoMono
            .map(this::convert)
            .flatMap(userRepository::save)
            .map(this::convertToDTO);
    }
}

5. 创建 Repository

/**
 * Reactive repository for {@code User} entities with {@code Long} ids.
 *
 * <p>The query methods below have no implementation in this file; presumably
 * Spring Data derives the queries from the method names (confirm against the
 * Spring Data module in use).
 */
@Repository
public interface UserRepository 
        extends ReactiveCrudRepository<User, Long> {
    
    /** Presumably emits users whose username contains {@code keyword}. */
    Flux<User> findByUsernameContaining(String keyword);
    
    /** Presumably emits the user with the given email, or completes empty. */
    Mono<User> findByEmail(String email);
    
    /** Presumably emits users whose age lies between {@code min} and {@code max}; bound inclusivity to be confirmed. */
    Flux<User> findByAgeBetween(int min, int max);
}

Reactor 核心

1. Mono 和 Flux

/**
 * Shows the basic ways of creating {@code Mono} and {@code Flux} instances.
 */
@Service
public class ReactorService {

    /**
     * Mono: a publisher of zero or one element.
     *
     * @return a {@code Mono} emitting {@code "value"}
     */
    public Mono<String> monoExample() {
        // Other factory methods that could be returned here instead:
        //   Mono.empty()                               - completes without a value
        //   Mono.fromOptional(Optional.of("value"))    - created from an Optional
        //   Mono.error(new RuntimeException("error"))  - terminates with an error

        // Created from a value.
        Mono<String> fromValue = Mono.just("value");
        return fromValue;
    }

    /**
     * Flux: a publisher of zero to N elements.
     *
     * @return a {@code Flux} emitting {@code "a"}, {@code "b"}, {@code "c"}
     */
    public Flux<String> fluxExample() {
        // Other factory methods that could be returned here instead:
        //   Flux.fromArray(new String[]{"a", "b"})   - created from an array
        //   Flux.fromIterable(List.of("a", "b"))     - created from a list
        //   Flux.fromStream(Stream.of("a", "b"))     - created from a stream

        // Created from values.
        Flux<String> fromValues = Flux.just("a", "b", "c");
        return fromValues;
    }
}

2. 操作符

/**
 * Demonstrates common Reactor operators: transformation, merging, error handling
 * and retry. Each method applies one strategy; alternatives are listed in comments
 * so that no operator in a chain is unreachable.
 */
@Service
public class ReactorOperators {

    /**
     * Maps a user to its upper-cased username.
     *
     * <p>An empty source yields {@code "ANONYMOUS"}. A username of length zero is
     * removed by the filter, in which case the returned {@code Mono} is empty.
     *
     * @param userMono the user to transform
     * @return the upper-cased username, or {@code "ANONYMOUS"} for an empty source
     */
    public Mono<String> transform(Mono<User> userMono) {
        return userMono
            .map(User::getUsername)            // mapping
            .defaultIfEmpty("anonymous")       // default value for an empty source
            // Alternative when the fallback is itself a publisher:
            //   .switchIfEmpty(Mono.just("guest"))
            // Placing it after defaultIfEmpty would be dead code, because the
            // sequence can no longer be empty at that point.
            .filter(name -> name.length() > 0) // filtering
            .map(String::toUpperCase);         // chaining
    }

    /**
     * Combines two sequences.
     *
     * @param flux1 first source
     * @param flux2 second source
     * @return the merge of both sources
     */
    public Flux<String> merge(Flux<String> flux1, Flux<String> flux2) {
        // Merge (elements are interleaved as they arrive).
        return Flux.merge(flux1, flux2);

        // Concatenate (flux1 first, then flux2):
        // return flux1.concatWith(flux2);

        // Zip (pairs elements of both sources):
        // return flux1.zipWith(flux2);
    }

    /**
     * Replaces any error with the value {@code "default"}.
     *
     * @param mono the source that may fail
     * @return the source value, or {@code "default"} if the source errors
     */
    public Mono<String> handleError(Mono<String> mono) {
        // Only one recovery operator takes effect per chain: once onErrorReturn has
        // replaced the error, operators placed after it never see one.
        // Alternatives to use instead of onErrorReturn:
        //   .onErrorResume(ex -> Mono.just("fallback"))      // switch to a fallback Mono
        //   .onErrorMap(ex -> new BusinessException(ex))     // translate the exception
        return mono.onErrorReturn("default");
    }

    /**
     * Retries a failing source with exponential backoff.
     *
     * @param mono the source to retry
     * @return the source, re-subscribed up to 3 times with a 1 second minimum backoff
     */
    public Mono<String> retry(Mono<String> mono) {
        // Use a single retry strategy. Chaining retry(3) in front of retryWhen would
        // nest the two, so every backoff attempt would itself retry 3 more times.
        // Alternative (immediate re-subscription, no delay):
        //   mono.retry(3)
        return mono.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
    }
}

3. 调度器

/**
 * Demonstrates choosing the execution thread with {@code subscribeOn} and
 * {@code publishOn}.
 *
 * <p>NOTE(review): {@code blockingOperation()} is not defined in this snippet; it
 * is assumed to be supplied elsewhere and to return a {@code String}.
 */
@Service
public class SchedulerService {

    /**
     * Runs a blocking call on the bounded-elastic scheduler.
     *
     * @return a {@code Mono} emitting the result of {@code blockingOperation()}
     */
    public Mono<String> subscribeOn() {
        return Mono.fromCallable(() -> blockingOperation())
            .subscribeOn(Schedulers.boundedElastic()); // intended for blocking / IO-bound work
    }

    /**
     * Switches the thread on which downstream operators run.
     *
     * @return a {@code Mono} emitting the first doubled value of the range as text
     *         ({@code "2"})
     */
    public Mono<String> publishOn() {
        return Flux.range(1, 10)
            .publishOn(Schedulers.parallel())      // CPU-bound work
            .map(i -> i * 2)
            .publishOn(Schedulers.single())        // sequential execution on one thread
            // Convert Integer to String so the pipeline matches the declared
            // Mono<String> return type.
            .map(String::valueOf)
            .next();
    }
}

函数式端点

1. 定义路由

/**
 * Functional routing table for the user endpoints.
 *
 * <p>NOTE(review): the routes reference {@code handler::update} and
 * {@code handler::delete}; confirm that {@code UserHandler} declares both methods.
 */
@Configuration
public class UserRouter {

    /**
     * Builds the router function mapping HTTP method and path to handler methods.
     * Routes are composed in the order GET (list), GET (by id), POST, PUT, DELETE.
     *
     * @param handler the handler whose methods serve the requests
     * @return the composed router function
     */
    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        // Read-only routes.
        RouterFunction<ServerResponse> queries = RouterFunctions
            .route(GET("/api/users"), handler::getAll)
            .andRoute(GET("/api/users/{id}"), handler::getById);

        // Mutating routes.
        RouterFunction<ServerResponse> commands = RouterFunctions
            .route(POST("/api/users"), handler::create)
            .andRoute(PUT("/api/users/{id}"), handler::update)
            .andRoute(DELETE("/api/users/{id}"), handler::delete);

        // Queries are tried first, then commands, preserving the route order.
        return queries.and(commands);
    }
}

2. 创建 Handler

/**
 * Handler functions for the functional user endpoints; each method turns a
 * {@code ServerRequest} into a {@code ServerResponse} by delegating to
 * {@link UserService}.
 */
@Component
public class UserHandler {

    private final UserService userService;

    /**
     * Creates the handler.
     *
     * @param userService the service every handler method delegates to
     */
    public UserHandler(UserService userService) {
        // Explicit constructor: the final field must be initialised, and a single
        // constructor lets the container inject the dependency.
        this.userService = userService;
    }

    /**
     * Returns all users as one list in a 200 response.
     *
     * @param request the incoming request (not inspected)
     * @return a 200 response whose body is the collected list of users
     */
    public Mono<ServerResponse> getAll(ServerRequest request) {
        return userService.findAll()
            .collectList()
            .flatMap(users -> ServerResponse.ok().bodyValue(users));
    }

    /**
     * Returns the user identified by the {@code id} path variable.
     *
     * @param request the incoming request carrying the {@code id} path variable
     * @return 200 with the user, 404 when the service emits nothing, or 400 when
     *         the id is not a valid number
     */
    public Mono<ServerResponse> getById(ServerRequest request) {
        final Long id;
        try {
            id = Long.valueOf(request.pathVariable("id"));
        } catch (NumberFormatException ex) {
            // A malformed id is a client error, not a server failure.
            return ServerResponse.badRequest().build();
        }
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok().bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    /**
     * Creates a user from the request body.
     *
     * @param request the incoming request whose body is a {@code UserCreateDTO}
     * @return a 201 response whose body is the created user
     */
    public Mono<ServerResponse> create(ServerRequest request) {
        // UserService.create takes the body as a Mono, so pass the Mono itself
        // rather than flat-mapping each decoded DTO into it.
        return userService.create(request.bodyToMono(UserCreateDTO.class))
            .flatMap(user -> ServerResponse
                .status(HttpStatus.CREATED)
                .bodyValue(user));
    }
}

WebSocket

1. 添加依赖

<!-- Spring Boot WebFlux starter. The handler in this section implements
     org.springframework.web.reactive.socket.WebSocketHandler, which presumably ships
     with this starter, so no separate WebSocket starter is declared (confirm). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

2. 创建 WebSocket Handler

@Component
public class WebSocketHandler implements org.springframework.web.reactive.socket.WebSocketHandler {
    
    private final ConcurrentHashMap<String, WebSocketSession> sessions = 
        new ConcurrentHashMap<>();
    
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 保存会话
        sessions.put(session.getId(), session);
        
        // 接收消息
        Mono<Void> receive = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext(message -> log.info("收到消息:{}", message))
            .then();
        
        // 发送消息
        Mono<Void> send = session.send(
            Flux.interval(Duration.ofSeconds(1))
                .map(seq -> session.textMessage("Heartbeat: " + seq))
        );
        
        return Mono.zip(receive, send).then();
    }
    
    /**
     * 广播消息
     */
    public void broadcast(String message) {
        sessions.values().forEach(session -> {
            session.send(Mono.just(session.textMessage(message)))
                .subscribe();
        });
    }
}

3. 配置 WebSocket

/**
 * Registers the WebSocket handler under the {@code /ws} path.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Maps {@code /ws} to the given handler with handler-mapping order 10.
     *
     * @param handler the WebSocket handler to expose
     * @return the URL handler mapping
     */
    @Bean
    public SimpleUrlHandlerMapping webSocketMapping(WebSocketHandler handler) {
        // The two-argument constructor sets the URL map and the order in one step.
        return new SimpleUrlHandlerMapping(Map.of("/ws", handler), 10);
    }
}

4. 前端连接

// Open a WebSocket connection to the /ws endpoint of the local server.
const ws = new WebSocket('ws://localhost:8080/ws');

// Connection established: log it and send an initial greeting.
ws.addEventListener('open', () => {
    console.log('连接成功');
    ws.send('Hello');
});

// Log the payload of every message received from the server.
ws.addEventListener('message', (event) => {
    console.log('收到消息:', event.data);
});

// Log connection errors.
ws.addEventListener('error', (error) => {
    console.error('错误:', error);
});

性能优化

1. 连接池配置

spring:
  codec:
    # Limit, in bytes, on how much of a body the auto-configured WebFlux codecs may
    # buffer in memory. The key is spring.codec.max-in-memory-size; there is no
    # spring.webflux.max-in-memory-size property, so that spelling is ignored.
    max-in-memory-size: 262144
  r2dbc:
    # R2DBC connection pool sizing and connection lifetime.
    pool:
      max-size: 10
      initial-size: 5
      max-idle-time: 30m
      max-life-time: 60m

2. 超时配置

/**
 * Provides a {@code WebClient} with explicit connect and response timeouts.
 */
@Configuration
public class WebClientConfig {

    /**
     * Builds a {@code WebClient} backed by a Reactor Netty {@code HttpClient} with a
     * 30 second response timeout and a 5000 ms connect timeout.
     *
     * @return the configured client
     */
    @Bean
    public WebClient webClient() {
        HttpClient httpClient = HttpClient.create()
            .responseTimeout(Duration.ofSeconds(30))
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);

        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);

        return WebClient.builder()
            .clientConnector(connector)
            .build();
    }
}

3. 缓存配置

/**
 * Adds a {@code Cache-Control} header to responses of GET requests.
 */
@Configuration
public class CacheConfig {

    /**
     * Creates a filter that sets {@code Cache-Control: max-age} to 10 minutes on
     * the response when the request method is GET, then continues the chain.
     *
     * @return the caching web filter
     */
    @Bean
    public WebFilter cacheWebFilter() {
        return (exchange, chain) -> {
            boolean isGet = exchange.getRequest().getMethod() == HttpMethod.GET;
            if (isGet) {
                CacheControl tenMinutes = CacheControl.maxAge(Duration.ofMinutes(10));
                exchange.getResponse().getHeaders().setCacheControl(tenMinutes);
            }
            // Requests with any other method pass through untouched.
            return chain.filter(exchange);
        };
    }
}

4. 背压处理

/**
 * Demonstrates operators that bound how much or how fast a {@code Flux} delivers.
 */
@Service
public class BackpressureService {

    /** Value passed to {@code limitRate}. */
    private static final int PREFETCH = 10;

    /** Number of elements relayed by {@code limitN}. */
    private static final long MAX_ELEMENTS = 100;

    /** Delay applied to each element by {@code throttle}. */
    private static final Duration ELEMENT_DELAY = Duration.ofMillis(100);

    /**
     * Limits the request rate towards upstream.
     *
     * @param flux the source
     * @return the source with {@code limitRate(10)} applied
     */
    public Flux<String> rateLimit(Flux<String> flux) {
        Flux<String> limited = flux.limitRate(PREFETCH);
        return limited;
    }

    /**
     * Limits the number of elements.
     *
     * @param flux the source
     * @return the source with {@code take(100)} applied
     */
    public Flux<String> limitN(Flux<String> flux) {
        Flux<String> firstN = flux.take(MAX_ELEMENTS);
        return firstN;
    }

    /**
     * Throttles emission.
     *
     * @param flux the source
     * @return the source with each element delayed by 100 ms
     */
    public Flux<String> throttle(Flux<String> flux) {
        Flux<String> delayed = flux.delayElements(ELEMENT_DELAY);
        return delayed;
    }
}

最佳实践

1. 避免阻塞

// ✅ Recommended
/**
 * Wraps a blocking call so that it is subscribed on the bounded-elastic scheduler.
 */
@Service
public class ReactiveService {

    /**
     * @return a {@code Mono} emitting the result of {@code blockingCall()},
     *         subscribed on {@code Schedulers.boundedElastic()}
     */
    public Mono<String> getData() {
        Mono<String> deferredCall = Mono.fromCallable(() -> blockingCall());
        return deferredCall.subscribeOn(Schedulers.boundedElastic());
    }
}

// ❌ Not recommended
/**
 * Counter-example: wraps a blocking call without moving it to another scheduler.
 */
@Service
public class BlockingService {

    /**
     * @return a {@code Mono} emitting the result of {@code blockingCall()}; no
     *         scheduler is specified for the call
     */
    public Mono<String> getData() {
        // Blocks the reactive thread: there is no subscribeOn here.
        Mono<String> deferredCall = Mono.fromCallable(() -> blockingCall());
        return deferredCall;
    }
}

2. 错误处理

// ✅ Recommended
/**
 * Looks up a user, translating a not-found error and bounding the wait time.
 *
 * @param id the user id taken from the path
 * @return the user; errors with {@code BusinessException("用户不存在")} when the lookup
 *         fails with {@code NotFoundException}, or with {@code BusinessException("超时")}
 *         when no result arrives within 5 seconds
 */
@GetMapping("/api/users/{id}")
public Mono<UserDTO> getUser(@PathVariable Long id) {
    return userService.findById(id)
        // onErrorMap is the direct way to translate one exception into another.
        .onErrorMap(NotFoundException.class,
            ex -> new BusinessException("用户不存在"))
        // Reactor has no onErrorTimeout operator; timeout(Duration, fallback) switches
        // to the fallback publisher when no item arrives in time. The supplier form
        // of Mono.error creates the exception only if the timeout actually fires.
        .timeout(Duration.ofSeconds(5),
            Mono.error(() -> new BusinessException("超时")));
}

// ❌ Not recommended
// Counter-example: returns the service result as-is.
@GetMapping("/api/users/{id}")
public Mono<UserDTO> getUser(@PathVariable Long id) {
    return userService.findById(id);
    // No error handling: any error signal from findById is passed on unchanged
}

3. 日志记录

/**
 * Provides a {@code WebClient} that logs every outgoing request and incoming
 * response status.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet; it is assumed to
 * be provided elsewhere.
 */
@Configuration
public class WebClientConfig {

    /**
     * Builds a {@code WebClient} with a request-logging filter followed by a
     * response-logging filter.
     *
     * @return the configured client
     */
    @Bean
    public WebClient webClient() {
        // Logs method and URL, then passes the request on unchanged.
        ExchangeFilterFunction requestLogger =
            ExchangeFilterFunction.ofRequestProcessor(outgoing -> {
                log.info("请求:{} {}", outgoing.method(), outgoing.url());
                return Mono.just(outgoing);
            });

        // Logs the status code, then passes the response on unchanged.
        ExchangeFilterFunction responseLogger =
            ExchangeFilterFunction.ofResponseProcessor(incoming -> {
                log.info("响应:{}", incoming.statusCode());
                return Mono.just(incoming);
            });

        return WebClient.builder()
            .filter(requestLogger)
            .filter(responseLogger)
            .build();
    }
}

4. 测试

@WebFluxTest(UserController.class)
class UserControllerTest {
    
    @Autowired
    private WebTestClient webTestClient;
    
    @MockBean
    private UserService userService;
    
    @Test
    void testGetUser() {
        UserDTO user = new UserDTO(1L, "test");
        
        when(userService.findById(1L))
            .thenReturn(Mono.just(user));
        
        webTestClient.get()
            .uri("/api/users/1")
            .exchange()
            .expectStatus().isOk()
            .expectBody(UserDTO.class)
            .isEqualTo(user);
    }
}

总结

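WebFlux 要点:

- 编程模型:基于 Reactor 的 Mono(0 或 1 个元素)与 Flux(0 到 N 个元素),支持注解式 Controller 和函数式端点两种写法。
- 线程模型:不要在响应式线程上执行阻塞调用;无法避免时,用 Schedulers.boundedElastic() 将其隔离。
- 健壮性:为调用链配置错误处理、超时与重试,并用背压相关操作符控制数据流速。
- 周边能力:WebSocket、WebClient、R2DBC 连接池,以及基于 WebTestClient 的测试。

WebFlux 适合 IO 密集、高并发的场景,前提是整条调用链(包括数据访问层)都是非阻塞的。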


分享这篇文章到:

上一篇文章
Spring Authorization Server
下一篇文章
SkyWalking 链路追踪实战