关闭EventSource:手动停止大模型流式输出的有效方法

在Web应用中,当使用EventSource接收大模型的流式输出时,关闭EventSource连接是最直接有效的手动停止方式。这种方法适用于大多数基于Server-Sent Events (SSE)的流式API。
在这里插入图片描述

核心实现代码

基础实现示例

class StreamController {
    constructor() {
        this.eventSource = null;
        this.isStreaming = false;
    }

    // 开始流式请求
    startStreaming(prompt) {
        if (this.isStreaming) {
            this.stopStreaming(); // 先停止现有的流
        }

        this.isStreaming = true;
        
        // 构建请求URL
        const url = new URL('/api/chat/stream', window.location.origin);
        url.searchParams.append('message', prompt);

        // 创建EventSource连接
        this.eventSource = new EventSource(url);

        // 处理消息接收
        this.eventSource.onmessage = (event) => {
            const data = JSON.parse(event.data);
            
            if (data.type === 'token') {
                this.appendToken(data.content);
            } else if (data.type === 'finished') {
                this.handleStreamEnd();
            } else if (data.type === 'error') {
                this.handleError(data.error);
            }
        };

        // 处理错误
        this.eventSource.onerror = (error) => {
            console.error('EventSource error:', error);
            this.handleStreamEnd();
        };
    }

    // 手动停止流式输出
    stopStreaming() {
        if (this.eventSource) {
            this.eventSource.close(); // 关键:关闭EventSource连接
            this.eventSource = null;
        }
        this.isStreaming = false;
        this.onStreamStopped(); // 触发停止回调
    }

    // 页面卸载时自动清理
    setupAutoCleanup() {
        window.addEventListener('beforeunload', () => {
            this.stopStreaming();
        });
    }

    // 其他辅助方法
    appendToken(token) {
        // 将token添加到UI
        const outputElement = document.getElementById('ai-output');
        outputElement.textContent += token;
    }

    handleStreamEnd() {
        this.isStreaming = false;
        this.eventSource = null;
    }

    onStreamStopped() {
        // 用户可以重写这个方法来自定义停止后的行为
        console.log('流式输出已手动停止');
    }
}

在React组件中的使用

import React, { useState, useRef, useEffect } from 'react';

const DeepSeekChat = () => {
    const [message, setMessage] = useState('');
    const [response, setResponse] = useState('');
    const [isLoading, setIsLoading] = useState(false);
    const eventSourceRef = useRef(null);

    const startStreaming = async () => {
        if (isLoading) {
            stopStreaming();
            return;
        }

        setIsLoading(true);
        setResponse('');

        try {
            // 创建EventSource连接
            eventSourceRef.current = new EventSource(
                `/api/chat/stream?message=${encodeURIComponent(message)}`
            );

            eventSourceRef.current.onmessage = (event) => {
                const data = JSON.parse(event.data);
                
                switch (data.type) {
                    case 'token':
                        setResponse(prev => prev + data.content);
                        break;
                    case 'finished':
                        stopStreaming();
                        break;
                    case 'error':
                        console.error('Stream error:', data.error);
                        stopStreaming();
                        break;
                }
            };

            eventSourceRef.current.onerror = (error) => {
                console.error('EventSource error:', error);
                stopStreaming();
            };

        } catch (error) {
            console.error('Failed to start streaming:', error);
            setIsLoading(false);
        }
    };

    const stopStreaming = () => {
        if (eventSourceRef.current) {
            eventSourceRef.current.close(); // 关闭连接
            eventSourceRef.current = null;
        }
        setIsLoading(false);
    };

    // 组件卸载时自动清理
    useEffect(() => {
        return () => {
            stopStreaming();
        };
    }, []);

    const handleSubmit = (e) => {
        e.preventDefault();
        startStreaming();
    };

    return (
        <div className="chat-container">
            <form onSubmit={handleSubmit}>
                <input
                    type="text"
                    value={message}
                    onChange={(e) => setMessage(e.target.value)}
                    placeholder="输入你的问题..."
                />
                <button type="submit" disabled={isLoading}>
                    {isLoading ? '生成中...' : '发送'}
                </button>
                {isLoading && (
                    <button type="button" onClick={stopStreaming}>
                        停止生成
                    </button>
                )}
            </form>
            <div className="response-area">
                {response}
            </div>
        </div>
    );
};

export default DeepSeekChat;

更完整的AbortController方案

对于需要更精细控制的场景,可以结合AbortController:

class AdvancedStreamController {
    constructor() {
        this.eventSource = null;
        this.abortController = null;
        this.isStreaming = false;
    }

    async startStreaming(prompt) {
        // 停止现有的流
        await this.stopStreaming();

        this.isStreaming = true;
        this.abortController = new AbortController();

        try {
            const response = await fetch('/api/chat/stream', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({ message: prompt }),
                signal: this.abortController.signal
            });

            if (!response.ok) {
                throw new Error(`HTTP error! status: ${response.status}`);
            }

            const reader = response.body.getReader();
            const decoder = new TextDecoder();

            while (true) {
                const { done, value } = await reader.read();
                
                if (done) {
                    this.handleStreamEnd();
                    break;
                }

                const chunk = decoder.decode(value);
                const lines = chunk.split('\n');

                for (const line of lines) {
                    if (line.startsWith('data: ')) {
                        const data = JSON.parse(line.slice(6));
                        this.processData(data);
                    }
                }
            }

        } catch (error) {
            if (error.name === 'AbortError') {
                console.log('Stream was manually aborted');
            } else {
                console.error('Stream error:', error);
            }
            this.handleStreamEnd();
        }
    }

    stopStreaming() {
        if (this.abortController) {
            this.abortController.abort(); // 中止fetch请求
            this.abortController = null;
        }
        
        if (this.eventSource) {
            this.eventSource.close(); // 关闭EventSource
            this.eventSource = null;
        }
        
        this.isStreaming = false;
    }

    processData(data) {
        // 处理接收到的数据
        switch (data.type) {
            case 'token':
                this.onToken(data.content);
                break;
            case 'finished':
                this.handleStreamEnd();
                break;
        }
    }

    onToken(token) {
        // 处理每个token
        const outputElement = document.getElementById('output');
        outputElement.textContent += token;
    }

    handleStreamEnd() {
        this.isStreaming = false;
        this.abortController = null;
        this.eventSource = null;
    }
}

关键要点说明

1. 立即停止网络连接

// 关闭EventSource
eventSource.close();

// 或者中止fetch请求
abortController.abort();

这两种方式都会立即停止与服务器的连接,阻止后续数据的接收。

2. 资源清理

  • 将EventSource引用设为null,避免内存泄漏
  • 重置状态标志(如isStreaming
  • 移除事件监听器(如果需要)

3. 用户体验优化

// 提供视觉反馈
function showStopFeedback() {
    const stopIndicator = document.createElement('div');
    stopIndicator.textContent = '已停止生成';
    stopIndicator.className = 'stop-indicator';
    document.body.appendChild(stopIndicator);
    
    setTimeout(() => {
        stopIndicator.remove();
    }, 2000);
}

4. 错误处理

eventSource.onerror = (error) => {
    if (eventSource.readyState === EventSource.CLOSED) {
        console.log('Connection was closed manually');
    } else {
        console.error('Unexpected error:', error);
    }
    this.handleStreamEnd();
};

实际应用建议

  1. 在UI中提供明显的停止按钮
  2. 支持键盘快捷键(如ESC键)
  3. 在页面跳转或关闭前自动清理
  4. 考虑添加确认对话框(对于重要操作)

通过正确关闭EventSource连接,你可以有效地手动停止大模型的流式输出,既节省了计算资源,又提供了更好的用户体验。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐