#!/usr/bin/env python3 """ 优先级调度算法 (Priority Scheduling) """ from typing import List, Dict from base import ( Process, ProcessScheduler, generate_random_processes, print_processes ) class PriorityScheduler(ProcessScheduler): """优先级调度""" def __init__(self, processes: List[Process], preemptive: bool = False): super().__init__(processes) self.preemptive = preemptive def schedule(self) -> Dict: """优先级调度算法""" if self.preemptive: return self.schedule_preemptive() return self.schedule_non_preemptive() def schedule_non_preemptive(self) -> Dict: """非抢占式优先级调度""" processes = sorted(self.processes, key=lambda p: p.arrival_time) ready_queue: List[Process] = [] current_time = 0 completed = 0 while completed < len(processes): for p in processes: if p.arrival_time <= current_time and p not in ready_queue and p not in self.results: ready_queue.append(p) if ready_queue: ready_queue.sort(key=lambda p: p.priority) p = ready_queue.pop(0) p.start_time = current_time current_time += p.burst_time self.calculate_metrics(p, current_time) self.results.append(p) completed += 1 else: current_time += 1 return self.print_results("优先级调度 (非抢占)") def schedule_preemptive(self) -> Dict: """抢占式优先级调度""" processes = sorted(self.processes, key=lambda p: p.arrival_time) ready_queue: List[Process] = [] current_time = 0 completed = 0 current_process = None while completed < len(processes): for p in processes: if p.arrival_time <= current_time and p not in ready_queue and p not in self.results: ready_queue.append(p) if ready_queue: ready_queue.sort(key=lambda p: p.priority) if current_process is None or current_process.priority > ready_queue[0].priority: if current_process and current_process.remaining_time > 0: ready_queue.append(current_process) current_process = ready_queue.pop(0) if current_process.start_time == -1: current_process.start_time = current_time current_process.remaining_time -= 1 current_time += 1 if current_process.remaining_time == 0: self.calculate_metrics(current_process, current_time) self.results.append(current_process) completed += 1 current_process = None else: current_time += 1 return self.print_results("优先级调度 (抢占)") if __name__ == "__main__": processes = generate_random_processes(n=5, seed=42) print_processes(processes, "测试数据") print("\n" + "="*60) print("非抢占式优先级调度") print("="*60) scheduler1 = PriorityScheduler(processes, preemptive=False) scheduler1.schedule() print("\n" + "="*60) print("抢占式优先级调度") print("="*60) scheduler2 = PriorityScheduler(processes, preemptive=True) scheduler2.schedule()