RuoYi-Vue-Plus SSE推送:实时消息通知机制
在现代Web应用中,实时消息推送已成为提升用户体验的关键技术。RuoYi-Vue-Plus基于Server-Sent Events(SSE)技术构建了一套完整的实时消息通知机制,为多租户后台管理系统提供了高效、可靠的实时通信能力。SSE(Server-Sent Events)是一种基于HTTP的服务器推送技术,允许服务器主动向客户端发送事件流数据。相比WebSocket,SSE具有以下优势:...
RuoYi-Vue-Plus SSE推送:实时消息通知机制
概述
在现代Web应用中,实时消息推送已成为提升用户体验的关键技术。RuoYi-Vue-Plus基于Server-Sent Events(SSE)技术构建了一套完整的实时消息通知机制,为多租户后台管理系统提供了高效、可靠的实时通信能力。
SSE(Server-Sent Events)是一种基于HTTP的服务器推送技术,允许服务器主动向客户端发送事件流数据。相比WebSocket,SSE具有以下优势:
- 简单易用:基于标准HTTP协议,无需复杂握手过程
- 自动重连:内置连接恢复机制
- 轻量级:协议开销小,适合消息推送场景
- 浏览器原生支持:现代浏览器都支持SSE
核心架构设计
RuoYi-Vue-Plus的SSE模块采用分层架构设计,确保系统的高可用性和扩展性:
核心组件说明
| 组件名称 | 职责描述 | 关键特性 |
|---|---|---|
SseEmitterManager |
SSE连接管理核心 | 连接管理、消息分发、Redis集成 |
SseController |
HTTP接口控制器 | 提供RESTful API端点 |
SseMessageUtils |
工具类封装 | 静态方法调用,简化使用 |
SseTopicListener |
消息订阅监听器 | Redis消息订阅与分发 |
详细实现解析
1. 连接管理机制
SseEmitterManager 是SSE功能的核心管理器,负责维护用户连接状态:
// 用户连接存储结构
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
public SseEmitter connect(Long userId, String token) {
// 每个用户可以建立多个SSE连接,通过token区分
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(
userId, k -> new ConcurrentHashMap<>());
// 创建SSE发射器,超时时间设置为24小时
SseEmitter emitter = new SseEmitter(86400000L);
emitters.put(token, emitter);
// 设置连接生命周期回调
emitter.onCompletion(() -> cleanupConnection(userId, token));
emitter.onTimeout(() -> cleanupConnection(userId, token));
emitter.onError((e) -> cleanupConnection(userId, token));
return emitter;
}
2. 消息分发策略
系统支持多种消息分发模式:
3. Redis集群支持
通过Redis Pub/Sub实现跨实例的消息广播:
private final static String SSE_TOPIC = "global:sse";
// 订阅Redis消息
public void subscribeMessage(Consumer<SseMessageDto> consumer) {
RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
}
// 发布消息到Redis
public void publishMessage(SseMessageDto sseMessageDto) {
RedisUtils.publish(SSE_TOPIC, sseMessageDto, consumer -> {
log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage());
});
}
使用指南
1. 配置启用
在 application.yml 中配置SSE功能:
sse:
enabled: true # 启用SSE功能
path: /sse # SSE接口路径
2. 客户端连接
前端建立SSE连接示例:
// 建立SSE连接
const eventSource = new EventSource('/sse/connect');
// 监听消息事件
eventSource.addEventListener('message', function(event) {
const data = JSON.parse(event.data);
console.log('收到消息:', data);
// 处理消息逻辑
});
// 监听错误事件
eventSource.onerror = function(error) {
console.error('SSE连接错误:', error);
// 自动重连逻辑
};
3. 服务端消息发送
使用工具类发送消息:
// 向指定用户发送消息
SseMessageUtils.sendMessage(123L, "您的订单已处理完成");
// 向所有用户广播消息
SseMessageUtils.publishAll("系统维护通知:今晚23:00-24:00进行系统升级");
// 使用DTO发送结构化消息
SseMessageDto messageDto = new SseMessageDto();
messageDto.setUserIds(List.of(123L, 456L));
messageDto.setMessage("{\"type\":\"alert\",\"content\":\"重要通知\"}");
SseMessageUtils.publishMessage(messageDto);
4. REST API接口
系统提供完整的REST API接口:
| 接口路径 | HTTP方法 | 功能描述 | 参数说明 |
|---|---|---|---|
/sse/connect |
GET | 建立SSE连接 | 无 |
/sse/close |
GET | 关闭SSE连接 | 无 |
/sse/send |
GET | 向特定用户发送消息 | userId, msg |
/sse/sendAll |
GET | 向所有用户发送消息 | msg |
性能优化策略
1. 连接管理优化
// 使用ConcurrentHashMap确保线程安全
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
// 定时清理无效连接
@Scheduled(fixedRate = 300000) // 每5分钟执行一次
public void cleanupInactiveConnections() {
USER_TOKEN_EMITTERS.entrySet().removeIf(entry ->
entry.getValue().isEmpty());
}
2. 消息压缩策略
对于大量消息推送场景,建议启用消息压缩:
server:
compression:
enabled: true
mime-types: text/event-stream,application/json
min-response-size: 1024
3. 连接超时配置
根据业务需求调整连接超时时间:
// 生产环境建议配置(单位:毫秒)
SseEmitter emitter = new SseEmitter(1800000L); // 30分钟
// 开发环境配置
SseEmitter emitter = new SseEmitter(300000L); // 5分钟
典型应用场景
1. 实时通知中心
2. 在线用户状态同步
// 用户上线通知
public void onUserLogin(Long userId) {
SseMessageDto message = new SseMessageDto();
message.setMessage("{\"type\":\"user_online\",\"userId\":" + userId + "}");
SseMessageUtils.publishAll(message);
}
// 用户下线通知
public void onUserLogout(Long userId) {
SseMessageDto message = new SseMessageDto();
message.setMessage("{\"type\":\"user_offline\",\"userId\":" + userId + "}");
SseMessageUtils.publishAll(message);
}
3. 业务流程状态更新
// 订单状态变更通知
public void notifyOrderStatus(Long orderId, String status, Long userId) {
String message = String.format(
"{\"orderId\":%d,\"status\":\"%s\",\"timestamp\":%d}",
orderId, status, System.currentTimeMillis());
SseMessageUtils.sendMessage(userId, message);
}
故障处理与监控
1. 连接异常处理
emitter.onError((e) -> {
log.warn("SSE连接异常 userId:{} token:{} error:{}",
userId, token, e.getMessage());
cleanupConnection(userId, token);
// 记录异常指标
Metrics.counter("sse.connection.errors").increment();
});
2. 监控指标采集
建议采集的关键监控指标:
| 指标名称 | 描述 | 告警阈值 |
|---|---|---|
sse.connections.active |
活跃连接数 | > 10000 |
sse.messages.sent |
消息发送速率 | > 1000/秒 |
sse.connection.errors |
连接错误数 | > 100/分钟 |
sse.redis.publish.latency |
Redis发布延迟 | > 100ms |
最佳实践建议
1. 消息格式规范
推荐使用JSON格式的消息体:
{
"type": "notification",
"title": "系统通知",
"content": "您的申请已审核通过",
"timestamp": 1640995200000,
"priority": "high",
"actionUrl": "/notification/detail/123"
}
2. 连接数控制
对于大规模部署,建议实施连接数限制:
// 最大连接数限制
private static final int MAX_CONNECTIONS_PER_USER = 3;
private static final int MAX_TOTAL_CONNECTIONS = 10000;
public SseEmitter connect(Long userId, String token) {
if (USER_TOKEN_EMITTERS.size() >= MAX_TOTAL_CONNECTIONS) {
throw new RuntimeException("SSE连接数已达上限");
}
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
if (emitters != null && emitters.size() >= MAX_CONNECTIONS_PER_USER) {
throw new RuntimeException("用户SSE连接数已达上限");
}
// ... 正常连接逻辑
}
3. 安全考虑
- 使用Sa-Token进行身份验证
- 实施CSRF保护
- 限制消息频率防止滥用
- 敏感消息内容加密传输
总结
RuoYi-Vue-Plus的SSE推送机制提供了一个强大而灵活的实时消息通知解决方案。通过精心的架构设计和性能优化,该系统能够支持大规模并发连接和高效的消息分发,满足企业级应用对实时通信的需求。
关键优势:
- ✅ 基于标准SSE协议,兼容性好
- ✅ 支持集群部署,通过Redis实现消息广播
- ✅ 提供完整的工具类和API接口
- ✅ 具备良好的扩展性和可维护性
- ✅ 内置连接管理和异常处理机制
无论是系统通知、业务流程状态更新还是实时数据推送,RuoYi-Vue-Plus的SSE模块都能提供可靠的技术支撑,为构建现代化的实时Web应用奠定坚实基础。
更多推荐



所有评论(0)