Skip to content
清晨的一缕阳光
返回

Spring Boot WebSocket 实时通信

前言

WebSocket 提供全双工通信通道，适合实时聊天、消息推送、在线协作等场景。Spring Boot 通过 spring-boot-starter-websocket 可以方便地集成 WebSocket。

快速开始

1. 添加依赖

<!-- Spring Boot WebSocket starter. No <version> is declared here, so it is
     presumably managed by the build's Spring Boot parent/BOM (confirm). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2. 配置 WebSocket

/**
 * Enables STOMP messaging over WebSocket and configures the endpoint, the
 * broker destinations and the client channel thread pools.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Transport-level tuning hook.
     *
     * <p>The configurer interface exposes this hook as
     * {@code configureWebSocketTransport(WebSocketTransportRegistration)}; any
     * other name or parameter type cannot carry {@code @Override}.
     *
     * @param registration transport settings; left untouched here
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        // Framework defaults are kept; nothing to customise yet.
    }

    /**
     * Sizes the executor that processes messages arriving from clients.
     *
     * @param registration inbound channel registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
    }

    /**
     * Sizes the executor that delivers messages to clients.
     *
     * @param registration outbound channel registration
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
    }

    /**
     * Registers the STOMP endpoint at {@code /ws} with a SockJS fallback.
     *
     * @param registry endpoint registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // NOTE(review): "*" accepts handshakes from any origin; restrict this
        // to the known front-end origins before production use.
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*")
            .withSockJS();
    }

    /**
     * Configures the in-memory broker and the destination prefixes.
     *
     * @param registry broker registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Simple in-memory broker serving the broadcast and queue destinations.
        registry.enableSimpleBroker("/topic", "/queue");

        // Messages sent to "/app/**" are routed to @MessageMapping methods.
        registry.setApplicationDestinationPrefixes("/app");

        // Prefix for per-user destinations.
        registry.setUserDestinationPrefix("/user");
    }
}

3. 创建 Controller

/**
 * STOMP message handlers for the chat example.
 *
 * <p>{@code @Slf4j} supplies the {@code log} field that the handlers use.
 */
@Slf4j
@Controller
public class ChatController {

    /** Session-attribute key under which the sender's name is stored. */
    private static final String USERNAME_ATTRIBUTE = "username";

    /**
     * Handles a chat message: stamps it with the current time and broadcasts
     * it on {@code /topic/public}.
     *
     * @param message the incoming chat message
     * @return the same message with its timestamp set
     */
    @MessageMapping("/chat.sendMessage")
    @SendTo("/topic/public")
    public ChatMessage sendMessage(ChatMessage message) {
        log.info("收到消息:{}", message);

        message.setTimestamp(LocalDateTime.now());

        return message;
    }

    /**
     * Registers a user in the WebSocket session and announces the join on
     * {@code /topic/public}.
     *
     * @param message        message whose sender is the joining user
     * @param headerAccessor accessor for the session attributes
     * @return the message, marked as {@code JOIN} and timestamped
     */
    @MessageMapping("/chat.addUser")
    @SendTo("/topic/public")
    public ChatMessage addUser(ChatMessage message,
                               SimpMessageHeaderAccessor headerAccessor) {
        // Session attributes may be absent; guard instead of failing with an NPE.
        Map<String, Object> sessionAttributes = headerAccessor.getSessionAttributes();
        if (sessionAttributes != null) {
            sessionAttributes.put(USERNAME_ATTRIBUTE, message.getSender());
        } else {
            log.warn("No session attributes available; username not stored for {}",
                message.getSender());
        }

        message.setType(MessageType.JOIN);
        message.setTimestamp(LocalDateTime.now());

        return message;
    }

    /**
     * Handles a private message and returns it on the user queue
     * {@code /queue/private} (subscribed by clients as
     * {@code /user/queue/private}).
     *
     * <p>Without {@code @SendToUser} the return value would go to the default
     * broadcast destination derived from the mapping. Note that
     * {@code @SendToUser} targets the sender; delivering to a different user
     * requires {@code SimpMessagingTemplate.convertAndSendToUser} with the
     * recipient's name, which this snippet does not expose.
     *
     * @param message   the private message
     * @param sessionId id of the sending session (currently unused)
     * @return the message with its timestamp set
     */
    @MessageMapping("/chat.private")
    @SendToUser("/queue/private")
    public ChatMessage sendPrivate(ChatMessage message,
                                   @Header("simpSessionId") String sessionId) {
        message.setTimestamp(LocalDateTime.now());

        return message;
    }
}

4. 前端连接

<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script>
<script>
    // Open a SockJS connection to the "/ws" endpoint and speak STOMP over it.
    // `username`, `displayMessage` and `displayPrivateMessage` are not defined
    // in this snippet; they are assumed to be provided by the page.
    const socket = new SockJS('/ws');
    const stompClient = Stomp.over(socket);

    stompClient.connect({}, frame => {
        console.log('连接成功:', frame);

        // Public channel: every chat message is broadcast here.
        stompClient.subscribe('/topic/public', message => {
            const chatMessage = JSON.parse(message.body);
            displayMessage(chatMessage);
        });

        // Per-user queue for private messages.
        stompClient.subscribe('/user/queue/private', message => {
            const chatMessage = JSON.parse(message.body);
            displayPrivateMessage(chatMessage);
        });
    }, error => {
        // Report connection failures instead of failing silently.
        console.error('连接失败:', error);
    });

    // Send the content of the input box as a chat message.
    function sendMessage() {
        const content = document.getElementById('messageInput').value;

        // Skip blank input and sends attempted before the session is connected.
        if (!content.trim() || !stompClient.connected) {
            return;
        }

        const message = {
            type: 'CHAT',
            content: content,
            sender: username
        };

        stompClient.send('/app/chat.sendMessage', {}, JSON.stringify(message));
    }

    // Announce the current user to the chat.
    function addUser() {
        // Sending before the STOMP session is connected would throw.
        if (!stompClient.connected) {
            return;
        }

        const message = {
            type: 'JOIN',
            sender: username
        };

        stompClient.send('/app/chat.addUser', {}, JSON.stringify(message));
    }
</script>

消息推送

1. 推送服务

/**
 * Pushes server-side notifications to WebSocket clients through
 * {@link SimpMessagingTemplate}.
 */
@Service
public class NotificationService {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * Broadcasts a payload on {@code /topic/notifications}.
     *
     * <p>The parameter is typed {@code Object} so that both plain strings and
     * structured payloads can be broadcast; existing callers that pass a
     * {@code String} are unaffected.
     *
     * @param message payload to broadcast
     */
    public void sendToAll(Object message) {
        messagingTemplate.convertAndSend("/topic/notifications", message);
    }

    /**
     * Sends a notification to one user's {@code /queue/notifications}.
     *
     * @param username     name of the target user
     * @param notification payload to deliver
     */
    public void sendToUser(String username, Notification notification) {
        messagingTemplate.convertAndSendToUser(
            username,
            "/queue/notifications",
            notification
        );
    }

    /**
     * Publishes an order status change on {@code /topic/orders/{orderId}}.
     *
     * @param orderId id of the order that changed
     * @param status  the new status
     */
    public void sendOrderUpdate(Long orderId, OrderStatus status) {
        OrderUpdate update = new OrderUpdate(orderId, status);

        messagingTemplate.convertAndSend(
            "/topic/orders/" + orderId,
            update
        );
    }

    /**
     * Broadcasts the list of online users on {@code /topic/online-users}.
     *
     * @param users names of the users currently online
     */
    public void sendOnlineUsers(List<String> users) {
        messagingTemplate.convertAndSend("/topic/online-users", users);
    }
}

2. 监听连接事件

/**
 * Reacts to WebSocket session lifecycle events.
 *
 * <p>{@code @Slf4j} supplies the {@code log} field used below.
 */
@Slf4j
@Component
public class WebSocketEventListener {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private NotificationService notificationService;

    /**
     * Logs every newly connected session.
     *
     * @param event the connected event
     */
    @EventListener
    public void handleWebSocketConnectListener(SessionConnectedEvent event) {
        log.info("新的 WebSocket 连接");
    }

    /**
     * Announces a user's departure and refreshes the online-user list when a
     * session that carries a {@code username} attribute disconnects.
     *
     * @param event the disconnect event
     */
    @EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        // Session attributes may be absent; treat that like an unnamed session.
        Map<String, Object> sessionAttributes = headerAccessor.getSessionAttributes();
        if (sessionAttributes == null) {
            return;
        }

        String username = (String) sessionAttributes.get("username");
        if (username == null) {
            return;
        }

        log.info("用户断开连接:{}", username);

        // Publish the LEAVE message on the public chat topic, next to the
        // JOIN and CHAT messages that clients already receive there.
        ChatMessage leaveMessage = new ChatMessage(
            MessageType.LEAVE,
            username,
            username + " 离开了"
        );
        messagingTemplate.convertAndSend("/topic/public", leaveMessage);

        updateOnlineUsers();
    }

    /**
     * Pushes the current online-user list to all clients.
     */
    private void updateOnlineUsers() {
        // NOTE(review): getOnlineUsers() is not declared in this snippet; it is
        // assumed to be supplied elsewhere (e.g. a session registry) — confirm.
        List<String> onlineUsers = getOnlineUsers();
        notificationService.sendOnlineUsers(onlineUsers);
    }
}

3. 发送通知

/**
 * Order operations that also push real-time notifications.
 */
@Service
@RequiredArgsConstructor
public class OrderService {

    private final NotificationService notificationService;

    // Used by both methods below; declared here so the generated constructor
    // injects it. The type name is inferred from the field name — confirm.
    private final OrderRepository orderRepository;

    /**
     * Persists a new order and notifies its owner.
     *
     * @param dto the order creation request
     * @return the saved order
     */
    public Order createOrder(OrderCreateDTO dto) {
        // NOTE(review): convert(dto) is not declared in this snippet; assumed
        // to map the DTO to an Order entity — confirm.
        Order order = orderRepository.save(convert(dto));

        notificationService.sendToUser(order.getUserId(), new Notification(
            "订单创建成功",
            "订单号:" + order.getOrderNo(),
            NotificationType.ORDER
        ));

        return order;
    }

    /**
     * Updates an order's status and publishes the change.
     *
     * @param orderId id of the order to update
     * @param status  the new status
     * @throws IllegalArgumentException if no order exists for {@code orderId}
     */
    public void updateOrderStatus(Long orderId, OrderStatus status) {
        // Fail with a descriptive error instead of a NullPointerException
        // when the order does not exist.
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new IllegalArgumentException("Order not found: " + orderId));

        order.setStatus(status);
        orderRepository.save(order);

        notificationService.sendOrderUpdate(orderId, status);
    }
}

聊天室实现

1. 聊天室管理

@Service
public class ChatRoomService {
    
    private final ConcurrentHashMap<String, Set<String>> chatRooms = 
        new ConcurrentHashMap<>();
    
    private final ConcurrentHashMap<String, Set<String>> userRooms = 
        new ConcurrentHashMap<>();
    
    /**
     * 创建聊天室
     */
    public String createRoom(String name, String creator) {
        String roomId = UUID.randomUUID().toString();
        
        Set<String> members = ConcurrentHashMap.newKeySet();
        members.add(creator);
        
        chatRooms.put(roomId, members);
        userRooms.put(creator, ConcurrentHashMap.newKeySet(List.of(roomId)));
        
        return roomId;
    }
    
    /**
     * 加入聊天室
     */
    public void joinRoom(String roomId, String username) {
        chatRooms.computeIfPresent(roomId, (id, members) -> {
            members.add(username);
            return members;
        });
        
        userRooms.computeIfPresent(username, (user, rooms) -> {
            rooms.add(roomId);
            return rooms;
        });
    }
    
    /**
     * 离开聊天室
     */
    public void leaveRoom(String roomId, String username) {
        chatRooms.computeIfPresent(roomId, (id, members) -> {
            members.remove(username);
            return members;
        });
        
        userRooms.computeIfPresent(username, (user, rooms) -> {
            rooms.remove(roomId);
            return rooms;
        });
    }
    
    /**
     * 获取聊天室成员
     */
    public Set<String> getRoomMembers(String roomId) {
        return chatRooms.getOrDefault(roomId, ConcurrentHashMap.newKeySet());
    }
}

2. 聊天室 Controller

/**
 * STOMP handlers for room-scoped chat.
 *
 * <p>NOTE(review): {@code @RequestMapping} is a Spring MVC annotation and does
 * not prefix the {@code @MessageMapping} destinations below; if a shared
 * destination prefix was intended, a class-level {@code @MessageMapping}
 * would be needed — confirm the intent.
 */
@Controller
@RequestMapping("/ws/chat")
public class ChatRoomController {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private ChatRoomService chatRoomService;

    /**
     * Stamps a message with its room and time and broadcasts it on
     * {@code /topic/room/{roomId}}.
     *
     * <p>The broadcast is done through {@code @SendTo} on the return value.
     * Sending explicitly and also returning the message would publish it a
     * second time on the default destination derived from the mapping.
     *
     * @param roomId  id of the target room, taken from the destination
     * @param message the incoming chat message
     * @return the message that is broadcast to the room
     */
    @MessageMapping("/room/{roomId}/send")
    @SendTo("/topic/room/{roomId}")
    public ChatMessage sendToRoom(
        @DestinationVariable String roomId,
        ChatMessage message
    ) {
        message.setRoomId(roomId);
        message.setTimestamp(LocalDateTime.now());

        return message;
    }

    /**
     * Adds the session's user to a room and announces the join to the room.
     *
     * @param roomId     id of the room to join, taken from the destination
     * @param attributes WebSocket session attributes holding {@code username}
     * @throws IllegalStateException if the session has no {@code username}
     */
    @MessageMapping("/room/{roomId}/join")
    public void joinRoom(
        @DestinationVariable String roomId,
        @Header("simpSessionAttributes") Map<String, Object> attributes
    ) {
        String username = (String) attributes.get("username");
        if (username == null) {
            // Without a name the join cannot be recorded or announced.
            throw new IllegalStateException("No username bound to the WebSocket session");
        }

        chatRoomService.joinRoom(roomId, username);

        ChatMessage joinMessage = new ChatMessage(
            MessageType.JOIN,
            username,
            username + " 加入了聊天室"
        );

        messagingTemplate.convertAndSend(
            "/topic/room/" + roomId,
            joinMessage
        );
    }
}

安全性

1. 认证授权

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketSecurityConfig implements WebSocketMessageBrokerConfigurer {
    
    @Autowired
    private JwtService jwtService;
    
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*")
            .addInterceptors(new HandshakeInterceptor() {
                @Override
                public boolean beforeHandshake(
                    ServerHttpRequest request,
                    ServerHttpResponse response,
                    WebSocketHandler wsHandler,
                    Map<String, Object> attributes
                ) throws Exception {
                    
                    if (request instanceof ServletServerHttpRequest) {
                        ServletServerHttpRequest serverRequest = 
                            (ServletServerHttpRequest) request;
                        
                        String token = extractToken(serverRequest);
                        
                        if (token != null && jwtService.validateToken(token)) {
                            String username = jwtService.extractUsername(token);
                            attributes.put("username", username);
                            return true;
                        }
                    }
                    
                    return false;
                }
                
                @Override
                public void afterHandshake(
                    ServerHttpRequest request,
                    ServerHttpResponse response,
                    WebSocketHandler wsHandler,
                    Exception exception
                ) {
                }
            });
    }
    
    private String extractToken(ServletServerHttpRequest request) {
        String bearer = request.getHeaders().getFirst("Authorization");
        if (bearer != null && bearer.startsWith("Bearer ")) {
            return bearer.substring(7);
        }
        return request.getServletRequest().getParameter("token");
    }
}

2. 消息拦截

/**
 * Channel interceptor that rejects subscriptions to {@code /user/**}
 * destinations from sessions without a {@code username} attribute.
 */
@Component
public class MessageInterceptor implements ChannelInterceptor {

    @Autowired
    private JwtService jwtService;

    /**
     * Checks SUBSCRIBE frames before they are sent on the channel.
     *
     * @param message the message about to be sent
     * @param channel the channel it is sent on
     * @return the unchanged message when the check passes
     * @throws MessagingException if a {@code /user/**} subscription comes
     *         from a session without a username
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        if (!StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
            return message;
        }

        String destination = accessor.getDestination();
        if (destination == null || !destination.startsWith("/user/")) {
            return message;
        }

        // Session attributes may be absent; that counts as unauthenticated
        // rather than causing a NullPointerException.
        Map<String, Object> sessionAttributes = accessor.getSessionAttributes();
        Object username = sessionAttributes == null ? null : sessionAttributes.get("username");
        if (username == null) {
            throw new MessagingException("未授权访问");
        }

        return message;
    }
}

最佳实践

1. 心跳检测

@Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
                
                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    // 设置心跳
                    accessor.setHeartbeat(new long[]{10000, 10000]);
                }
                
                return message;
            }
        });
    }
}

2. 消息限流

/**
 * Channel interceptor that limits how fast each WebSocket session may send
 * STOMP SEND frames.
 *
 * <p>Each session gets its own limiter, so one busy client cannot use up the
 * allowance of every other client.
 */
@Component
public class RateLimitInterceptor implements ChannelInterceptor {

    /** Maximum SEND frames per second for a single session. */
    private static final double PERMITS_PER_SECOND = 10;

    /** Key used when a message carries no session id. */
    private static final String NO_SESSION = "";

    /** Session id -> limiter for that session. */
    private final Map<String, RateLimiter> limitersBySession = new ConcurrentHashMap<>();

    /**
     * Rejects SEND frames that exceed the session's rate and drops the
     * session's limiter on DISCONNECT.
     *
     * @param message the message about to be sent
     * @param channel the channel it is sent on
     * @return the unchanged message when it is within the limit
     * @throws MessagingException if the session is sending too fast
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        String sessionId = accessor.getSessionId();

        if (StompCommand.DISCONNECT.equals(command)) {
            // Release the limiter so finished sessions do not accumulate.
            if (sessionId != null) {
                limitersBySession.remove(sessionId);
            }
            return message;
        }

        if (StompCommand.SEND.equals(command) && !limiterFor(sessionId).tryAcquire()) {
            throw new MessagingException("消息发送过于频繁");
        }

        return message;
    }

    /** Returns the limiter of a session, creating it on first use. */
    private RateLimiter limiterFor(String sessionId) {
        String key = sessionId != null ? sessionId : NO_SESSION;
        return limitersBySession.computeIfAbsent(
            key, id -> RateLimiter.create(PERMITS_PER_SECOND));
    }
}

3. 消息持久化

/**
 * Thin persistence facade for chat messages.
 */
@Service
public class ChatMessageService {

    @Autowired
    private ChatMessageRepository repository;

    /**
     * Stores a chat message.
     *
     * @param message the message to persist
     */
    public void saveMessage(ChatMessage message) {
        repository.save(message);
    }

    /**
     * Loads the first page of a room's messages, ordered by timestamp
     * descending as named by the repository method.
     *
     * @param roomId id of the room
     * @param limit  page size, i.e. the maximum number of messages returned
     * @return the messages of the first page
     */
    public List<ChatMessage> getHistory(String roomId, int limit) {
        PageRequest firstPage = PageRequest.of(0, limit);
        List<ChatMessage> history =
            repository.findByRoomIdOrderByTimestampDesc(roomId, firstPage);
        return history;
    }
}

4. 集群支持

# Redis-backed session store and Redis connection settings for the cluster setup.
spring:
  session:
    # NOTE(review): the supported session properties depend on the Spring Boot
    # version in use — confirm that store-type is still recognised.
    store-type: redis
  
  data:
    redis:
      # Redis connection; localhost:6379 suits local development only.
      host: localhost
      port: 6379
/**
 * Cluster variant of the WebSocket configuration: replaces the client
 * inbound channel with a Redis-backed channel.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Supplies the client inbound channel as a Redis broadcast channel.
     *
     * <p>NOTE(review): {@code RedisSubscribableChannel} is not declared in
     * this snippet and is presumably a project-provided class — confirm it
     * exists, and that overriding this bean is the intended way to share
     * messages across nodes.
     *
     * @return the channel used for inbound client messages
     */
    @Bean
    public SubscribableChannel clientInboundChannel() {
        SubscribableChannel redisBackedChannel = new RedisSubscribableChannel();
        return redisBackedChannel;
    }
}

总结

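WebSocket 要点：

- 通过 spring-boot-starter-websocket 与 STOMP 快速集成；
- 使用 SimpMessagingTemplate 向所有用户、指定用户或指定主题推送消息；
- 在握手阶段校验 JWT，并通过 ChannelInterceptor 拦截订阅请求；
- 结合心跳检测、消息限流、消息持久化与集群支持保障稳定性。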

WebSocket 是实时通信的理想选择。


分享这篇文章到:

上一篇文章
MySQL 架构设计与存储引擎
下一篇文章
Spring Authorization Server