/** * StreamManager - 管理多个并发 SSE 流 * * 功能: * 1. 同时管理多个会话的流式请求 * 2. 支持取消、重试等操作 * 3. 与 Pinia store 集成 */ import { useStreamStore } from './streamStore.js' class StreamManager { constructor() { // 存储所有活跃的流:{ conversationId: { abort, promise } } this.activeStreams = {} // SSE 解码器 this.decoder = new TextDecoder() } /** * 启动一个新的流 * @param {string} conversationId - 会话 ID * @param {object} data - 请求数据 * @param {string} userMessageId - 用户消息 ID */ async startStream(conversationId, data, userMessageId) { const streamStore = useStreamStore() // 如果该会话已有活跃流,先取消 if (this.activeStreams[conversationId]) { this.cancelStream(conversationId) } const controller = new AbortController() this.activeStreams[conversationId] = { controller } // 初始化 store 中的流状态 streamStore.initStream(conversationId, userMessageId) const promise = this._executeStream(conversationId, data, controller.signal) this.activeStreams[conversationId].promise = promise return promise } /** * 执行 SSE 流 */ async _executeStream(conversationId, data, signal) { const streamStore = useStreamStore() const token = localStorage.getItem('access_token') try { const res = await fetch('/api/messages/stream', { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${token}` }, body: JSON.stringify({ conversation_id: data.conversation_id, content: data.content, thinking_enabled: data.thinking_enabled || false, enabled_tools: data.enabled_tools || [] }), signal }) if (!res.ok) { const err = await res.json().catch(() => ({})) throw new Error(err.message || `HTTP ${res.status}`) } const reader = res.body.getReader() let buffer = '' let completed = false while (true) { const { done, value } = await reader.read() if (value) { buffer += this.decoder.decode(value, { stream: true }) } if (done) { // 处理 buffer 中剩余的数据 this._processBuffer(conversationId, buffer, streamStore, () => { completed = true }) if (!completed) { streamStore.errorStream(conversationId, 'stream ended without done event') } break } const lines = buffer.split('\n') buffer = lines.pop() || '' this._processLines(conversationId, lines, streamStore) } // 流结束但没有收到 done 事件 if (!completed) { streamStore.errorStream(conversationId, 'stream ended unexpectedly') } } catch (e) { if (e.name !== 'AbortError') { console.error('Stream error:', e) streamStore.errorStream(conversationId, e.message) } } finally { // 清理活跃流记录 delete this.activeStreams[conversationId] } } /** * 处理缓冲区 */ _processBuffer(conversationId, buffer, streamStore, onComplete) { const lines = buffer.split('\n') let currentEvent = '' for (const line of lines) { if (line.startsWith('event: ')) { currentEvent = line.slice(7).trim() } else if (line.startsWith('data: ')) { try { const data = JSON.parse(line.slice(6)) this._handleEvent(conversationId, currentEvent, data, streamStore, onComplete) } catch (e) { console.error('SSE parse error:', e, 'line:', line) } } } } /** * 处理行 */ _processLines(conversationId, lines, streamStore) { let currentEvent = '' for (const line of lines) { if (line.startsWith('event: ')) { currentEvent = line.slice(7).trim() } else if (line.startsWith('data: ')) { try { const data = JSON.parse(line.slice(6)) this._handleEvent(conversationId, currentEvent, data, streamStore, null) } catch (e) { // 忽略解析错误 } } } } /** * 处理事件 */ _handleEvent(conversationId, eventType, data, streamStore, onComplete) { switch (eventType) { case 'process_step': streamStore.updateStep(conversationId, data.step) break case 'done': streamStore.completeStream(conversationId, data) if (onComplete) onComplete() break case 'error': streamStore.errorStream(conversationId, data.content) if (onComplete) onComplete() break } } /** * 取消指定会话的流 */ cancelStream(conversationId) { const stream = this.activeStreams[conversationId] if (stream) { stream.controller.abort() delete this.activeStreams[conversationId] } // 清除 store 中的流状态 const streamStore = useStreamStore() streamStore.clearStream(conversationId) } /** * 取消所有流 */ cancelAll() { Object.keys(this.activeStreams).forEach(conversationId => { this.cancelStream(conversationId) }) } /** * 检查指定会话是否有活跃流 */ hasActiveStream(conversationId) { return !!this.activeStreams[conversationId] } /** * 获取活跃流数量 */ getActiveCount() { return Object.keys(this.activeStreams).length } } // 导出单例 export const streamManager = new StreamManager() export default streamManager