前言
WebSocket 提供全双工通信通道,适合实时聊天、消息推送、在线协作等场景。Spring Boot 通过 spring-boot-starter-websocket 可以方便地集成 WebSocket。本文示例基于 STOMP 子协议,并通过 SockJS 提供降级支持。
快速开始
1. 添加依赖
<!-- Spring Boot starter for WebSocket support; no version is given here, so it is expected to be managed elsewhere in the build. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. 配置 WebSocket
/**
 * STOMP-over-WebSocket configuration.
 *
 * <p>Registers the {@code /ws} endpoint (with SockJS fallback), sizes the client inbound and
 * outbound channel executors, and enables the in-memory simple broker for {@code /topic} and
 * {@code /queue} destinations.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Hook for tuning the WebSocket transport. Left empty, so the framework defaults apply.
     *
     * <p>The configurer interface names this hook {@code configureWebSocketTransport}; the
     * previous {@code configureMessageTransport} signature did not override anything.
     *
     * @param registration transport settings to customise
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        // Intentionally empty: transport defaults are kept.
    }

    /**
     * Sizes the executor that processes messages arriving from clients.
     *
     * @param registration inbound channel settings
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // NOTE(review): if the executor keeps an unbounded queue, the pool may never grow past
        // the core size, which would make maxPoolSize ineffective. Confirm the queue capacity.
        registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
    }

    /**
     * Sizes the executor that delivers messages to clients.
     *
     * @param registration outbound channel settings
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
    }

    /**
     * Registers the STOMP endpoint clients connect to.
     *
     * @param registry endpoint registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // NOTE(review): the "*" origin pattern accepts every origin; restrict it to the
        // application's own origins before using this outside a demo.
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }

    /**
     * Configures broker destinations and the application/user destination prefixes.
     *
     * @param registry broker registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // In-memory broker for broadcast (/topic) and point-to-point (/queue) destinations.
        registry.enableSimpleBroker("/topic", "/queue");
        // Messages sent to /app/** are routed to @MessageMapping handler methods.
        registry.setApplicationDestinationPrefixes("/app");
        // Prefix used for per-user destinations.
        registry.setUserDestinationPrefix("/user");
    }
}
3. 创建 Controller
/**
 * STOMP message handlers for the public chat: broadcasting messages, registering a user on the
 * session, and handling private messages.
 */
@Slf4j // declares the "log" field used below; the class previously referenced an undeclared logger
@Controller
public class ChatController {

    /**
     * Stamps an incoming chat message with the current time and broadcasts it.
     *
     * @param message the message sent by a client to {@code /app/chat.sendMessage}
     * @return the same message, published to {@code /topic/public}
     */
    @MessageMapping("/chat.sendMessage")
    @SendTo("/topic/public")
    public ChatMessage sendMessage(ChatMessage message) {
        log.info("收到消息:{}", message);
        message.setTimestamp(LocalDateTime.now());
        return message;
    }

    /**
     * Records the sender's name on the WebSocket session and announces the join.
     *
     * @param message        join request; its sender is stored under the {@code "username"} key
     * @param headerAccessor accessor giving access to the session attributes
     * @return the message, marked as {@code JOIN} and time-stamped, published to
     *         {@code /topic/public}
     */
    @MessageMapping("/chat.addUser")
    @SendTo("/topic/public")
    public ChatMessage addUser(ChatMessage message,
                               SimpMessageHeaderAccessor headerAccessor) {
        // Session attributes may be absent; skip the bookkeeping instead of failing with an NPE.
        Map<String, Object> sessionAttributes = headerAccessor.getSessionAttributes();
        if (sessionAttributes != null) {
            sessionAttributes.put("username", message.getSender());
        }
        message.setType(MessageType.JOIN);
        message.setTimestamp(LocalDateTime.now());
        return message;
    }

    /**
     * Time-stamps a private message and returns it on the caller's user queue.
     *
     * <p>Without an explicit destination the return value would be published to a broadcast
     * destination derived from the mapping rather than to the {@code /user/queue/private}
     * queue that the client subscribes to.
     *
     * <p>NOTE(review): this replies to the sending user. Delivering to a different recipient
     * needs {@code SimpMessagingTemplate.convertAndSendToUser} with a recipient identifier,
     * which is not visible on {@code ChatMessage} from here.
     *
     * @param message   the private message
     * @param sessionId id of the sending session (currently unused)
     * @return the time-stamped message
     */
    @MessageMapping("/chat.private")
    @SendToUser("/queue/private")
    public ChatMessage sendPrivate(ChatMessage message,
                                   @Header("simpSessionId") String sessionId) {
        message.setTimestamp(LocalDateTime.now());
        return message;
    }
}
4. 前端连接
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script>
<script>
    // Open a SockJS connection to the server endpoint and layer a STOMP client on top of it.
    const socket = new SockJS('/ws');
    const stompClient = Stomp.over(socket);

    // Once connected, subscribe to the public channel and to this user's private queue.
    stompClient.connect({}, function (frame) {
        console.log('连接成功:', frame);

        // Public channel: every decoded message goes to displayMessage.
        stompClient.subscribe('/topic/public', function (incoming) {
            displayMessage(JSON.parse(incoming.body));
        });

        // Private queue: decoded messages go to displayPrivateMessage.
        stompClient.subscribe('/user/queue/private', function (incoming) {
            displayPrivateMessage(JSON.parse(incoming.body));
        });
    });

    // Sends the text currently in #messageInput as a CHAT message.
    // `username` is expected to be defined elsewhere on the page.
    function sendMessage() {
        const input = document.getElementById('messageInput');
        const payload = JSON.stringify({
            type: 'CHAT',
            content: input.value,
            sender: username
        });
        stompClient.send('/app/chat.sendMessage', {}, payload);
    }

    // Announces the current user to the chat with a JOIN message.
    function addUser() {
        const payload = JSON.stringify({type: 'JOIN', sender: username});
        stompClient.send('/app/chat.addUser', {}, payload);
    }
</script>
消息推送
1. 推送服务
/**
 * Pushes server-side notifications to STOMP subscribers through {@link SimpMessagingTemplate}.
 */
@Service
public class NotificationService {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * Broadcasts a payload to every subscriber of {@code /topic/notifications}.
     *
     * <p>The parameter is typed as {@code Object} so that structured payloads can be broadcast
     * as well as plain strings; existing callers passing a {@code String} are unaffected.
     *
     * @param message the payload to broadcast
     */
    public void sendToAll(Object message) {
        messagingTemplate.convertAndSend("/topic/notifications", message);
    }

    /**
     * Sends a notification to a single user's {@code /queue/notifications} destination.
     *
     * @param username     name of the target user
     * @param notification the notification payload
     */
    public void sendToUser(String username, Notification notification) {
        messagingTemplate.convertAndSendToUser(username, "/queue/notifications", notification);
    }

    /**
     * Publishes an order status change to {@code /topic/orders/{orderId}}.
     *
     * @param orderId id of the order that changed
     * @param status  the new status
     */
    public void sendOrderUpdate(Long orderId, OrderStatus status) {
        OrderUpdate update = new OrderUpdate(orderId, status);
        messagingTemplate.convertAndSend("/topic/orders/" + orderId, update);
    }

    /**
     * Publishes the list of online users to {@code /topic/online-users}.
     *
     * @param users names of the users currently online
     */
    public void sendOnlineUsers(List<String> users) {
        messagingTemplate.convertAndSend("/topic/online-users", users);
    }
}
2. 监听连接事件
/**
 * Listens for WebSocket session lifecycle events and announces departures to other users.
 */
@Slf4j // declares the "log" field used below; the class previously referenced an undeclared logger
@Component
public class WebSocketEventListener {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private NotificationService notificationService;

    /**
     * Logs every newly established WebSocket session.
     *
     * @param event the connected event
     */
    @EventListener
    public void handleWebSocketConnectListener(SessionConnectedEvent event) {
        log.info("新的 WebSocket 连接");
    }

    /**
     * Announces a user's departure and refreshes the online-user list when the session that
     * closed had a {@code "username"} attribute.
     *
     * @param event the disconnect event
     */
    @EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        // Session attributes can be missing; treat that the same as "no username".
        Map<String, Object> sessionAttributes = headerAccessor.getSessionAttributes();
        String username = sessionAttributes == null
                ? null
                : (String) sessionAttributes.get("username");
        if (username == null) {
            return;
        }

        log.info("用户断开连接:{}", username);

        // Tell the remaining users who left.
        notificationService.sendToAll(new ChatMessage(
                MessageType.LEAVE,
                username,
                username + " 离开了"
        ));

        updateOnlineUsers();
    }

    /**
     * Pushes the current online-user list to subscribers.
     */
    private void updateOnlineUsers() {
        // NOTE(review): getOnlineUsers() is not defined in the visible part of this class;
        // confirm where the online-user registry lives.
        List<String> onlineUsers = getOnlineUsers();
        notificationService.sendOnlineUsers(onlineUsers);
    }
}
3. 发送通知
/**
 * Order operations that push a WebSocket notification after persisting the change.
 *
 * <p>NOTE(review): {@code orderRepository} and {@code convert(...)} are used below but are not
 * declared in the visible part of this class; confirm they are provided elsewhere.
 */
@Service
@RequiredArgsConstructor
public class OrderService {

    private final NotificationService notificationService;

    /**
     * Persists a new order and notifies its owner.
     *
     * @param dto the order creation request
     * @return the saved order
     */
    public Order createOrder(OrderCreateDTO dto) {
        Order order = orderRepository.save(convert(dto));

        // Let the owning user know the order was created.
        notificationService.sendToUser(order.getUserId(), new Notification(
                "订单创建成功",
                "订单号:" + order.getOrderNo(),
                NotificationType.ORDER
        ));
        return order;
    }

    /**
     * Updates an order's status and publishes the change.
     *
     * @param orderId id of the order to update
     * @param status  the new status
     * @throws IllegalArgumentException if no order exists with the given id
     */
    public void updateOrderStatus(Long orderId, OrderStatus status) {
        // Fail with a descriptive error instead of dereferencing a null order.
        Order order = orderRepository.findById(orderId)
                .orElseThrow(() -> new IllegalArgumentException("Order not found: " + orderId));
        order.setStatus(status);
        orderRepository.save(order);

        notificationService.sendOrderUpdate(orderId, status);
    }
}
聊天室实现
1. 聊天室管理
@Service
public class ChatRoomService {
private final ConcurrentHashMap<String, Set<String>> chatRooms =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Set<String>> userRooms =
new ConcurrentHashMap<>();
/**
* 创建聊天室
*/
public String createRoom(String name, String creator) {
String roomId = UUID.randomUUID().toString();
Set<String> members = ConcurrentHashMap.newKeySet();
members.add(creator);
chatRooms.put(roomId, members);
userRooms.put(creator, ConcurrentHashMap.newKeySet(List.of(roomId)));
return roomId;
}
/**
* 加入聊天室
*/
public void joinRoom(String roomId, String username) {
chatRooms.computeIfPresent(roomId, (id, members) -> {
members.add(username);
return members;
});
userRooms.computeIfPresent(username, (user, rooms) -> {
rooms.add(roomId);
return rooms;
});
}
/**
* 离开聊天室
*/
public void leaveRoom(String roomId, String username) {
chatRooms.computeIfPresent(roomId, (id, members) -> {
members.remove(username);
return members;
});
userRooms.computeIfPresent(username, (user, rooms) -> {
rooms.remove(roomId);
return rooms;
});
}
/**
* 获取聊天室成员
*/
public Set<String> getRoomMembers(String roomId) {
return chatRooms.getOrDefault(roomId, ConcurrentHashMap.newKeySet());
}
}
2. 聊天室 Controller
/**
 * STOMP handlers for per-room chat: sending a message to a room and joining a room.
 *
 * <p>NOTE(review): the class-level {@code @RequestMapping} is an HTTP mapping annotation;
 * confirm whether a class-level {@code @MessageMapping} prefix was intended for these
 * message handlers.
 */
@Controller
@RequestMapping("/ws/chat")
public class ChatRoomController {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private ChatRoomService chatRoomService;

    /**
     * Tags a message with its room and the current time, then publishes it to the room topic.
     *
     * <p>The message is published once, through the {@code @SendTo} destination. Previously
     * it was pushed through the template and also returned, so the return value was published
     * a second time to a default destination.
     *
     * @param roomId  id of the target room, taken from the destination
     * @param message the chat message
     * @return the enriched message, published to {@code /topic/room/{roomId}}
     */
    @MessageMapping("/room/{roomId}/send")
    @SendTo("/topic/room/{roomId}")
    public ChatMessage sendToRoom(
            @DestinationVariable String roomId,
            ChatMessage message
    ) {
        message.setRoomId(roomId);
        message.setTimestamp(LocalDateTime.now());
        return message;
    }

    /**
     * Adds the session's user to a room and announces the join to the room topic.
     *
     * @param roomId     id of the room to join, taken from the destination
     * @param attributes WebSocket session attributes; must contain {@code "username"}
     * @throws IllegalStateException if the session has no {@code "username"} attribute
     */
    @MessageMapping("/room/{roomId}/join")
    public void joinRoom(
            @DestinationVariable String roomId,
            @Header("simpSessionAttributes") Map<String, Object> attributes
    ) {
        String username = (String) attributes.get("username");
        if (username == null) {
            // Reject explicitly rather than failing later on a null member name.
            throw new IllegalStateException("No username bound to the WebSocket session");
        }
        chatRoomService.joinRoom(roomId, username);

        // Tell the room's subscribers who joined.
        ChatMessage joinMessage = new ChatMessage(
                MessageType.JOIN,
                username,
                username + " 加入了聊天室"
        );
        messagingTemplate.convertAndSend("/topic/room/" + roomId, joinMessage);
    }
}
安全性
1. 认证授权
/**
 * Registers the {@code /ws} endpoint behind a handshake interceptor that only admits requests
 * carrying a token accepted by {@link JwtService}.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketSecurityConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private JwtService jwtService;

    /**
     * Registers the endpoint and attaches the token-checking handshake interceptor.
     *
     * @param registry endpoint registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .addInterceptors(tokenHandshakeInterceptor());
    }

    /**
     * Builds the interceptor that validates the token before the handshake and, on success,
     * stores the token's username in the session attributes under {@code "username"}.
     */
    private HandshakeInterceptor tokenHandshakeInterceptor() {
        return new HandshakeInterceptor() {
            @Override
            public boolean beforeHandshake(
                    ServerHttpRequest request,
                    ServerHttpResponse response,
                    WebSocketHandler wsHandler,
                    Map<String, Object> attributes
            ) throws Exception {
                // Only servlet requests can be inspected for a token; anything else is refused.
                if (!(request instanceof ServletServerHttpRequest)) {
                    return false;
                }
                String token = extractToken((ServletServerHttpRequest) request);
                if (token == null || !jwtService.validateToken(token)) {
                    return false;
                }
                attributes.put("username", jwtService.extractUsername(token));
                return true;
            }

            @Override
            public void afterHandshake(
                    ServerHttpRequest request,
                    ServerHttpResponse response,
                    WebSocketHandler wsHandler,
                    Exception exception
            ) {
                // Nothing to do after the handshake.
            }
        };
    }

    /**
     * Reads the token from the {@code Authorization: Bearer ...} header, falling back to the
     * {@code token} request parameter.
     *
     * @param request the handshake request
     * @return the token, or {@code null} when neither source provides one
     */
    private String extractToken(ServletServerHttpRequest request) {
        String authorization = request.getHeaders().getFirst("Authorization");
        boolean hasBearerPrefix = authorization != null && authorization.startsWith("Bearer ");
        return hasBearerPrefix
                ? authorization.substring(7)
                : request.getServletRequest().getParameter("token");
    }
}
2. 消息拦截
/**
 * Channel interceptor that rejects subscriptions to {@code /user/**} destinations from
 * sessions that have no {@code "username"} attribute.
 */
@Component
public class MessageInterceptor implements ChannelInterceptor {

    @Autowired
    private JwtService jwtService;

    /**
     * Checks SUBSCRIBE frames aimed at user destinations; all other messages pass through.
     *
     * @param message the message about to be sent on the channel
     * @param channel the channel the message is sent to
     * @return the unchanged message
     * @throws MessagingException if a user destination is subscribed to without a username
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        if (!StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
            return message;
        }

        String destination = accessor.getDestination();
        if (destination != null && destination.startsWith("/user/")) {
            // Missing session attributes mean "no username", not a NullPointerException.
            Map<String, Object> sessionAttributes = accessor.getSessionAttributes();
            String username = sessionAttributes == null
                    ? null
                    : (String) sessionAttributes.get("username");
            if (username == null) {
                throw new MessagingException("未授权访问");
            }
        }
        return message;
    }
}
最佳实践
1. 心跳检测
@Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
// 设置心跳
accessor.setHeartbeat(new long[]{10000, 10000]);
}
return message;
}
});
}
}
2. 消息限流
/**
 * Channel interceptor that throttles SEND frames to 10 per second per WebSocket session.
 *
 * <p>Each session gets its own limiter so that one busy client cannot use up the budget of
 * every other client. A session's limiter is dropped when a DISCONNECT for it passes through
 * this channel.
 */
@Component
public class RateLimitInterceptor implements ChannelInterceptor {

    /** Permits granted per second to each session. */
    private static final double PERMITS_PER_SECOND = 10;

    /** Shared fallback used only for messages that carry no session id. */
    private final RateLimiter rateLimiter = RateLimiter.create(PERMITS_PER_SECOND);

    /** Session id to that session's limiter. */
    private final ConcurrentHashMap<String, RateLimiter> sessionLimiters =
            new ConcurrentHashMap<>();

    /**
     * Rejects SEND frames that exceed the sender's rate; other frames pass through.
     *
     * @param message the message about to be sent on the channel
     * @param channel the channel the message is sent to
     * @return the unchanged message
     * @throws MessagingException if the sender has exceeded its send rate
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        String sessionId = accessor.getSessionId();

        if (StompCommand.DISCONNECT.equals(command)) {
            // Release the limiter so closed sessions do not accumulate in the map.
            if (sessionId != null) {
                sessionLimiters.remove(sessionId);
            }
            return message;
        }

        if (StompCommand.SEND.equals(command)) {
            RateLimiter limiter = sessionId == null
                    ? rateLimiter
                    : sessionLimiters.computeIfAbsent(
                            sessionId, id -> RateLimiter.create(PERMITS_PER_SECOND));
            if (!limiter.tryAcquire()) {
                throw new MessagingException("消息发送过于频繁");
            }
        }
        return message;
    }
}
3. 消息持久化
/**
 * Persists chat messages and reads back a room's history through
 * {@link ChatMessageRepository}.
 */
@Service
public class ChatMessageService {

    @Autowired
    private ChatMessageRepository repository;

    /**
     * Stores a single chat message.
     *
     * @param message the message to persist
     */
    public void saveMessage(ChatMessage message) {
        repository.save(message);
    }

    /**
     * Loads the first page of a room's messages, ordered by timestamp descending.
     *
     * @param roomId id of the room
     * @param limit  page size, i.e. the maximum number of messages requested
     * @return the messages returned by the repository for that page
     */
    public List<ChatMessage> getHistory(String roomId, int limit) {
        PageRequest firstPage = PageRequest.of(0, limit);
        return repository.findByRoomIdOrderByTimestampDesc(roomId, firstPage);
    }
}
4. 集群支持
# Spring Session store selection and Redis connection settings.
# Nesting restored: YAML derives structure from indentation, so without it these keys are
# read as unrelated top-level entries.
spring:
  session:
    # NOTE(review): recent Spring Boot versions may no longer honour this property and pick
    # the session store from the classpath instead; confirm against the Boot version in use.
    store-type: redis
  data:
    redis:
      host: localhost
      port: 6379
/**
 * Cluster-oriented configuration that replaces the client inbound channel with a Redis-backed
 * channel.
 *
 * <p>NOTE(review): {@code RedisSubscribableChannel} is not declared in the visible source;
 * confirm it exists in the project and what it broadcasts.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Exposes the Redis-backed channel as the {@code clientInboundChannel} bean.
     *
     * @return a new {@code RedisSubscribableChannel}
     */
    @Bean
    public SubscribableChannel clientInboundChannel() {
        SubscribableChannel redisBackedChannel = new RedisSubscribableChannel();
        return redisBackedChannel;
    }
}
总结
WebSocket 要点:
- ✅ STOMP 协议 - 消息订阅发布
- ✅ 消息推送 - 实时通知、状态更新
- ✅ 聊天室 - 群聊、私信
- ✅ 安全性 - 认证、授权、限流
- ✅ 最佳实践 - 心跳、持久化、集群
WebSocket 是实时通信的理想选择。