#!/usr/bin/env python3
"""
Priority scheduling algorithm with simulated I/O.
"""

from typing import List, Dict
import heapq
import itertools

from base import (
    Process,
    ProcessScheduler,
    generate_random_processes,
    print_processes,
)


class PriorityScheduler(ProcessScheduler):
    """Priority scheduler with simulated I/O.

    A smaller ``priority`` value means a higher scheduling priority. Each
    process alternates between CPU bursts and I/O bursts; while a process is
    waiting on I/O the CPU is free to run another ready process.
    """

    def __init__(self, processes: List[Process], preemptive: bool = False):
        """Store the process list and the requested scheduling mode.

        Args:
            processes: Processes to schedule; forwarded to the base class.
            preemptive: Stored on the instance as ``self.preemptive``.
                NOTE(review): ``schedule()`` does not read this flag, so
                every CPU burst currently runs to completion regardless of
                its value.
        """
        super().__init__(processes)
        self.preemptive = preemptive

    def schedule(self) -> Dict:
        """Run the priority simulation and return the printed results.

        The simulation is event driven: arrivals and I/O completions sit in a
        time-ordered heap, and processes that are ready sit in a heap ordered
        by priority. The highest-priority ready process runs one whole CPU
        burst, then either starts its next I/O burst or terminates.

        Ties are broken by a monotonically increasing sequence number, so
        processes of equal priority run in the order in which they became
        ready and the schedule is reproducible from run to run.

        Returns:
            Whatever ``self.print_results`` returns for the completed
            processes.
        """
        all_processes = list(self.processes)
        for p in all_processes:
            self.init_process(p, p.arrival_time)

        # Tie-breaker for both heaps. Unlike id(), which is a memory address,
        # it is stable across runs, and because it is unique the Process
        # objects themselves are never compared.
        sequence = itertools.count()

        events = []  # (time, event_type, seq, process)
        for p in all_processes:
            heapq.heappush(events, (p.arrival_time, 'arrival', next(sequence), p))

        ready_queue = []  # (priority, seq, process)
        completed = []
        current_time = 0

        def finish(process, when):
            """Mark *process* as terminated at time *when* and record it."""
            process.status = self.STATUS_TERMINATED
            process.completion_time = when
            self.calculate_metrics(process, when)
            completed.append(process)

        while len(completed) < len(all_processes):
            # Move every process whose event is due into the ready queue.
            while events and events[0][0] <= current_time:
                _, event_type, _, p = heapq.heappop(events)
                became_ready = event_type == 'arrival' or (
                    event_type == 'io_complete'
                    and p.status == self.STATUS_IO_WAIT
                )
                if became_ready:
                    p.status = self.STATUS_READY
                    heapq.heappush(ready_queue, (p.priority, next(sequence), p))

            if not ready_queue:
                if not events:
                    # Nothing is runnable and nothing is pending.
                    break
                # CPU is idle: jump straight to the next pending event.
                current_time = events[0][0]
                continue

            _, _, current_process = heapq.heappop(ready_queue)

            # -1 is treated as "not started yet".
            if current_process.start_time == -1:
                current_process.start_time = current_time

            current_process.status = self.STATUS_RUNNING

            if current_process.current_cpu_idx >= len(current_process.cpu_bursts):
                # No CPU burst left (the process came back from a trailing
                # I/O burst), so it ends immediately.
                finish(current_process, current_time)
                continue

            cpu_burst = current_process.cpu_bursts[current_process.current_cpu_idx]
            self.gantt_chart.append(
                (current_time, current_process.pid, 'CPU', cpu_burst)
            )
            current_time += cpu_burst
            current_process.remaining_cpu_time -= cpu_burst
            current_process.current_cpu_idx += 1

            # The I/O burst that follows CPU burst k lives at io_bursts[k].
            io_idx = current_process.current_cpu_idx - 1
            if io_idx < len(current_process.io_bursts):
                io_time = current_process.io_bursts[io_idx]
                current_process.status = self.STATUS_IO_WAIT
                heapq.heappush(
                    events,
                    (
                        current_time + io_time,
                        'io_complete',
                        next(sequence),
                        current_process,
                    ),
                )
            else:
                finish(current_process, current_time)

        self.results = completed
        return self.print_results("Priority (优先级调度)")


if __name__ == "__main__":
    processes = generate_random_processes(n=5, seed=42, io_probability=0.5)
    print_processes(processes, "测试数据")

    scheduler = PriorityScheduler(processes)
    scheduler.schedule()