perf: waiting_queue 改用 deque,pull_candidates 从 O(n²) 降到 O(1)

- list.pop(0) 每次左移全部元素,改 deque.popleft() 指针操作
- return_to_waiting 从 slice 整体复制改 appendleft 逐个插入
- 热路径 refill 阶段不再卡顿
This commit is contained in:
ViperEkura 2026-05-14 21:38:00 +08:00
parent e3382f6bb5
commit 513f1f7826
1 changed files with 9 additions and 5 deletions

View File

@ -2,8 +2,9 @@ import logging
import threading import threading
import time import time
import uuid import uuid
from collections import deque
from enum import Enum from enum import Enum
from typing import Any, Callable, Dict, List, Optional from typing import Any, Callable, Deque, Dict, List, Optional
from astrai.tokenize.tokenizer import AutoTokenizer from astrai.tokenize.tokenizer import AutoTokenizer
@ -76,7 +77,7 @@ class TaskManager:
self.max_seq_len = max_seq_len self.max_seq_len = max_seq_len
self.max_prompt_len = max_prompt_len self.max_prompt_len = max_prompt_len
self.waiting_queue: List[Task] = [] self.waiting_queue: Deque[Task] = deque()
self.active_tasks: List[Task] = [] self.active_tasks: List[Task] = []
self._task_event = threading.Event() self._task_event = threading.Event()
@ -129,7 +130,9 @@ class TaskManager:
def remove_task(self, task_id: str) -> List[Task]: def remove_task(self, task_id: str) -> List[Task]:
with self._lock: with self._lock:
removed_active = [t for t in self.active_tasks if t.task_id == task_id] removed_active = [t for t in self.active_tasks if t.task_id == task_id]
self.waiting_queue = [t for t in self.waiting_queue if t.task_id != task_id] self.waiting_queue = deque(
t for t in self.waiting_queue if t.task_id != task_id
)
self.active_tasks = [t for t in self.active_tasks if t.task_id != task_id] self.active_tasks = [t for t in self.active_tasks if t.task_id != task_id]
return removed_active return removed_active
@ -166,7 +169,7 @@ class TaskManager:
with self._lock: with self._lock:
take = min(n, len(self.waiting_queue)) take = min(n, len(self.waiting_queue))
for _ in range(take): for _ in range(take):
to_add.append(self.waiting_queue.pop(0)) to_add.append(self.waiting_queue.popleft())
return to_add return to_add
def activate(self, task: Task) -> None: def activate(self, task: Task) -> None:
@ -176,7 +179,8 @@ class TaskManager:
def return_to_waiting(self, tasks: List[Task]) -> None: def return_to_waiting(self, tasks: List[Task]) -> None:
with self._lock: with self._lock:
self.waiting_queue[:0] = tasks for task in reversed(tasks):
self.waiting_queue.appendleft(task)
def has_work(self) -> bool: def has_work(self) -> bool:
return bool(self.active_tasks or self.waiting_queue) return bool(self.active_tasks or self.waiting_queue)