postgraduate-prep/experiment/code/priority.py

148 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
优先级调度算法 (Priority)
支持 IO 模拟
"""
from typing import List, Dict
import heapq
from base import (
Process,
ProcessScheduler,
generate_random_processes,
print_processes
)
class PriorityScheduler(ProcessScheduler):
    """Priority scheduling with simulated I/O.

    A numerically smaller ``priority`` value means a higher priority.
    Processes alternate between CPU bursts (``cpu_bursts``) and I/O bursts
    (``io_bursts``); the I/O burst at index ``i`` follows the CPU burst at
    index ``i``.  Processes of equal priority are served in the order in
    which they entered the ready queue (FCFS), which keeps runs reproducible.

    Args:
        processes: The processes to schedule.
        preemptive: When True, the CPU is re-evaluated after every time unit
            so that a newly ready process with a strictly higher priority
            takes over.  When False, a dispatched process always finishes
            its current CPU burst.
    """

    def __init__(self, processes: List[Process], preemptive: bool = False):
        super().__init__(processes)
        self.preemptive = preemptive
        # Monotonic counter used as the heap tie-breaker (see _next_seq).
        self._seq = 0

    def _next_seq(self) -> int:
        """Return the next value of a strictly increasing sequence number.

        The number is placed in every heap entry before the process object,
        so ties on time/priority are resolved in insertion order and the
        process objects themselves are never compared.
        """
        self._seq += 1
        return self._seq

    def _mark_running(self, process: Process, now) -> None:
        """Record the first dispatch time (if unset) and flag as running."""
        # A start_time of -1 is the "never dispatched" sentinel.
        if process.start_time == -1:
            process.start_time = now
        process.status = self.STATUS_RUNNING

    def _terminate(self, process: Process, now, completed: List[Process]) -> None:
        """Mark *process* as finished at *now* and collect its metrics."""
        process.status = self.STATUS_TERMINATED
        process.completion_time = now
        self.calculate_metrics(process, now)
        completed.append(process)

    def _finish_burst(self, process: Process, now, events: list,
                      completed: List[Process]) -> None:
        """Advance past the CPU burst that just ended at *now*.

        If an I/O burst is paired with the finished CPU burst, the process
        waits for I/O and an ``io_complete`` event is queued; otherwise the
        process terminates.
        """
        process.current_cpu_idx += 1
        io_idx = process.current_cpu_idx - 1
        if io_idx < len(process.io_bursts):
            process.status = self.STATUS_IO_WAIT
            wake_time = now + process.io_bursts[io_idx]
            heapq.heappush(
                events, (wake_time, 'io_complete', self._next_seq(), process)
            )
        else:
            self._terminate(process, now, completed)

    def schedule(self) -> Dict:
        """Run the priority scheduling simulation.

        Returns:
            Whatever ``self.print_results`` returns for the finished run.
        """
        all_processes = list(self.processes)
        for p in all_processes:
            self.init_process(p, p.arrival_time)

        self._seq = 0
        # Event heap entries: (time, event_type, seq, process).
        events = []
        for p in all_processes:
            heapq.heappush(
                events, (p.arrival_time, 'arrival', self._next_seq(), p)
            )

        ready_queue = []  # heap entries: (priority, seq, process)
        completed = []
        # Preemptive mode only: time left in the CPU burst currently in
        # progress, keyed by id(process).  Kept here instead of writing the
        # decremented value back into process.cpu_bursts, so the caller's
        # burst data is left unmodified (as in non-preemptive mode).
        burst_left = {}
        current_time = 0

        while len(completed) < len(all_processes):
            # Move every process whose event is due into the ready queue.
            while events and events[0][0] <= current_time:
                _, event_type, _, p = heapq.heappop(events)
                is_arrival = event_type == 'arrival'
                is_io_done = (event_type == 'io_complete'
                              and p.status == self.STATUS_IO_WAIT)
                if is_arrival or is_io_done:
                    p.status = self.STATUS_READY
                    heapq.heappush(
                        ready_queue, (p.priority, self._next_seq(), p)
                    )

            if not ready_queue:
                if not events:
                    # Nothing runnable and nothing pending: stop.
                    break
                # CPU is idle: jump straight to the next event.
                current_time = events[0][0]
                continue

            if self.preemptive:
                # Peek only: the process stays queued while it runs, so a
                # higher-priority arrival naturally displaces it at the top.
                _, _, current_process = ready_queue[0]
                self._mark_running(current_process, current_time)

                if current_process.current_cpu_idx < len(current_process.cpu_bursts):
                    key = id(current_process)
                    left = burst_left.get(key)
                    if left is None:
                        left = current_process.cpu_bursts[
                            current_process.current_cpu_idx
                        ]
                    # Run one time unit, clamped so that zero-length or
                    # fractional bursts cannot overshoot or loop forever.
                    time_slice = min(1, left) if left > 0 else 0
                    self.gantt_chart.append(
                        (current_time, current_process.pid, 'CPU', time_slice)
                    )
                    current_time += time_slice
                    current_process.remaining_cpu_time -= time_slice
                    left -= time_slice

                    if left <= 0:
                        burst_left.pop(key, None)
                        # No pushes happened since the peek, so the top of
                        # the heap is still current_process.
                        heapq.heappop(ready_queue)
                        self._finish_burst(
                            current_process, current_time, events, completed
                        )
                    else:
                        burst_left[key] = left
                        current_process.status = self.STATUS_READY
                else:
                    # No CPU burst left (e.g. returned from a trailing I/O).
                    heapq.heappop(ready_queue)
                    self._terminate(current_process, current_time, completed)
            else:
                # Non-preemptive: run the whole CPU burst in one go.
                _, _, current_process = heapq.heappop(ready_queue)
                self._mark_running(current_process, current_time)

                if current_process.current_cpu_idx < len(current_process.cpu_bursts):
                    cpu_burst = current_process.cpu_bursts[
                        current_process.current_cpu_idx
                    ]
                    self.gantt_chart.append(
                        (current_time, current_process.pid, 'CPU', cpu_burst)
                    )
                    current_time += cpu_burst
                    current_process.remaining_cpu_time -= cpu_burst
                    self._finish_burst(
                        current_process, current_time, events, completed
                    )
                else:
                    # No CPU burst left (e.g. returned from a trailing I/O).
                    self._terminate(current_process, current_time, completed)

        self.results = completed
        algo_name = "Priority (抢占式)" if self.preemptive else "Priority (非抢占式)"
        return self.print_results(algo_name)
def _run_demo() -> None:
    """Generate a small seeded workload, print it, and schedule it.

    Uses the scheduler's default (non-preemptive) mode.
    """
    sample = generate_random_processes(n=5, seed=42, io_probability=0.5)
    print_processes(sample, "测试数据")
    PriorityScheduler(sample).schedule()


if __name__ == "__main__":
    _run_demo()