高级 Python Web 开发:利用 FastAPI 构建高效的服务端事件(SSE)实时数据推送
服务端事件(Server-Sent Events,简称 SSE)是一种基于 HTTP 协议的技术,允许服务器通过单向通道向客户端推送实时更新。与 WebSocket 的双向通信不同,SSE 主要是服务器向客户端单方向推送数据,适用于实时更新的场景,如新闻推送、社交媒体通知、实时数据流等。SSE 使用了 HTTP 协议,数据通过格式发送,客户端(如浏览器)通过对象来接收这些数据流。SSE 的实现简单
·
高级 Python Web 开发:利用 FastAPI 构建高效的服务端事件(SSE)实时数据推送
目录
- 🎉 服务端事件(SSE)概述与原理
- 📡 FastAPI 实现 SSE 数据推送
- 🔄 实时更新前端界面
- 🔧 SSE 的性能优化与并发控制
- 🔒 SSE 的安全性与认证机制
1. 🎉 服务端事件(SSE)概述与原理
服务端事件(Server-Sent Events,SSE)是一种基于 HTTP 协议的实时通信技术,允许服务器通过单向连接向客户端持续推送数据。它使用 text/event-stream 内容类型,并通过浏览器原生的 EventSource 接口接收数据。SSE 适用于实时通知、日志流、状态监控等场景。
特点:
- 建立的是单向连接,服务器推送,客户端只读;
- 使用标准 HTTP,兼容性高;
- 支持自动重连和事件标识符(Last-Event-ID);
- 相较于 WebSocket 更适合广播、节流的场景。
2. 📡 FastAPI 实现 SSE 数据推送
依赖库:sse-starlette(兼容 FastAPI)
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
app = FastAPI()
async def event_generator():
count = 0
while True:
yield {"data": f"第 {count} 条数据"}
count += 1
await asyncio.sleep(1)
@app.get("/events")
async def sse():
return EventSourceResponse(event_generator())
说明:
EventSourceResponse会自动设置响应头Content-Type: text/event-stream- 事件格式:每条消息必须以
data: 内容\n\n结尾 - 支持传递
event,id,retry字段,扩展性强
3. 🔄 实时更新前端界面
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>SSE 实时数据</title>
</head>
<body>
<h1>实时消息</h1>
<div id="messages"></div>
<script>
const evtSource = new EventSource("/events");
const container = document.getElementById("messages");
evtSource.onmessage = function(event) {
const p = document.createElement("p");
p.textContent = event.data;
container.appendChild(p);
};
</script>
</body>
</html>
4. 🔧 SSE 的性能优化与并发控制
连接管理(避免资源泄露):
from starlette.requests import Request
@app.get("/events")
async def sse(request: Request):
async def event_stream():
i = 0
while not await request.is_disconnected():
yield {"data": f"更新 {i}"}
i += 1
await asyncio.sleep(1)
print("客户端断开连接")
return EventSourceResponse(event_stream())
限流与连接控制:
from fastapi import HTTPException
MAX_CONNECTIONS = 1000
connections = set()
@app.get("/events")
async def sse(request: Request):
if len(connections) >= MAX_CONNECTIONS:
raise HTTPException(503, detail="服务器过载")
async def stream():
connections.add(request.client)
try:
i = 0
while not await request.is_disconnected():
yield {"data": f"推送 {i}"}
i += 1
await asyncio.sleep(1)
finally:
connections.remove(request.client)
return EventSourceResponse(stream())
多通道分流:
@app.get("/stocks")
async def stock_stream():
async def gen():
while True:
yield {"event": "stocks", "data": "AAPL: 189.2"}
await asyncio.sleep(2)
return EventSourceResponse(gen())
5. 🔒 SSE 的安全性与认证机制
身份认证(Token):
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_token(token: str):
return token == "valid-token"
@app.get("/events")
async def sse(token: str = Depends(oauth2_scheme)):
if not verify_token(token):
raise HTTPException(401, detail="未授权")
return EventSourceResponse(event_generator())
HTTPS 与传输加密:
- 确保服务部署在 TLS(HTTPS)下,避免中间人攻击;
- 服务器应配置跨域策略(CORS)限制来源;
- 可使用 JWT 携带身份并在服务端验证;
SSE 是后端向前端推送实时数据的理想选择,搭配 FastAPI 的异步特性,构建简单、性能优、易维护的实时系统。适合面向状态更新广播、单用户实时提示等场景,比 WebSocket 更轻量,部署更省心。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐



所有评论(0)