RuoYi-Vue-Plus SSE推送:实时消息通知机制

【免费下载链接】RuoYi-Vue-Plus 多租户后台管理系统 重写RuoYi-Vue所有功能 集成 Sa-Token、Mybatis-Plus、Warm-Flow工作流、SpringDoc、Hutool、OSS 定期同步 【免费下载链接】RuoYi-Vue-Plus 项目地址: https://gitcode.com/dromara/RuoYi-Vue-Plus

概述

在现代Web应用中,实时消息推送已成为提升用户体验的关键技术。RuoYi-Vue-Plus基于Server-Sent Events(SSE)技术构建了一套完整的实时消息通知机制,为多租户后台管理系统提供了高效、可靠的实时通信能力。

SSE(Server-Sent Events)是一种基于HTTP的服务器推送技术,允许服务器主动向客户端发送事件流数据。相比WebSocket,SSE具有以下优势:

  • 简单易用:基于标准HTTP协议,无需复杂握手过程
  • 自动重连:内置连接恢复机制
  • 轻量级:协议开销小,适合消息推送场景
  • 浏览器原生支持:现代浏览器都支持SSE

核心架构设计

RuoYi-Vue-Plus的SSE模块采用分层架构设计,确保系统的高可用性和扩展性:

mermaid

核心组件说明

组件名称 职责描述 关键特性
SseEmitterManager SSE连接管理核心 连接管理、消息分发、Redis集成
SseController HTTP接口控制器 提供RESTful API端点
SseMessageUtils 工具类封装 静态方法调用,简化使用
SseTopicListener 消息订阅监听器 Redis消息订阅与分发

详细实现解析

1. 连接管理机制

SseEmitterManager 是SSE功能的核心管理器,负责维护用户连接状态:

// 用户连接存储结构
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();

public SseEmitter connect(Long userId, String token) {
    // 每个用户可以建立多个SSE连接,通过token区分
    Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(
        userId, k -> new ConcurrentHashMap<>());
    
    // 创建SSE发射器,超时时间设置为24小时
    SseEmitter emitter = new SseEmitter(86400000L);
    emitters.put(token, emitter);
    
    // 设置连接生命周期回调
    emitter.onCompletion(() -> cleanupConnection(userId, token));
    emitter.onTimeout(() -> cleanupConnection(userId, token));
    emitter.onError((e) -> cleanupConnection(userId, token));
    
    return emitter;
}

2. 消息分发策略

系统支持多种消息分发模式:

mermaid

3. Redis集群支持

通过Redis Pub/Sub实现跨实例的消息广播:

private final static String SSE_TOPIC = "global:sse";

// 订阅Redis消息
public void subscribeMessage(Consumer<SseMessageDto> consumer) {
    RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
}

// 发布消息到Redis
public void publishMessage(SseMessageDto sseMessageDto) {
    RedisUtils.publish(SSE_TOPIC, sseMessageDto, consumer -> {
        log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
            SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage());
    });
}

使用指南

1. 配置启用

application.yml 中配置SSE功能:

sse:
  enabled: true          # 启用SSE功能
  path: /sse            # SSE接口路径

2. 客户端连接

前端建立SSE连接示例:

// 建立SSE连接
const eventSource = new EventSource('/sse/connect');

// 监听消息事件
eventSource.addEventListener('message', function(event) {
    const data = JSON.parse(event.data);
    console.log('收到消息:', data);
    // 处理消息逻辑
});

// 监听错误事件
eventSource.onerror = function(error) {
    console.error('SSE连接错误:', error);
    // 自动重连逻辑
};

3. 服务端消息发送

使用工具类发送消息:

// 向指定用户发送消息
SseMessageUtils.sendMessage(123L, "您的订单已处理完成");

// 向所有用户广播消息  
SseMessageUtils.publishAll("系统维护通知:今晚23:00-24:00进行系统升级");

// 使用DTO发送结构化消息
SseMessageDto messageDto = new SseMessageDto();
messageDto.setUserIds(List.of(123L, 456L));
messageDto.setMessage("{\"type\":\"alert\",\"content\":\"重要通知\"}");
SseMessageUtils.publishMessage(messageDto);

4. REST API接口

系统提供完整的REST API接口:

接口路径 HTTP方法 功能描述 参数说明
/sse/connect GET 建立SSE连接
/sse/close GET 关闭SSE连接
/sse/send GET 向特定用户发送消息 userId, msg
/sse/sendAll GET 向所有用户发送消息 msg

性能优化策略

1. 连接管理优化

// 使用ConcurrentHashMap确保线程安全
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();

// 定时清理无效连接
@Scheduled(fixedRate = 300000) // 每5分钟执行一次
public void cleanupInactiveConnections() {
    USER_TOKEN_EMITTERS.entrySet().removeIf(entry -> 
        entry.getValue().isEmpty());
}

2. 消息压缩策略

对于大量消息推送场景,建议启用消息压缩:

server:
  compression:
    enabled: true
    mime-types: text/event-stream,application/json
    min-response-size: 1024

3. 连接超时配置

根据业务需求调整连接超时时间:

// 生产环境建议配置(单位:毫秒)
SseEmitter emitter = new SseEmitter(1800000L); // 30分钟

// 开发环境配置  
SseEmitter emitter = new SseEmitter(300000L);  // 5分钟

典型应用场景

1. 实时通知中心

mermaid

2. 在线用户状态同步

// 用户上线通知
public void onUserLogin(Long userId) {
    SseMessageDto message = new SseMessageDto();
    message.setMessage("{\"type\":\"user_online\",\"userId\":" + userId + "}");
    SseMessageUtils.publishAll(message);
}

// 用户下线通知
public void onUserLogout(Long userId) {
    SseMessageDto message = new SseMessageDto();
    message.setMessage("{\"type\":\"user_offline\",\"userId\":" + userId + "}");
    SseMessageUtils.publishAll(message);
}

3. 业务流程状态更新

// 订单状态变更通知
public void notifyOrderStatus(Long orderId, String status, Long userId) {
    String message = String.format(
        "{\"orderId\":%d,\"status\":\"%s\",\"timestamp\":%d}",
        orderId, status, System.currentTimeMillis());
    SseMessageUtils.sendMessage(userId, message);
}

故障处理与监控

1. 连接异常处理

emitter.onError((e) -> {
    log.warn("SSE连接异常 userId:{} token:{} error:{}", 
        userId, token, e.getMessage());
    cleanupConnection(userId, token);
    
    // 记录异常指标
    Metrics.counter("sse.connection.errors").increment();
});

2. 监控指标采集

建议采集的关键监控指标:

指标名称 描述 告警阈值
sse.connections.active 活跃连接数 > 10000
sse.messages.sent 消息发送速率 > 1000/秒
sse.connection.errors 连接错误数 > 100/分钟
sse.redis.publish.latency Redis发布延迟 > 100ms

最佳实践建议

1. 消息格式规范

推荐使用JSON格式的消息体:

{
  "type": "notification",
  "title": "系统通知",
  "content": "您的申请已审核通过",
  "timestamp": 1640995200000,
  "priority": "high",
  "actionUrl": "/notification/detail/123"
}

2. 连接数控制

对于大规模部署,建议实施连接数限制:

// 最大连接数限制
private static final int MAX_CONNECTIONS_PER_USER = 3;
private static final int MAX_TOTAL_CONNECTIONS = 10000;

public SseEmitter connect(Long userId, String token) {
    if (USER_TOKEN_EMITTERS.size() >= MAX_TOTAL_CONNECTIONS) {
        throw new RuntimeException("SSE连接数已达上限");
    }
    
    Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
    if (emitters != null && emitters.size() >= MAX_CONNECTIONS_PER_USER) {
        throw new RuntimeException("用户SSE连接数已达上限");
    }
    
    // ... 正常连接逻辑
}

3. 安全考虑

  • 使用Sa-Token进行身份验证
  • 实施CSRF保护
  • 限制消息频率防止滥用
  • 敏感消息内容加密传输

总结

RuoYi-Vue-Plus的SSE推送机制提供了一个强大而灵活的实时消息通知解决方案。通过精心的架构设计和性能优化,该系统能够支持大规模并发连接和高效的消息分发,满足企业级应用对实时通信的需求。

关键优势:

  • ✅ 基于标准SSE协议,兼容性好
  • ✅ 支持集群部署,通过Redis实现消息广播
  • ✅ 提供完整的工具类和API接口
  • ✅ 具备良好的扩展性和可维护性
  • ✅ 内置连接管理和异常处理机制

无论是系统通知、业务流程状态更新还是实时数据推送,RuoYi-Vue-Plus的SSE模块都能提供可靠的技术支撑,为构建现代化的实时Web应用奠定坚实基础。

【免费下载链接】RuoYi-Vue-Plus 多租户后台管理系统 重写RuoYi-Vue所有功能 集成 Sa-Token、Mybatis-Plus、Warm-Flow工作流、SpringDoc、Hutool、OSS 定期同步 【免费下载链接】RuoYi-Vue-Plus 项目地址: https://gitcode.com/dromara/RuoYi-Vue-Plus

更多推荐