高级 Python Web 开发:利用 FastAPI 构建高效的服务端事件(SSE)实时数据推送

目录
  1. 🎉 服务端事件(SSE)概述与原理
  2. 📡 FastAPI 实现 SSE 数据推送
  3. 🔄 实时更新前端界面
  4. 🔧 SSE 的性能优化与并发控制
  5. 🔒 SSE 的安全性与认证机制

1. 🎉 服务端事件(SSE)概述与原理

服务端事件(Server-Sent Events,SSE)是一种基于 HTTP 协议的实时通信技术,允许服务器通过单向连接向客户端持续推送数据。它使用 text/event-stream 内容类型,并通过浏览器原生的 EventSource 接口接收数据。SSE 适用于实时通知、日志流、状态监控等场景。

特点:

  • 建立的是单向连接,服务器推送,客户端只读;
  • 使用标准 HTTP,兼容性高;
  • 支持自动重连和事件标识符(Last-Event-ID);
  • 相较于 WebSocket 更适合广播、节流的场景。

2. 📡 FastAPI 实现 SSE 数据推送

依赖库:sse-starlette(兼容 FastAPI)

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio

app = FastAPI()

async def event_generator():
    count = 0
    while True:
        yield {"data": f"第 {count} 条数据"}
        count += 1
        await asyncio.sleep(1)

@app.get("/events")
async def sse():
    return EventSourceResponse(event_generator())

说明:

  • EventSourceResponse 会自动设置响应头 Content-Type: text/event-stream
  • 事件格式:每条消息必须以 data: 内容\n\n 结尾
  • 支持传递 event, id, retry 字段,扩展性强

3. 🔄 实时更新前端界面

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>SSE 实时数据</title>
</head>
<body>
    <h1>实时消息</h1>
    <div id="messages"></div>
    <script>
        const evtSource = new EventSource("/events");
        const container = document.getElementById("messages");

        evtSource.onmessage = function(event) {
            const p = document.createElement("p");
            p.textContent = event.data;
            container.appendChild(p);
        };
    </script>
</body>
</html>

4. 🔧 SSE 的性能优化与并发控制

连接管理(避免资源泄露):
from starlette.requests import Request

@app.get("/events")
async def sse(request: Request):
    async def event_stream():
        i = 0
        while not await request.is_disconnected():
            yield {"data": f"更新 {i}"}
            i += 1
            await asyncio.sleep(1)
        print("客户端断开连接")

    return EventSourceResponse(event_stream())
限流与连接控制:
from fastapi import HTTPException

MAX_CONNECTIONS = 1000
connections = set()

@app.get("/events")
async def sse(request: Request):
    if len(connections) >= MAX_CONNECTIONS:
        raise HTTPException(503, detail="服务器过载")

    async def stream():
        connections.add(request.client)
        try:
            i = 0
            while not await request.is_disconnected():
                yield {"data": f"推送 {i}"}
                i += 1
                await asyncio.sleep(1)
        finally:
            connections.remove(request.client)

    return EventSourceResponse(stream())
多通道分流:
@app.get("/stocks")
async def stock_stream():
    async def gen():
        while True:
            yield {"event": "stocks", "data": "AAPL: 189.2"}
            await asyncio.sleep(2)
    return EventSourceResponse(gen())

5. 🔒 SSE 的安全性与认证机制

身份认证(Token):
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def verify_token(token: str):
    return token == "valid-token"

@app.get("/events")
async def sse(token: str = Depends(oauth2_scheme)):
    if not verify_token(token):
        raise HTTPException(401, detail="未授权")
    return EventSourceResponse(event_generator())
HTTPS 与传输加密:
  • 确保服务部署在 TLS(HTTPS)下,避免中间人攻击;
  • 服务器应配置跨域策略(CORS)限制来源;
  • 可使用 JWT 携带身份并在服务端验证;

SSE 是后端向前端推送实时数据的理想选择,搭配 FastAPI 的异步特性,构建简单、性能优、易维护的实时系统。适合面向状态更新广播、单用户实时提示等场景,比 WebSocket 更轻量,部署更省心。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐