本系统基于 Spring Boot + WebSocket + STOMP + RabbitMQ + Redis 构建,目标是实现一个 支持集群部署、可横向扩展、具备完善会话治理能力的企业级实时消息通信架构。
整体设计遵循以下核心原则:
- 无状态 WebSocket 服务实例
- 消息路由下沉到 Broker(RabbitMQ)
- 连接状态集中存储于 Redis
- 协议级(STOMP)而非业务级控制
在企业级场景下,WebSocket 通信通常面临以下问题:
- 多实例部署下,连接状态不可共享
- 内置 SimpleBroker 无法支撑高并发与横向扩展
- 客户端异常断线导致“假在线”
- 会话数、在线人数难以准确统计
- 权限校验与消息路由混杂在业务代码中
本架构的设计目标是:
- 支持多实例 Spring Boot 部署
- 支持百万级连接的 Broker 分发能力
- 统一管理用户在线状态与会话生命周期
- 在 STOMP 协议层完成鉴权与权限控制
- 实现可观测、可清理、可统计的连接治理体系
系统整体由五个核心组件构成:
- 基于 SockJS + STOMP 协议
- 与服务端通过
/ws建立 WebSocket 连接 - 负责:
- 发送 CONNECT / SUBSCRIBE / SEND 帧
- 定时发送心跳消息
- 接收 Broker 推送的实时消息
客户端不直接感知服务端实例,仅与统一入口通信。
Spring Boot 服务作为 WebSocket 接入层与业务处理层,主要职责包括:
- 接收 WebSocket / STOMP 连接
- 执行 STOMP 协议级拦截与鉴权
- 处理业务消息(私聊 / 群聊 / 广播)
- 与 Redis、RabbitMQ 协同完成状态管理与消息分发
关键特性:
- 服务实例无状态
- 不维护本地 Session
- 所有连接状态集中存储于 Redis
基于 Spring 的 STOMP 支持,系统在协议层完成以下能力:
- CONNECT 阶段完成身份认证
- SUBSCRIBE 阶段完成订阅权限校验
- SEND 阶段完成业务消息路由
所有客户端消息均遵循统一的 STOMP 语义:
/app/**:应用级消息入口/topic/**:广播 / 群消息/user/queue/**:点对点私聊消息
该设计确保:
- 消息路径语义清晰
- 权限控制集中
- 客户端与服务端协议解耦
RabbitMQ 作为 消息代理与分发中心,通过 STOMP Broker Relay 接管消息路由:
- 替代 Spring 内置 SimpleBroker
- 支持 RabbitMQ 集群
- 支持高并发消息分发
在该架构中:
- 群聊消息映射为 RabbitMQ
topic - 私聊消息映射为 RabbitMQ
queue - 广播消息统一通过
topic分发
核心作用:
- 消息跨实例转发
- Broker 级负载均衡
- 解耦消息生产与消费
Redis 作为 全局会话与状态管理中心,负责:
- 用户与 WebSocket Session 的映射关系
- 在线用户集合维护
- Session 心跳时间记录
- 在线人数与连接数统计
通过 Redis:
- 多实例之间共享连接状态
- 支持异常断线自动清理
- 实现准确的在线统计能力
Redis 不参与消息转发,仅负责 连接治理与状态管理。
一次完整的消息流转过程如下:
- 客户端通过 WebSocket 建立连接
- STOMP CONNECT 帧触发身份认证
- 服务端为连接绑定唯一 Principal
- 会话信息写入 Redis
- 客户端订阅 topic / queue
- 业务消息通过
/app/**发送至服务端 - 服务端将消息投递至 RabbitMQ
- RabbitMQ 将消息分发至目标订阅者
- 客户端实时接收消息
整个过程:
- 服务实例不直接转发消息
- 所有分发逻辑由 Broker 完成
- 连接状态与消息流转完全解耦
该架构具备以下特征:
- 高可扩展性:支持横向扩展 WebSocket 服务实例
- 高可靠性:Broker 级消息分发,避免单点瓶颈
- 强治理能力:Redis 集中管理连接与心跳
- 协议清晰:基于 STOMP 规范,职责边界明确
- 生产友好:适用于 IM、通知、实时协作等场景
该方案并非简单 WebSocket 示例,而是面向 企业级实时通信场景 的标准化架构实现。
编辑 pom.xml 添加 WebSocket 依赖
<!-- WebSocket 与 STOMP 协议支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Reactor Netty 核心组件,提供基于 Netty 的网络通信能力 -->
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
</dependency>
<!-- Reactor Netty HTTP / WebSocket 支持模块 -->
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-http</artifactId>
</dependency>
<!-- Spring Boot Redis 数据库集成,支持多种 Redis 数据结构和操作 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Lettuce 客户端连接池实现,基于 Apache Commons Pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- Spring Boot Validation 数据校验框架 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>编辑 application.yml 配置文件
server:
port: 18002
spring:
application:
name: ${project.artifactId}
logging:
level:
root: info
io.github.atengk: debug
---
# Redis的相关配置
spring:
data:
redis:
host: 175.178.193.128 # Redis服务器地址
database: 1 # Redis数据库索引(默认为0)
port: 20003 # Redis服务器连接端口
password: Admin@123 # Redis服务器连接密码(默认为空)
client-type: lettuce # 默认使用Lettuce作为Redis客户端
lettuce:
pool:
max-active: 100 # 连接池最大连接数(使用负值表示没有限制)
max-wait: -1s # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 100 # 连接池中的最大空闲连接
min-idle: 0 # 连接池最小空闲连接数
time-between-eviction-runs: 1s # 空闲对象逐出器线程的运行间隔时间.空闲连接线程释放周期时间
timeout: 5000ms # 连接超时时间(毫秒)
---
# WebSocket + STOMP Broker Relay 配置
websocket:
url: /ws # WebSocket 端点
allowed-origins: "*" # 跨域允许
heartbeat-interval: 30000 # 前端心跳发送间隔 ms
application-destination-prefix: /app
user-destination-prefix: /user
heartbeat-destination: /app/heartbeat # 心跳发送地址
broker-relay:
relay-host: 175.178.193.128
relay-port: 20015
client-login: admin
client-passcode: Admin@123
system-login: admin
system-passcode: Admin@123
system-heartbeat-send-interval: 10000
system-heartbeat-receive-interval: 10000package io.github.atengk.config;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* WebSocket 配置属性绑定类
*
* <p>
* 统一管理 WebSocket + STOMP 相关配置,包括:
* <ul>
* <li>WebSocket 端点配置</li>
* <li>STOMP 应用级前缀</li>
* <li>用户级消息前缀</li>
* <li>心跳相关配置</li>
* <li>RabbitMQ STOMP Broker Relay 配置</li>
* </ul>
*
* <p>
* 该配置通过 {@code application.yml} 中 {@code websocket.*} 前缀进行绑定,
* 作为整个 WebSocket 架构的统一参数入口。
* </p>
*
* @author 孔余
* @since 2026-01-30
*/
@Data
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketProperties {
/**
* WebSocket STOMP 端点地址
* <p>示例:/ws</p>
*/
@NotBlank
private String url;
/**
* 允许跨域的来源
* <p>支持通配符配置,如:*</p>
*/
@NotBlank
private String allowedOrigins;
/**
* 前端心跳发送间隔(毫秒)
*/
@NotNull
private Long heartbeatInterval;
/**
* STOMP 应用级消息前缀
* <p>客户端向服务端发送消息时使用</p>
* <p>示例:/app</p>
*/
@NotBlank
private String applicationDestinationPrefix;
/**
* STOMP 用户级消息前缀
* <p>用于点对点私聊消息</p>
* <p>示例:/user</p>
*/
@NotBlank
private String userDestinationPrefix;
/**
* 心跳消息发送目的地
* <p>示例:/app/heartbeat</p>
*/
@NotBlank
private String heartbeatDestination;
/**
* RabbitMQ STOMP Broker Relay 配置
*/
@Valid
@NotNull
private BrokerRelay brokerRelay;
/**
* RabbitMQ STOMP Broker Relay 配置项
*
* <p>
* 用于配置 Spring 与 RabbitMQ 之间的 STOMP 协议通信参数,
* 支持 Broker 集群、系统心跳检测等能力。
* </p>
*
* @author 孔余
* @since 2026-01-30
*/
@Data
public static class BrokerRelay {
/**
* RabbitMQ STOMP Relay 主机地址
*/
@NotBlank
private String relayHost;
/**
* RabbitMQ STOMP Relay 端口
*/
@NotNull
private Integer relayPort;
/**
* 客户端连接 Broker 使用的用户名
*/
@NotBlank
private String clientLogin;
/**
* 客户端连接 Broker 使用的密码
*/
@NotBlank
private String clientPasscode;
/**
* 系统级连接 Broker 使用的用户名
*/
@NotBlank
private String systemLogin;
/**
* 系统级连接 Broker 使用的密码
*/
@NotBlank
private String systemPasscode;
/**
* 系统向 Broker 发送心跳的时间间隔(毫秒)
*/
@NotNull
private Long systemHeartbeatSendInterval;
/**
* 系统从 Broker 接收心跳的时间间隔(毫秒)
*/
@NotNull
private Long systemHeartbeatReceiveInterval;
}
}package io.github.atengk.entity;
import java.io.Serial;
import java.io.Serializable;
import java.security.Principal;
import java.util.Objects;
/**
* STOMP 用户身份实现
*
* <p>
* 用于在 WebSocket / STOMP 通信过程中标识当前用户,
* 通常由 CONNECT 阶段解析并绑定到会话。
* </p>
*
* <p>
* 该类可安全用于:
* <ul>
* <li>Spring Messaging UserDestination</li>
* <li>在线用户映射</li>
* <li>分布式会话存储</li>
* </ul>
* </p>
*
* @author 孔余
* @since 2026-01-30
*/
public class StompUserPrincipal implements Principal, Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 业务用户 ID
*/
private final String userId;
public StompUserPrincipal(String userId) {
this.userId = Objects.requireNonNull(userId, "userId must not be null");
}
@Override
public String getName() {
return userId;
}
public String getUserId() {
return userId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof StompUserPrincipal)) {
return false;
}
StompUserPrincipal that = (StompUserPrincipal) o;
return Objects.equals(userId, that.userId);
}
@Override
public int hashCode() {
return Objects.hash(userId);
}
@Override
public String toString() {
return "StompUserPrincipal{userId='" + userId + "'}";
}
}package io.github.atengk.dto;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
/**
* 私聊消息 DTO
*
* <p>
* 表示客户端向指定用户发送的一条私聊消息。
* </p>
*
* @author 孔余
* @since 2026-01-30
*/
@Data
public class PrivateMessageDTO implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 接收方用户 ID
*/
private String toUserId;
/**
* 消息内容
*/
private String content;
}package io.github.atengk.dto;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
/**
* 群消息 DTO
*
* <p>
* 表示客户端向指定群组发送的一条消息,
* 通常由服务端转发给群内所有在线成员。
* </p>
*
* @author 孔余
* @since 2026-01-30
*/
@Data
public class GroupMessageDTO implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 群 ID
*/
private String groupId;
/**
* 消息内容
*/
private String content;
}package io.github.atengk.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* WebSocket 会话管理服务(Redis 企业级实现)
*
* <p>
* 主要职责:
* <ul>
* <li>维护 session 与用户的映射关系</li>
* <li>维护用户在线 / 离线状态(支持多端)</li>
* <li>基于心跳机制自动清理失效连接</li>
* <li>提供在线统计数据,支撑监控与运维</li>
* </ul>
* </p>
*
* <p>
* 设计说明:
* <ul>
* <li>使用 Set 支持用户多 session</li>
* <li>使用 ZSet 管理 session 心跳超时</li>
* <li>所有操作保证幂等性</li>
* </ul>
* </p>
*
* @author 孔余
* @since 2026-01-30
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class WebSocketSessionService {
private final RedisTemplate<String, String> redisTemplate;
/* ======================= Redis Key ======================= */
/** WebSocket 用户 -> session 集合 */
private static final String KEY_USER_SESSIONS = "ws:user:sessions:";
/** WebSocket session -> user 映射 */
private static final String KEY_SESSION_USER = "ws:session:user:";
/** WebSocket 在线用户集合 */
private static final String KEY_ONLINE_USERS = "ws:stats:online:users";
/** WebSocket session 心跳 ZSET */
private static final String KEY_SESSION_HEARTBEAT = "ws:session:heartbeat:zset";
/** 心跳超时时间(秒) */
private static final long HEARTBEAT_TIMEOUT_SECONDS = 90;
/* ======================= 连接管理 ======================= */
/**
* STOMP CONNECT 时调用
*
* @param userId 用户 ID
* @param sessionId WebSocket sessionId
*/
public void onConnect(String userId, String sessionId) {
// 初始化心跳
onHeartbeat(sessionId);
redisTemplate.opsForSet()
.add(KEY_USER_SESSIONS + userId, sessionId);
redisTemplate.opsForValue()
.set(KEY_SESSION_USER + sessionId, userId);
redisTemplate.opsForSet()
.add(KEY_ONLINE_USERS, userId);
log.info("【WebSocket 连接建立】userId={}, sessionId={}", userId, sessionId);
}
/**
* WebSocket 断连处理(幂等)
*
* @param sessionId WebSocket sessionId
*/
public void onDisconnect(String sessionId) {
// 移除心跳
redisTemplate.opsForZSet()
.remove(KEY_SESSION_HEARTBEAT, sessionId);
String userId = redisTemplate.opsForValue()
.get(KEY_SESSION_USER + sessionId);
if (userId == null) {
log.debug("【WebSocket 断连】session 已被清理,sessionId={}", sessionId);
return;
}
redisTemplate.opsForSet()
.remove(KEY_USER_SESSIONS + userId, sessionId);
redisTemplate.delete(KEY_SESSION_USER + sessionId);
Long remain = redisTemplate.opsForSet()
.size(KEY_USER_SESSIONS + userId);
if (remain == null || remain == 0) {
redisTemplate.opsForSet()
.remove(KEY_ONLINE_USERS, userId);
log.info("【WebSocket 用户离线】userId={}", userId);
} else {
log.info("【WebSocket 连接断开】userId={}, 剩余连接数={}", userId, remain);
}
}
/* ======================= 心跳管理 ======================= */
/**
* 心跳续期
*
* @param sessionId WebSocket sessionId
*/
public void onHeartbeat(String sessionId) {
long now = Instant.now().getEpochSecond();
redisTemplate.opsForZSet()
.add(KEY_SESSION_HEARTBEAT, sessionId, now);
log.debug("【WebSocket 心跳刷新】sessionId={}, time={}", sessionId, now);
}
/**
* 清理心跳超时的 WebSocket session
*
* <p>
* 该方法仅负责清理规则与 Redis 操作,
* 由定时任务或其他触发方式调用。
* </p>
*/
public void cleanupExpiredSessions() {
long expireBefore = Instant.now().getEpochSecond() - HEARTBEAT_TIMEOUT_SECONDS;
Set<String> expiredSessions = redisTemplate.opsForZSet()
.rangeByScore(KEY_SESSION_HEARTBEAT, 0, expireBefore);
if (expiredSessions == null || expiredSessions.isEmpty()) {
return;
}
for (String sessionId : expiredSessions) {
log.warn("【WebSocket 心跳超时】清理 sessionId={}", sessionId);
onDisconnect(sessionId);
}
log.info("【WebSocket 会话清理完成】清理失效 session 数={}", expiredSessions.size());
}
/* ======================= 在线统计 ======================= */
/**
* 获取当前 WebSocket 在线统计信息
*
* <p>
* 本方法返回的是一份「系统在线状态快照」,主要用于:
* <ul>
* <li>管理后台在线人数展示</li>
* <li>运维监控与容量评估</li>
* <li>排查异常连接或多端登录情况</li>
* </ul>
* </p>
*
* <p>
* 返回 Map 中各字段含义如下:
* <ul>
* <li>
* totalUsers:
* 当前在线用户数(按用户维度去重)。
* <br/>
* 只要某个用户至少存在一条有效 WebSocket 连接,即视为在线用户。
* </li>
* <li>
* totalConnections:
* 当前在线的 WebSocket 连接总数。
* <br/>
* 每一个 session 视为一条独立连接,支持多端同时在线。
* </li>
* <li>
* avgConnectionsPerUser:
* 人均 WebSocket 连接数。
* <br/>
* 计算公式:totalConnections / totalUsers,
* 用于反映用户多端登录情况或是否存在异常连接。
* </li>
* <li>
* timestamp:
* 统计数据生成时间的时间戳(秒级,Unix Epoch)。
* <br/>
* 用于标识该统计快照的有效时间点,便于监控与前端判断数据是否过期。
* </li>
* </ul>
* </p>
*
* <p>
* 说明:
* <ul>
* <li>该统计为实时计算结果,不做缓存</li>
* <li>仅用于监控与展示,不参与业务逻辑判断</li>
* </ul>
* </p>
*
* @return 在线统计数据快照
*
* @author 孔余
* @since 2026-01-30
*/
public Map<String, Object> getOnlineStats() {
Map<String, Object> stats = new HashMap<>();
Set<String> users = redisTemplate.opsForSet()
.members(KEY_ONLINE_USERS);
int totalUsers = users == null ? 0 : users.size();
int totalConnections = 0;
if (users != null) {
for (String userId : users) {
Long count = redisTemplate.opsForSet()
.size(KEY_USER_SESSIONS + userId);
totalConnections += count == null ? 0 : count.intValue();
}
}
stats.put("totalUsers", totalUsers);
stats.put("totalConnections", totalConnections);
stats.put("avgConnectionsPerUser",
totalUsers == 0 ? 0 : (double) totalConnections / totalUsers);
stats.put("timestamp", Instant.now().getEpochSecond());
return stats;
}
}注意开启 @EnableScheduling,或者使用其他定时任务调度框架
package io.github.atengk.schedule;
import io.github.atengk.service.WebSocketSessionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* WebSocket 会话定时任务
*
* <p>
* 定期触发 WebSocket 会话清理逻辑,
* 用于清除心跳超时、异常断开的连接,
* 防止在线状态与 Redis 数据长期不一致。
* </p>
*
* <p>
* 该类仅负责调度,不包含具体清理规则,
* 实际清理逻辑由 {@link WebSocketSessionService} 统一维护。
* </p>
*
* @author 孔余
* @since 2026-01-30
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketSessionScheduler {
private final WebSocketSessionService sessionService;
/**
* 定时清理心跳超时的 WebSocket 会话
*
* <p>
* 默认每 60 秒执行一次,用于回收:
* <ul>
* <li>非正常断开的连接</li>
* <li>心跳丢失的 session</li>
* </ul>
* </p>
*/
@Scheduled(fixedDelay = 60_000)
public void cleanupExpiredSessions() {
log.debug("【WebSocket 定时任务】开始执行会话清理");
sessionService.cleanupExpiredSessions();
}
}package io.github.atengk.interceptor;
import io.github.atengk.config.WebSocketProperties;
import io.github.atengk.service.WebSocketSessionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
/**
* WebSocket 心跳拦截器
*
* <p>
* 用于拦截客户端发送的业务心跳消息,
* 在不进入业务处理链路的前提下,
* 刷新对应 WebSocket session 的心跳时间。
* </p>
*
* <p>
* 拦截规则:
* <ul>
* <li>仅处理 STOMP SEND 帧</li>
* <li>仅命中配置的心跳 destination</li>
* </ul>
* </p>
*
* <p>
* 设计原则:
* <ul>
* <li>逻辑必须足够轻量</li>
* <li>不参与任何业务处理</li>
* <li>允许高频调用</li>
* </ul>
* </p>
*
* @author 孔余
* @since 2026-01-30
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketHeartbeatInterceptor implements ChannelInterceptor {
private final WebSocketSessionService webSocketSessionService;
private final WebSocketProperties webSocketProperties;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();
if (!StompCommand.SEND.equals(command)) {
return message;
}
String destination = accessor.getDestination();
String sessionId = accessor.getSessionId();
if (ObjectUtils.isEmpty(destination)
|| ObjectUtils.isEmpty(sessionId)
|| ObjectUtils.isEmpty(webSocketProperties.getHeartbeatDestination())) {
return message;
}
// 命中业务心跳 destination
if (destination.equals(webSocketProperties.getHeartbeatDestination())) {
webSocketSessionService.onHeartbeat(sessionId);
log.debug("【WebSocket 心跳续期】sessionId={}", sessionId);
// 心跳消息无需进入后续业务链路
return null;
}
return message;
}
}package io.github.atengk.interceptor;
import io.github.atengk.entity.StompUserPrincipal;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Component;
import java.security.Principal;
/**
* WebSocket 鉴权拦截器
*
* <p>功能说明:
* <ul>
* <li>CONNECT:WebSocket 连接阶段鉴权,并绑定 Principal</li>
* <li>SUBSCRIBE:订阅阶段进行资源级鉴权(如群 topic 权限)</li>
* </ul>
*
* <p>设计原则:
* <ul>
* <li>所有安全逻辑前置在 ChannelInterceptor 层,Controller 无感知</li>
* <li>通过绑定 Principal,保证后续 SEND / SUBSCRIBE 身份不可伪造</li>
* <li>鉴权失败直接抛出 MessagingException,中断连接或订阅</li>
* </ul>
*
* @author 孔余
* @since 2026-01-30
*/
@Slf4j
@Component
public class WebSocketAuthInterceptor implements ChannelInterceptor {
/**
* 群 topic 前缀(RabbitMQ STOMP 规范)
*/
private static final String GROUP_TOPIC_PREFIX = "/topic/group.";
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor == null || accessor.getCommand() == null) {
return message;
}
StompCommand command = accessor.getCommand();
if (StompCommand.CONNECT.equals(command)) {
handleConnect(accessor);
} else if (StompCommand.SUBSCRIBE.equals(command)) {
handleSubscribe(accessor);
}
return message;
}
/* ====================== CONNECT ====================== */
/**
* WebSocket CONNECT 阶段鉴权
*
* <p>处理逻辑:
* <ul>
* <li>提取 userId / Authorization</li>
* <li>校验 token 合法性</li>
* <li>绑定 Principal,贯穿整个 WebSocket 会话生命周期</li>
* </ul>
*/
private void handleConnect(StompHeaderAccessor accessor) {
String userId = extractRequiredHeader(accessor, "userId");
String token = extractRequiredHeader(accessor, "Authorization");
validateToken(userId, token);
accessor.setUser(new StompUserPrincipal(userId));
log.info("【WebSocket CONNECT】鉴权成功 userId={}", userId);
}
/**
* Token 校验
*
* <p>说明:
* <ul>
* <li>当前为示例实现</li>
* <li>可替换为 JWT / OAuth2 / Redis / RPC 鉴权</li>
* </ul>
*/
private void validateToken(String userId, String token) {
if (!"Bearer Admin@123".equals(token)) {
log.warn("【WebSocket CONNECT】token 校验失败 userId={}, token={}", userId, token);
throw new MessagingException("WebSocket 鉴权失败");
}
}
/* ====================== SUBSCRIBE ====================== */
/**
* WebSocket SUBSCRIBE 阶段鉴权
*
* <p>仅对群 topic 进行权限校验,防止非法监听敏感消息
*/
private void handleSubscribe(StompHeaderAccessor accessor) {
String destination = accessor.getDestination();
Principal principal = accessor.getUser();
if (destination == null || principal == null) {
return;
}
if (!destination.startsWith(GROUP_TOPIC_PREFIX)) {
return;
}
String groupId = destination.substring(GROUP_TOPIC_PREFIX.length());
String userId = principal.getName();
if (!isGroupMember(userId, groupId)) {
log.warn("【WebSocket SUBSCRIBE】无权订阅 groupId={} userId={}", groupId, userId);
throw new MessagingException("无权订阅该群");
}
log.info("【WebSocket SUBSCRIBE】订阅成功 groupId={} userId={}", groupId, userId);
}
/* ====================== Utils ====================== */
/**
* 提取必传 Header
*/
private String extractRequiredHeader(StompHeaderAccessor accessor, String headerName) {
String value = accessor.getFirstNativeHeader(headerName);
if (value == null || value.isEmpty()) {
log.warn("【WebSocket CONNECT】缺少必要 Header:{}", headerName);
throw new MessagingException("WebSocket 缺少必要参数:" + headerName);
}
return value;
}
/**
* 群成员校验
*
* <p>示例实现,实际应由:
* <ul>
* <li>数据库</li>
* <li>Redis</li>
* <li>远程服务</li>
* </ul>
* 提供
*/
private boolean isGroupMember(String userId, String groupId) {
if ("group-001".equals(groupId)) {
return "10001".equals(userId) || "10002".equals(userId);
}
return false;
}
}package io.github.atengk.listener;
import io.github.atengk.service.WebSocketSessionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import java.security.Principal;
/**
* WebSocket 连接生命周期事件监听器
*
* <p>
* 负责监听 STOMP 连接 / 断开事件,
* 并将会话生命周期交由 {@link WebSocketSessionService} 统一管理。
* </p>
*
* @author 孔余
* @since 2026-01-30
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketEventListener {
private final WebSocketSessionService sessionService;
/**
* STOMP CONNECT 事件
*
* <p>
* 触发时机:
* - 客户端 CONNECT 帧通过鉴权
* - {@link io.github.atengk.interceptor.WebSocketAuthInterceptor}
* 已完成 Principal 绑定
* </p>
*/
@EventListener
public void handleConnect(SessionConnectEvent event) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = accessor.getSessionId();
Principal principal = accessor.getUser();
if (principal == null) {
log.warn("【WebSocket CONNECT】未获取到 Principal,sessionId={}", sessionId);
return;
}
String userId = principal.getName();
sessionService.onConnect(userId, sessionId);
log.info("【WebSocket CONNECT】userId={}, sessionId={}", userId, sessionId);
}
/**
* STOMP DISCONNECT 事件
*
* <p>
* 说明:
* - 该事件不一定可靠(浏览器异常断开可能不会触发)
* - 实际连接治理以 Redis 心跳清理为准
* </p>
*/
@EventListener
public void handleDisconnect(SessionDisconnectEvent event) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = accessor.getSessionId();
if (sessionId == null) {
return;
}
sessionService.onDisconnect(sessionId);
log.info("【WebSocket DISCONNECT】sessionId={}", sessionId);
}
}package io.github.atengk.config;
import io.github.atengk.interceptor.WebSocketAuthInterceptor;
import io.github.atengk.interceptor.WebSocketHeartbeatInterceptor;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
* WebSocket + STOMP + RabbitMQ(Broker Relay 模式)核心配置
*
* <p>架构说明:
* <ul>
* <li>使用 RabbitMQ STOMP Broker Relay 替代 Spring SimpleBroker</li>
* <li>支持多实例 Spring Boot 横向扩展</li>
* <li>消息完全由 MQ 承载,应用层无状态</li>
* </ul>
*
* <p>通道职责划分:
* <ul>
* <li>Application Destination:处理业务消息(/app/**)</li>
* <li>Broker Destination:由 RabbitMQ 分发(/topic、/queue)</li>
* </ul>
*
* @author 孔余
* @since 2026-01-30
*/
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final WebSocketProperties webSocketProperties;
private final WebSocketHeartbeatInterceptor webSocketHeartbeatInterceptor;
private final WebSocketAuthInterceptor webSocketAuthInterceptor;
/**
* 配置 STOMP 消息代理(RabbitMQ Broker Relay)
*
* <p>说明:
* <ul>
* <li>不使用 enableSimpleBroker(单机、内存实现)</li>
* <li>Broker Relay 适用于生产环境与集群部署</li>
* <li>系统心跳由 Spring 与 RabbitMQ 之间维护</li>
* </ul>
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost(webSocketProperties.getBrokerRelay().getRelayHost())
.setRelayPort(webSocketProperties.getBrokerRelay().getRelayPort())
.setClientLogin(webSocketProperties.getBrokerRelay().getClientLogin())
.setClientPasscode(webSocketProperties.getBrokerRelay().getClientPasscode())
.setSystemLogin(webSocketProperties.getBrokerRelay().getSystemLogin())
.setSystemPasscode(webSocketProperties.getBrokerRelay().getSystemPasscode())
.setSystemHeartbeatSendInterval(
webSocketProperties.getBrokerRelay().getSystemHeartbeatSendInterval())
.setSystemHeartbeatReceiveInterval(
webSocketProperties.getBrokerRelay().getSystemHeartbeatReceiveInterval());
registry.setApplicationDestinationPrefixes(
webSocketProperties.getApplicationDestinationPrefix());
registry.setUserDestinationPrefix(
webSocketProperties.getUserDestinationPrefix());
}
/**
* 注册 WebSocket STOMP 端点
*
* <p>说明:
* <ul>
* <li>支持 SockJS,兼容老旧浏览器</li>
* <li>allowedOriginPatterns 用于支持多域名部署</li>
* </ul>
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint(webSocketProperties.getUrl())
.setAllowedOriginPatterns(webSocketProperties.getAllowedOrigins())
.withSockJS();
}
/**
* 配置客户端 Inbound 通道拦截器
*
* <p>执行顺序(非常重要):
* <ol>
* <li>WebSocketHeartbeatInterceptor:心跳短路,不进入业务层</li>
* <li>WebSocketAuthInterceptor:CONNECT / SUBSCRIBE 鉴权</li>
* </ol>
*
* <p>设计原则:
* <ul>
* <li>所有治理逻辑前置在通道层</li>
* <li>Controller 只关心业务</li>
* </ul>
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(
webSocketHeartbeatInterceptor,
webSocketAuthInterceptor
);
}
}package io.github.atengk.controller;
import com.alibaba.fastjson2.JSONObject;
import io.github.atengk.dto.GroupMessageDTO;
import io.github.atengk.dto.PrivateMessageDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import java.security.Principal;
/**
* WebSocket 聊天消息控制器
*
* <p>职责说明:
* <ul>
* <li>只处理业务级消息 SEND</li>
* <li>不处理心跳、不处理 CONNECT / SUBSCRIBE</li>
* <li>不信任客户端传入的 userId,一律使用 Principal</li>
* </ul>
*
* <p>设计原则:
* <ul>
* <li>安全校验前置(拦截器 + Controller 二次兜底)</li>
* <li>消息路由交由 Broker(RabbitMQ)完成</li>
* <li>Controller 本身保持无状态</li>
* </ul>
*
* @author 孔余
* @since 2026-01-30
*/
@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {
private final SimpMessagingTemplate messagingTemplate;
/* ====================== 公共消息 ====================== */
/**
* 公共广播消息
*
* <p>说明:
* <ul>
* <li>所有在线用户均可收到</li>
* <li>适用于公告、系统广播、弹幕等场景</li>
* </ul>
*
* @param message 消息内容
* @param principal 当前连接用户身份(来自 CONNECT 阶段绑定)
*/
@MessageMapping("/public.send")
public void sendPublicMessage(
@Payload String message,
Principal principal
) {
String fromUserId = principal.getName();
// 权限校验
validatePublicSendPermission(fromUserId);
log.info("【公共消息】from={} content={}", fromUserId, message);
messagingTemplate.convertAndSend(
"/topic/public",
buildPublicMessage(fromUserId, message)
);
}
/* ====================== 私聊消息 ====================== */
/**
* 私聊消息发送
*
* <p>说明:
* <ul>
* <li>基于 user destination(/user/{id}/queue/**)</li>
* <li>消息由 Broker 精确投递到目标用户</li>
* </ul>
*
* @param dto 私聊消息 DTO
* @param principal 当前连接用户身份
*/
@MessageMapping("/private.send")
public void sendPrivateMessage(
@Payload PrivateMessageDTO dto,
Principal principal
) {
String fromUserId = principal.getName();
String toUserId = dto.getToUserId();
// 业务级私聊权限校验
validatePrivateChatPermission(fromUserId, toUserId);
log.info("【私聊消息】from={} → to={} content={}",
fromUserId, toUserId, dto.getContent());
messagingTemplate.convertAndSendToUser(
toUserId,
"/queue/message",
buildPrivateMessage(fromUserId, dto.getContent())
);
}
/* ====================== 群聊消息 ====================== */
/**
* 群消息发送
*
* <p>说明:
* <ul>
* <li>一个群对应一个 topic</li>
* <li>消息由 Broker 广播给所有订阅该群的成员</li>
* </ul>
*
* @param dto 群消息 DTO
* @param principal 当前连接用户身份
*/
@MessageMapping("/group.send")
public void sendGroupMessage(
@Payload GroupMessageDTO dto,
Principal principal
) {
String fromUserId = principal.getName();
String groupId = dto.getGroupId();
// 业务级群权限校验(SUBSCRIBE 之外的二次兜底)
validateGroupChatPermission(fromUserId, groupId);
log.info("【群消息】group={} from={} content={}",
groupId, fromUserId, dto.getContent());
messagingTemplate.convertAndSend(
"/topic/group." + groupId,
buildGroupMessage(fromUserId, groupId, dto.getContent())
);
}
/* ====================== 权限校验(示例) ====================== */
/**
* 校验用户是否有权限发送公共广播
*/
private void validatePublicSendPermission(String userId) {
// TODO: 真实项目调用 Service / DB / Redis
if (!mockCanSendPublic(userId)) {
log.warn("【公共消息】用户无权发送 userId={}", userId);
throw new IllegalArgumentException("无权发送公共消息");
}
}
/**
* 模拟权限判断
*/
private boolean mockCanSendPublic(String userId) {
// 比如只有 userId=10001 可以发送
return "10001".equals(userId);
}
/**
* 私聊权限校验
*
* <p>真实场景可校验:
* <ul>
* <li>是否好友</li>
* <li>是否被拉黑</li>
* <li>是否同租户 / 同组织</li>
* <li>是否允许陌生人私聊</li>
* </ul>
*
* @param fromUserId 发送方用户 ID
* @param toUserId 接收方用户 ID
*/
private void validatePrivateChatPermission(String fromUserId, String toUserId) {
if (fromUserId.equals(toUserId)) {
throw new IllegalArgumentException("不能给自己发送私聊消息");
}
// mock 示例规则
if ("10001".equals(fromUserId) && "10002".equals(toUserId)) {
return;
}
log.warn("【私聊权限拒绝】from={} to={}", fromUserId, toUserId);
throw new IllegalArgumentException("无权限向该用户发送私聊消息");
}
/**
* 群聊权限校验
*
* <p>真实场景可校验:
* <ul>
* <li>是否群成员</li>
* <li>是否被禁言</li>
* <li>群是否已解散</li>
* </ul>
*
* @param userId 用户 ID
* @param groupId 群 ID
*/
private void validateGroupChatPermission(String userId, String groupId) {
// mock 示例规则
if ("group-001".equals(groupId) && ("10001".equals(userId) || "10002".equals(userId))) {
return;
}
log.warn("【群聊权限拒绝】userId={} groupId={}", userId, groupId);
throw new IllegalArgumentException("无权限向该群发送消息");
}
/* ====================== 消息构造 ====================== */
/**
* 构造公共 / 私聊消息体
*/
private JSONObject buildPrivateMessage(String fromUserId, String content) {
return JSONObject.of(
"fromUserId", fromUserId,
"content", content,
"timestamp", System.currentTimeMillis()
);
}
/**
* 构造公共消息体
*/
private JSONObject buildPublicMessage(String fromUserId, String content) {
return JSONObject.of(
"fromUserId", fromUserId,
"content", content,
"timestamp", System.currentTimeMillis()
);
}
/**
* 构造群消息体
*/
private JSONObject buildGroupMessage(String fromUserId, String groupId, String content) {
return JSONObject.of(
"groupId", groupId,
"fromUserId", fromUserId,
"content", content,
"timestamp", System.currentTimeMillis()
);
}
}package io.github.atengk.controller;
import io.github.atengk.service.WebSocketSessionService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* WebSocket 会话与在线状态监控接口
*
* <p>主要用途:
* <ul>
* <li>提供 WebSocket 在线用户与连接统计</li>
* <li>用于运维监控、管理后台、健康检查</li>
* </ul>
*
* <p>设计说明:
* <ul>
* <li>只读接口,不涉及任何写操作</li>
* <li>数据来源于 Redis,支持集群环境</li>
* </ul>
*
* @author 孔余
* @since 2026-01-30
*/
@RestController
@RequestMapping("/websocket-session")
@RequiredArgsConstructor
public class WebSocketSessionController {
private final WebSocketSessionService webSocketSessionService;
/**
* 获取当前 WebSocket 在线统计信息
*
* <p>返回数据说明:
* <ul>
* <li>totalUsers:当前在线用户数(去重)</li>
* <li>totalConnections:当前在线 WebSocket 连接总数</li>
* <li>avgConnectionsPerUser:人均连接数(用于发现多端登录情况)</li>
* <li>timestamp:统计时间点(Unix 秒级时间戳)</li>
* </ul>
*
* <p>使用场景:
* <ul>
* <li>管理后台实时展示在线人数</li>
* <li>监控系统采集指标</li>
* <li>压测 / 容量评估</li>
* </ul>
*
* @return WebSocket 在线统计数据
*/
@GetMapping("/online-stats")
public Map<String, Object> getOnlineStats() {
return webSocketSessionService.getOnlineStats();
}
}编辑 resources\static\index.html ,后续调试STOMP的一个页面
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>STOMP WebSocket 调试台</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.5.0/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
background: #fafafa;
}
h1 { margin-bottom: 10px; }
.status {
padding: 8px 12px;
margin-bottom: 15px;
border-radius: 4px;
font-weight: bold;
}
.connected { background: #e6fffa; color: #065f46; }
.disconnected { background: #fee2e2; color: #7f1d1d; }
.section {
margin-top: 15px;
padding: 12px;
border: 1px solid #ddd;
background: #fff;
}
input, button {
padding: 6px;
margin: 4px 0;
}
button {
cursor: pointer;
}
.log {
font-size: 13px;
margin-top: 10px;
background: #111827;
color: #e5e7eb;
padding: 10px;
height: 280px;
overflow-y: auto;
}
.log div { margin-bottom: 4px; }
.log .sys { color: #93c5fd; }
.log .send { color: #facc15; }
.log .recv { color: #86efac; }
.log .warn { color: #fca5a5; }
</style>
</head>
<body>
<h1>STOMP WebSocket 调试台</h1>
<div id="status" class="status disconnected">🔴 未连接</div>
<div class="section">
<h3>连接信息</h3>
<input id="userId" value="10001" placeholder="userId"/>
<input id="username" value="阿腾" placeholder="username"/>
<input id="token" value="Admin@123" placeholder="token"/>
<br/>
<button onclick="connect()">连接</button>
<button onclick="disconnect()">断开</button>
</div>
<div class="section">
<h3>广播消息</h3>
<input id="messageToAll" placeholder="发送给所有用户"/>
<button onclick="sendToAll()">发送</button>
</div>
<div class="section">
<h3>私聊</h3>
<input id="targetUserId" placeholder="目标用户ID"/>
<input id="messageToUser" placeholder="私聊内容"/>
<button onclick="sendToUser()">发送</button>
</div>
<div class="section">
<h3>群聊</h3>
<input id="groupId" placeholder="群ID"/>
<input id="messageToGroup" placeholder="群消息"/>
<br/>
<button onclick="sendToGroup()">发送</button>
<button onclick="subscribeGroupByInput()">订阅群</button>
<button onclick="unsubscribeGroupByInput()">退订群</button>
</div>
<div class="section">
<h3>日志</h3>
<div id="log" class="log"></div>
</div>
<script>
let socket;
let stompClient;
let heartbeatTimer;
const WS_URL = "http://localhost:18002/ws";
const HEARTBEAT_INTERVAL = 30_000;
const groupSubscriptions = new Map();
function setStatus(connected) {
const el = document.getElementById("status");
el.className = "status " + (connected ? "connected" : "disconnected");
el.textContent = connected ? "🟢 已连接" : "🔴 未连接";
}
function log(msg, type = "sys") {
const el = document.getElementById("log");
const div = document.createElement("div");
div.className = type;
div.textContent = `[${new Date().toLocaleTimeString()}] ${msg}`;
el.appendChild(div);
el.scrollTop = el.scrollHeight;
}
function connect() {
if (stompClient?.connected) {
log("已连接,无需重复连接", "warn");
return;
}
socket = new SockJS(WS_URL);
stompClient = Stomp.over(socket);
stompClient.debug = null;
stompClient.connect({
userId: userId.value,
username: username.value,
Authorization: "Bearer " + token.value
}, () => {
setStatus(true);
log("WebSocket 连接成功");
subscribeBase();
startHeartbeat();
}, err => {
log("连接失败:" + err, "warn");
});
}
function disconnect() {
stopHeartbeat();
groupSubscriptions.forEach(s => s.unsubscribe());
groupSubscriptions.clear();
stompClient?.disconnect(() => {
setStatus(false);
log("连接已断开");
});
}
function subscribeBase() {
stompClient.subscribe("/topic/public", msg => {
const d = JSON.parse(msg.body);
log(`📢 公共 ${d.fromUserId}:${d.content}`, "recv");
});
stompClient.subscribe("/user/queue/message", msg => {
const d = JSON.parse(msg.body);
log(`👤 私聊 ${d.fromUserId}:${d.content}`, "recv");
});
log("基础订阅完成");
}
function subscribeGroupByInput() {
const gid = groupId.value;
if (!gid || groupSubscriptions.has(gid)) return;
const sub = stompClient.subscribe(`/topic/group.${gid}`, msg => {
const d = JSON.parse(msg.body);
log(`👥 群[${gid}] ${d.fromUserId}:${d.content}`, "recv");
});
groupSubscriptions.set(gid, sub);
log(`订阅群 ${gid}`);
}
function unsubscribeGroupByInput() {
const gid = groupId.value;
const sub = groupSubscriptions.get(gid);
if (!sub) return;
sub.unsubscribe();
groupSubscriptions.delete(gid);
log(`退订群 ${gid}`);
}
function startHeartbeat() {
heartbeatTimer = setInterval(() => {
stompClient?.connected &&
stompClient.send("/app/heartbeat", {}, "");
log("心跳发送", "sys");
}, HEARTBEAT_INTERVAL);
}
function stopHeartbeat() {
clearInterval(heartbeatTimer);
}
function sendToAll() {
stompClient.send("/app/public.send", {}, messageToAll.value);
log("广播:" + messageToAll.value, "send");
messageToAll.value = "";
}
function sendToUser() {
stompClient.send("/app/private.send", {}, JSON.stringify({
toUserId: targetUserId.value,
content: messageToUser.value
}));
log("私聊 → " + targetUserId.value, "send");
messageToUser.value = "";
}
function sendToGroup() {
stompClient.send("/app/group.send", {}, JSON.stringify({
groupId: groupId.value,
content: messageToGroup.value
}));
log("群消息 → " + groupId.value, "send");
messageToGroup.value = "";
}
window.onbeforeunload = disconnect;
</script>
</body>
</html>