// Luxx/dashboard/src/utils/streamManager.js
// 218 lines · 5.7 KiB · JavaScript

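/**
* StreamManager - manages multiple concurrent SSE streams
*
* Features:
* 1. Manages streaming requests for several conversations at the same time
* 2. Supports operations such as cancel and retry
* 3. Integrates with the Pinia store
*/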
import { useStreamStore } from './streamStore.js'
/**
 * Manages one SSE (server-sent events) request per conversation and forwards
 * the parsed events to the stream store returned by `useStreamStore()`.
 */
class StreamManager {
  constructor() {
    // Active streams keyed by conversation id:
    // { [conversationId]: { controller: AbortController, promise: Promise } }
    this.activeStreams = {}
    // Kept so existing readers of `decoder` keep working. Streams do NOT use
    // it: a streaming TextDecoder buffers partial multi-byte sequences, so
    // sharing one instance between concurrent streams would corrupt text.
    this.decoder = new TextDecoder()
  }

  /**
   * Start a new stream for a conversation, cancelling any stream that is
   * already active for the same conversation.
   * @param {string} conversationId - Conversation id
   * @param {object} data - Request payload (conversation_id, content, thinking_enabled, enabled_tools)
   * @param {string} userMessageId - Id of the user message that triggered the stream
   * @returns {Promise<void>} Settles when the stream has finished, failed or been cancelled
   */
  async startStream(conversationId, data, userMessageId) {
    const streamStore = useStreamStore()
    if (this.activeStreams[conversationId]) {
      this.cancelStream(conversationId)
    }
    const controller = new AbortController()
    const entry = { controller }
    this.activeStreams[conversationId] = entry
    // Initialise the stream state in the store before any event can arrive.
    streamStore.initStream(conversationId, userMessageId)
    const promise = this._executeStream(conversationId, data, controller.signal)
    // Write through the local reference: if the request failed synchronously
    // the entry has already been removed from `activeStreams`.
    entry.promise = promise
    return promise
  }

  /**
   * Perform the POST request and pump the SSE response body into the store.
   * Never rejects: failures are reported through `streamStore.errorStream`,
   * cancellations are ignored.
   * @param {string} conversationId - Conversation id
   * @param {object} data - Request payload
   * @param {AbortSignal} signal - Signal used to cancel the request
   * @returns {Promise<void>}
   */
  async _executeStream(conversationId, data, signal) {
    const streamStore = useStreamStore()
    // Per-stream decoder and parser state, so concurrent streams cannot
    // interfere and an event split across network reads is still recognised.
    const decoder = new TextDecoder()
    const parseState = { currentEvent: '' }
    let completed = false
    const markCompleted = () => {
      completed = true
    }
    try {
      const token = localStorage.getItem('access_token')
      const res = await fetch('/api/messages/stream', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${token}`
        },
        body: JSON.stringify({
          conversation_id: data.conversation_id,
          content: data.content,
          thinking_enabled: data.thinking_enabled || false,
          enabled_tools: data.enabled_tools || []
        }),
        signal
      })
      if (!res.ok) {
        const err = await res.json().catch(() => ({}))
        throw new Error(err.message || `HTTP ${res.status}`)
      }
      if (!res.body) {
        throw new Error('response has no body')
      }
      const reader = res.body.getReader()
      let buffer = ''
      while (true) {
        const { done, value } = await reader.read()
        if (value) {
          buffer += decoder.decode(value, { stream: true })
        }
        if (done) {
          // Flush bytes still held by the decoder, then handle the unterminated tail.
          buffer += decoder.decode()
          this._processBuffer(conversationId, buffer, streamStore, markCompleted, parseState)
          break
        }
        // Only complete lines are parsed; the trailing partial line stays buffered.
        const lines = buffer.split('\n')
        buffer = lines.pop() || ''
        this._processLines(conversationId, lines, streamStore, markCompleted, parseState)
      }
      // The body ended without a terminal `done`/`error` event: report it once.
      if (!completed) {
        streamStore.errorStream(conversationId, 'stream ended without done event')
      }
    } catch (e) {
      // A cancelled stream must not write into the store: its state has been
      // cleared and may already belong to a replacement stream.
      const cancelled = (signal && signal.aborted) || (e && e.name === 'AbortError')
      if (!cancelled) {
        console.error('Stream error:', e)
        streamStore.errorStream(conversationId, e.message)
      }
    } finally {
      // Remove the bookkeeping entry only if it still belongs to THIS stream;
      // a newer stream for the same conversation may have replaced it.
      const entry = this.activeStreams[conversationId]
      if (entry && entry.controller && entry.controller.signal === signal) {
        delete this.activeStreams[conversationId]
      }
    }
  }

  /**
   * Parse a raw text buffer (split on "\n") and dispatch the events it contains.
   * @param {string} conversationId - Conversation id
   * @param {string} buffer - Raw SSE text
   * @param {object} streamStore - Store receiving the events
   * @param {Function|null} onComplete - Called when a `done` or `error` event is handled
   * @param {{currentEvent: string}} [state] - Parser state carried between calls
   */
  _processBuffer(conversationId, buffer, streamStore, onComplete, state = { currentEvent: '' }) {
    this._processLines(conversationId, buffer.split('\n'), streamStore, onComplete, state)
  }

  /**
   * Parse complete SSE lines and dispatch every `data:` payload under the most
   * recently seen `event:` name.
   * @param {string} conversationId - Conversation id
   * @param {string[]} lines - Complete lines without the trailing "\n"
   * @param {object} streamStore - Store receiving the events
   * @param {Function|null} [onComplete] - Called when a `done` or `error` event is handled
   * @param {{currentEvent: string}} [state] - Parser state; pass the same object on
   *   successive calls so an `event:` line and its `data:` line may arrive separately
   */
  _processLines(conversationId, lines, streamStore, onComplete = null, state = { currentEvent: '' }) {
    for (const line of lines) {
      if (line.startsWith('event:')) {
        // The space after the colon is optional in the SSE format; trim() also drops a trailing "\r".
        state.currentEvent = line.slice(6).trim()
      } else if (line.startsWith('data:')) {
        try {
          // JSON.parse tolerates the optional leading space and a trailing "\r".
          const payload = JSON.parse(line.slice(5))
          this._handleEvent(conversationId, state.currentEvent, payload, streamStore, onComplete)
        } catch (e) {
          // Best effort: one bad line (or a throwing handler) must not end the stream.
          console.error('SSE parse error:', e, 'line:', line)
        }
      }
    }
  }

  /**
   * Forward one parsed event to the store. Unknown event types are ignored.
   * @param {string} conversationId - Conversation id
   * @param {string} eventType - SSE event name
   * @param {object} data - Parsed JSON payload
   * @param {object} streamStore - Store receiving the event
   * @param {Function|null} onComplete - Called after a terminal (`done`/`error`) event
   */
  _handleEvent(conversationId, eventType, data, streamStore, onComplete) {
    switch (eventType) {
      case 'process_step':
        streamStore.updateStep(conversationId, data.step)
        break
      case 'done':
        streamStore.completeStream(conversationId, data)
        if (onComplete) onComplete()
        break
      case 'error':
        streamStore.errorStream(conversationId, data.content)
        if (onComplete) onComplete()
        break
    }
  }

  /**
   * Cancel the stream of a conversation (if any) and clear its store state.
   * @param {string} conversationId - Conversation id
   */
  cancelStream(conversationId) {
    const stream = this.activeStreams[conversationId]
    if (stream) {
      stream.controller.abort()
      delete this.activeStreams[conversationId]
    }
    // The store state is cleared even when no stream was active.
    const streamStore = useStreamStore()
    streamStore.clearStream(conversationId)
  }

  /**
   * Cancel every active stream.
   */
  cancelAll() {
    // Object.keys returns a snapshot, so deleting entries while iterating is safe.
    for (const conversationId of Object.keys(this.activeStreams)) {
      this.cancelStream(conversationId)
    }
  }

  /**
   * Tell whether a conversation currently has an active stream.
   * @param {string} conversationId - Conversation id
   * @returns {boolean}
   */
  hasActiveStream(conversationId) {
    return !!this.activeStreams[conversationId]
  }

  /**
   * Number of currently active streams.
   * @returns {number}
   */
  getActiveCount() {
    return Object.keys(this.activeStreams).length
  }
}
// Export a single shared StreamManager instance, both as a named and as the default export
export const streamManager = new StreamManager()
export default streamManager