#!/usr/bin/env python3
"""Round Robin (RR) time-slice scheduling algorithm."""

import copy
from collections import deque
from typing import Deque, Dict, List

from base import (
    Process,
    ProcessScheduler,
    generate_random_processes,
    print_processes,
)


class RoundRobinScheduler(ProcessScheduler):
    """Round Robin scheduler: each ready process runs for at most one time slice.

    A process that still has work left after its slice is moved to the back
    of the ready queue; a process that finishes is recorded in
    ``self.results``.
    """

    def __init__(self, processes: List[Process], time_slice: int = 4):
        """Store the process list (via the base class) and the slice length.

        Args:
            processes: Processes to schedule.
            time_slice: Maximum time a process runs before being preempted.
                Must be positive by the time ``schedule()`` is called.
        """
        super().__init__(processes)
        self.time_slice = time_slice

    @staticmethod
    def _admit_arrivals(
        pending: List[Process],
        next_index: int,
        now,
        ready_queue: Deque[Process],
    ) -> int:
        """Queue every not-yet-admitted process whose arrival time is <= now.

        ``pending`` must be sorted by arrival time. Returns the index of the
        first process that has still not arrived.
        """
        while next_index < len(pending) and pending[next_index].arrival_time <= now:
            ready_queue.append(pending[next_index])
            next_index += 1
        return next_index

    def schedule(self) -> Dict:
        """Run the Round Robin simulation and return the printed summary.

        Mutates the scheduled ``Process`` objects in place (``start_time``
        and ``remaining_time``), so a given set of processes can only be
        simulated once.

        Returns:
            Whatever ``self.print_results`` returns for this run.

        Raises:
            ValueError: If ``self.time_slice`` is not positive. A slice of
                zero or less never reduces ``remaining_time``, so the
                simulation would otherwise loop forever.
        """
        if self.time_slice <= 0:
            raise ValueError(
                f"time_slice must be positive, got {self.time_slice!r}"
            )

        processes = sorted(self.processes, key=lambda p: p.arrival_time)
        ready_queue: Deque[Process] = deque()
        current_time = 0
        completed = 0
        index = 0  # first process in `processes` that has not been admitted

        while completed < len(processes):
            index = self._admit_arrivals(
                processes, index, current_time, ready_queue
            )

            if not ready_queue:
                # CPU is idle. Every admitted process has already completed,
                # so at least one process is still to arrive: jump straight
                # to its arrival instead of ticking one unit at a time (which
                # would also overshoot a non-integer arrival time).
                current_time = processes[index].arrival_time
                continue

            p = ready_queue.popleft()

            # -1 is treated as the "never ran yet" marker for start_time
            # (presumably the default set by Process -- confirm in base).
            if p.start_time == -1:
                p.start_time = current_time

            exec_time = min(self.time_slice, p.remaining_time)
            p.remaining_time -= exec_time
            current_time += exec_time

            # Processes that arrived during this slice are queued BEFORE the
            # preempted process is put back, so they run ahead of it.
            index = self._admit_arrivals(
                processes, index, current_time, ready_queue
            )

            if p.remaining_time > 0:
                ready_queue.append(p)
            else:
                self.calculate_metrics(p, current_time)
                self.results.append(p)
                completed += 1

        return self.print_results(
            f"RR (时间片轮转, time_slice={self.time_slice})"
        )


if __name__ == "__main__":
    processes = generate_random_processes(n=5, seed=42)
    print_processes(processes, "测试数据")

    for ts in [2, 4, 8]:
        print("\n" + "="*60)
        print(f"时间片轮转 (time_slice={ts})")
        print("="*60)
        # schedule() consumes remaining_time and sets start_time on the
        # Process objects it is given. Hand each run its own copy so the
        # runs are independent whether or not the base class copies its
        # input.
        scheduler = RoundRobinScheduler(copy.deepcopy(processes), time_slice=ts)
        scheduler.schedule()