postgraduate-prep/experiment/code/rr.py

132 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""
时间片轮转调度算法 (RR)
支持 IO 模拟
"""
from typing import List, Dict
from collections import deque
import heapq
from base import (
Process,
ProcessScheduler,
generate_random_processes,
print_processes
)
class RoundRobinScheduler(ProcessScheduler):
    """Round-robin (time-slice) scheduler with simulated I/O bursts.

    A process runs its ``cpu_bursts`` in order.  When CPU burst ``i``
    finishes, the process waits for ``io_bursts[i]`` if that entry exists
    and terminates otherwise.
    """

    def __init__(self, processes: List[Process], time_slice: int = 4):
        """Store the workload and the quantum.

        Args:
            processes: Processes to schedule (forwarded to the base class).
            time_slice: Maximum CPU time a process may run before it is
                preempted.  Must be positive.

        Raises:
            ValueError: If ``time_slice`` is not positive.
        """
        if time_slice <= 0:
            raise ValueError(f"time_slice must be positive, got {time_slice!r}")
        super().__init__(processes)
        self.time_slice = time_slice

    def schedule(self) -> Dict:
        """Run the round-robin simulation and report the results.

        Events (arrivals and I/O completions) are kept in a min-heap keyed by
        ``(time, kind, sequence)``.  The sequence number makes ties FIFO and
        deterministic, and guarantees that ``Process`` objects themselves are
        never compared by the heap.

        When a process exhausts its quantum, every arrival / I/O completion
        whose time is not later than the preemption instant is admitted to
        the ready queue *before* the preempted process is re-queued, so a
        process that became ready during the slice is not overtaken by the
        one that was just running.

        Note: the remaining length of a partially executed burst is written
        back into ``p.cpu_bursts`` in place.

        Returns:
            Whatever ``self.print_results`` returns for this run.

        Raises:
            ValueError: If ``self.time_slice`` is not positive (it would
                prevent the simulation from making progress).
        """
        if self.time_slice <= 0:
            raise ValueError(
                f"time_slice must be positive, got {self.time_slice!r}"
            )

        all_processes = list(self.processes)
        n = len(all_processes)
        for p in all_processes:
            self.init_process(p, p.arrival_time)

        # Heap entries: (time, kind, sequence, process).
        events = []
        for seq, p in enumerate(all_processes):
            heapq.heappush(events, (p.arrival_time, 'arrival', seq, p))
        next_seq = n

        ready_queue = deque()
        completed = []
        current_time = 0

        def admit_due(now):
            # Move every process whose arrival / I/O completion is due at or
            # before `now` into the ready queue, in event order.
            while events and events[0][0] <= now:
                _, _, _, proc = heapq.heappop(events)
                proc.status = self.STATUS_READY
                ready_queue.append(proc)

        def terminate(proc, now):
            # Mark `proc` finished at `now` and record its metrics.
            proc.status = self.STATUS_TERMINATED
            proc.completion_time = now
            self.calculate_metrics(proc, now)
            completed.append(proc)

        while len(completed) < n:
            admit_due(current_time)

            if not ready_queue:
                if not events:
                    # Nothing runnable and nothing pending: cannot progress.
                    break
                # CPU is idle: jump straight to the next pending event.
                current_time = events[0][0]
                continue

            p = ready_queue.popleft()
            if p.start_time == -1:
                p.start_time = current_time
            p.status = self.STATUS_RUNNING

            if p.current_cpu_idx >= len(p.cpu_bursts):
                # No CPU burst left to run.
                terminate(p, current_time)
                continue

            cpu_burst = p.cpu_bursts[p.current_cpu_idx]
            exec_t = min(self.time_slice, cpu_burst)
            self.gantt_chart.append((current_time, p.pid, 'CPU', exec_t))
            current_time += exec_t
            p.remaining_cpu_time -= exec_t

            if exec_t < cpu_burst:
                # Quantum expired: remember what is left of this burst.
                p.cpu_bursts[p.current_cpu_idx] = cpu_burst - exec_t
                # Processes that became ready during the slice go first.
                admit_due(current_time)
                p.status = self.STATUS_READY
                ready_queue.append(p)
                continue

            # The CPU burst finished within the quantum.
            p.current_cpu_idx += 1
            io_idx = p.current_cpu_idx - 1
            if io_idx < len(p.io_bursts):
                io_t = p.io_bursts[io_idx]
                p.status = self.STATUS_IO_WAIT
                heapq.heappush(
                    events,
                    (current_time + io_t, 'io_complete', next_seq, p),
                )
                next_seq += 1
            else:
                terminate(p, current_time)

        self.results = completed
        return self.print_results(f"RR (时间片轮转, q={self.time_slice})")
if __name__ == "__main__":
    # Demo: build a small reproducible workload, show it, then schedule it
    # with a quantum of 4.
    demo_processes = generate_random_processes(
        n=5,
        seed=42,
        io_probability=0.5,
    )
    print_processes(demo_processes, "测试数据")
    RoundRobinScheduler(demo_processes, time_slice=4).schedule()