#!/usr/bin/env python3
"""
Multilevel feedback queue (MLFQ) scheduling algorithm.

Supports simulated I/O bursts between CPU bursts.
"""

from typing import List, Dict
from collections import deque
import heapq
import itertools

from base import (
    Process,
    ProcessScheduler,
    generate_random_processes,
    print_processes
)


class MLFQScheduler(ProcessScheduler):
    """Multilevel feedback queue scheduler.

    Processes start in queue 0 (highest priority). A process that uses up
    its whole time slice without finishing its current CPU burst is demoted
    one level (down to the last queue). A process that finishes a CPU burst
    either starts the matching I/O burst or terminates, and keeps its
    current queue level when it returns from I/O.
    """

    # Upper bound on main-loop iterations. If it is reached, schedule()
    # stops and reports only the processes completed so far.
    MAX_ITERATIONS = 10000

    def __init__(self, processes: List[Process], num_queues: int = 3,
                 base_time_slice: int = 4):
        """Create the scheduler.

        Args:
            processes: Processes to schedule (forwarded to the base class).
            num_queues: Number of priority levels; must be at least 1.
            base_time_slice: Time slice of queue 0; must be positive.

        Raises:
            ValueError: If ``num_queues`` < 1 or ``base_time_slice`` <= 0.
        """
        # Reject configurations under which the simulation cannot progress:
        # no queue to enqueue into, or a slice that executes no CPU time.
        if num_queues < 1:
            raise ValueError("num_queues must be at least 1")
        if base_time_slice <= 0:
            raise ValueError("base_time_slice must be positive")
        super().__init__(processes)
        self.num_queues = num_queues
        self.base_time_slice = base_time_slice

    def get_time_slice(self, queue_idx: int) -> int:
        """Return the time slice of queue ``queue_idx``.

        The slice doubles with every level: ``base_time_slice * 2**queue_idx``.
        """
        return self.base_time_slice * (2 ** queue_idx)

    def schedule(self) -> Dict:
        """Run the MLFQ simulation (with I/O) and report the results.

        A running process is never preempted inside its time slice; arrivals
        and I/O completions that happen meanwhile are admitted once the
        slice ends.

        Returns:
            Whatever ``self.print_results`` returns for the completed
            processes.
        """
        all_processes = list(self.processes)
        total = len(all_processes)

        for p in all_processes:
            self.init_process(p, p.arrival_time)
            p.queue_idx = 0

        # Private working copy of every CPU-burst list: a burst that is cut
        # short by the time slice is shortened here, so the caller's
        # ``cpu_bursts`` lists are left untouched.
        remaining_bursts = {id(p): list(p.cpu_bursts) for p in all_processes}

        # Event heap entries: (time, type, sequence number, process).
        # The sequence number makes ordering of simultaneous events of the
        # same type deterministic (insertion order) and guarantees that two
        # Process objects are never compared with each other.
        sequence = itertools.count()
        events = []
        for p in all_processes:
            heapq.heappush(
                events, (p.arrival_time, 'arrival', next(sequence), p)
            )

        queues = [deque() for _ in range(self.num_queues)]
        completed = []
        current_time = 0
        iterations = 0

        def finish(proc, now):
            """Mark ``proc`` as terminated at time ``now`` and record it."""
            proc.status = self.STATUS_TERMINATED
            proc.completion_time = now
            self.calculate_metrics(proc, now)
            completed.append(proc)

        while len(completed) < total and iterations < self.MAX_ITERATIONS:
            iterations += 1

            # 1. Admit every arrival / I/O completion that is due.
            while events and events[0][0] <= current_time:
                _, etype, _, p = heapq.heappop(events)
                p.status = self.STATUS_READY
                # New arrivals enter the top queue; processes coming back
                # from I/O rejoin the level they left.
                level = 0 if etype == 'arrival' else p.queue_idx
                queues[level].append(p)

            # 2. Pick the highest-priority non-empty queue.
            q_idx = next((i for i, q in enumerate(queues) if q), -1)
            if q_idx == -1:
                if not events:
                    # Nothing ready and nothing pending: no further progress
                    # is possible, so stop instead of spinning.
                    break
                # CPU is idle: jump to the next event. Step 1 removed all
                # due events, so the heap top is strictly in the future.
                current_time = events[0][0]
                continue

            # 3. Dispatch the process at the head of that queue.
            p = queues[q_idx].popleft()
            if p.start_time == -1:
                p.start_time = current_time
            p.status = self.STATUS_RUNNING

            if p.current_cpu_idx >= len(p.cpu_bursts):
                # No CPU burst left (e.g. back from a trailing I/O burst).
                finish(p, current_time)
                continue

            bursts = remaining_bursts[id(p)]
            cpu_burst = bursts[p.current_cpu_idx]
            exec_t = min(self.get_time_slice(q_idx), cpu_burst)
            self.gantt_chart.append((current_time, p.pid, 'CPU', exec_t))
            current_time += exec_t
            p.remaining_cpu_time -= exec_t

            if exec_t < cpu_burst:
                # Time slice exhausted: remember what is left of the burst
                # and demote the process one level (bounded by last queue).
                bursts[p.current_cpu_idx] = cpu_burst - exec_t
                p.status = self.STATUS_READY
                p.queue_idx = min(q_idx + 1, self.num_queues - 1)
                queues[p.queue_idx].append(p)
                continue

            # CPU burst finished: the I/O burst with the same index follows,
            # if there is one; otherwise the process is done.
            io_idx = p.current_cpu_idx
            p.current_cpu_idx += 1
            if io_idx < len(p.io_bursts):
                p.status = self.STATUS_IO_WAIT
                io_done = current_time + p.io_bursts[io_idx]
                heapq.heappush(
                    events, (io_done, 'io_complete', next(sequence), p)
                )
            else:
                finish(p, current_time)

        self.results = completed
        return self.print_results("MLFQ (多级反馈队列)")


if __name__ == "__main__":
    processes = generate_random_processes(n=5, seed=42, io_probability=0.5)
    print_processes(processes, "测试数据")

    scheduler = MLFQScheduler(processes)
    scheduler.schedule()