postgraduate-prep/experiment/code/sjf.py

156 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
最短作业优先调度算法 (SJF/SRTF)
Shortest Job First / Shortest Remaining Time First
支持 IO 模拟
"""
from typing import List, Dict
from collections import deque
import heapq
from base import (
Process,
ProcessScheduler,
generate_random_processes,
print_processes
)
class SJFScheduler(ProcessScheduler):
"""最短作业优先调度 (SJF/SRTF)
SJF: 非抢占式,选择剩余时间最短的进程
SRTF: 抢占式,选择剩余时间最短的进程
支持 IO 模拟
"""
def __init__(self, processes: List[Process], preemptive: bool = False):
super().__init__(processes)
self.preemptive = preemptive
def schedule(self) -> Dict:
"""SJF/SRTF 调度算法 (支持IO模拟)"""
all_processes = list(self.processes)
for p in all_processes:
self.init_process(p, p.arrival_time)
events = []
for p in all_processes:
heapq.heappush(events, (p.arrival_time, 'arrival', id(p), p))
ready_queue = []
completed = []
current_time = 0
while len(completed) < len(all_processes):
while events and events[0][0] <= current_time:
event_time, event_type, uid, p = heapq.heappop(events)
if event_type == 'arrival':
p.status = self.STATUS_READY
ready_queue.append(p)
elif event_type == 'io_complete':
if p.status == self.STATUS_IO_WAIT:
p.status = self.STATUS_READY
ready_queue.append(p)
if not ready_queue:
if events:
current_time = events[0][0]
continue
else:
break
ready_queue.sort(key=lambda p: (p.remaining_cpu_time, id(p)))
if self.preemptive:
# SRTF: 一次只执行1个时间单位随时检查新到达进程
current_process = ready_queue[0]
if current_process.start_time == -1:
current_process.start_time = current_time
current_process.status = self.STATUS_RUNNING
if current_process.current_cpu_idx < len(current_process.cpu_bursts):
cpu_burst = current_process.cpu_bursts[current_process.current_cpu_idx]
exec_t = 1 # 每次只执行1单位
self.gantt_chart.append((current_time, current_process.pid, 'CPU', exec_t))
current_time += exec_t
current_process.remaining_cpu_time -= exec_t
cpu_burst -= exec_t
current_process.cpu_bursts[current_process.current_cpu_idx] = cpu_burst
if cpu_burst == 0:
ready_queue.pop(0) # CPU burst完成移出就绪队列
current_process.current_cpu_idx += 1
io_idx = current_process.current_cpu_idx - 1
if io_idx < len(current_process.io_bursts):
io_time = current_process.io_bursts[io_idx]
current_process.status = self.STATUS_IO_WAIT
heapq.heappush(events, (current_time + io_time, 'io_complete', id(current_process), current_process))
else:
current_process.status = self.STATUS_TERMINATED
current_process.completion_time = current_time
self.calculate_metrics(current_process, current_time)
completed.append(current_process)
else:
current_process.status = self.STATUS_READY
else:
ready_queue.pop(0)
current_process.status = self.STATUS_TERMINATED
current_process.completion_time = current_time
self.calculate_metrics(current_process, current_time)
completed.append(current_process)
else:
# SJF 非抢占: 一次执行完整CPU burst
current_process = ready_queue.pop(0)
if current_process.start_time == -1:
current_process.start_time = current_time
current_process.status = self.STATUS_RUNNING
if current_process.current_cpu_idx < len(current_process.cpu_bursts):
cpu_burst = current_process.cpu_bursts[current_process.current_cpu_idx]
self.gantt_chart.append((current_time, current_process.pid, 'CPU', cpu_burst))
current_time += cpu_burst
current_process.remaining_cpu_time -= cpu_burst
current_process.current_cpu_idx += 1
io_idx = current_process.current_cpu_idx - 1
if io_idx < len(current_process.io_bursts):
io_time = current_process.io_bursts[io_idx]
current_process.status = self.STATUS_IO_WAIT
heapq.heappush(events, (current_time + io_time, 'io_complete', id(current_process), current_process))
else:
current_process.status = self.STATUS_TERMINATED
current_process.completion_time = current_time
self.calculate_metrics(current_process, current_time)
completed.append(current_process)
else:
current_process.status = self.STATUS_TERMINATED
current_process.completion_time = current_time
self.calculate_metrics(current_process, current_time)
completed.append(current_process)
self.results = completed
algo_name = "SRTF (最短剩余时间优先)" if self.preemptive else "SJF (最短作业优先)"
return self.print_results(algo_name)
if __name__ == "__main__":
processes = generate_random_processes(n=5, seed=42, io_probability=0.5)
print_processes(processes, "测试数据")
print("\n--- SJF (非抢占) ---")
scheduler = SJFScheduler(processes, preemptive=False)
scheduler.schedule()