postgraduate-prep/experiment/code/base.py

450 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
进程调度算法模块 - 公共基础类
"""
import random
from dataclasses import dataclass, field
from typing import List, Dict, Optional
@dataclass
class Process:
"""进程数据结构
新增 IO 模拟字段:
cpu_bursts: CPU 计算时间段列表 (交替: CPU->IO->CPU->IO...)
io_bursts: IO 操作时间段列表
io_wait: 进程在 IO 设备上等待的时间 (阻塞时间)
示例:
cpu_bursts = [5, 3, 2] # 3段CPU计算时间
io_bursts = [2, 1] # 2次IO操作
进程执行顺序: CPU(5) -> IO(2) -> CPU(3) -> IO(1) -> CPU(2)
"""
pid: str
arrival_time: int = 0
burst_time: int = 0 # 总CPU时间 = sum(cpu_bursts)
priority: int = 0 # 数值越小优先级越高
# ========== 新增 IO 模拟字段 ==========
cpu_bursts: List[int] = field(default_factory=list) # CPU计算时间段
io_bursts: List[int] = field(default_factory=list) # IO操作时间段
io_wait: int = 0 # IO设备等待时间
is_io_bound: bool = False # 是否为IO密集型进程
starts_with_io: bool = False # 是否以IO开始默认False=先CPU
# =====================================
# 调度相关状态
remaining_time: int = field(default=0)
start_time: int = -1
completion_time: int = 0
waiting_time: int = 0
turnaround_time: int = 0
weighted_turnaround_time: float = 0.0 # 带权周转时间 = 周转时间/服务时间
response_time: int = 0
io_blocked_time: int = 0 # 因等待IO而阻塞的时间
def __post_init__(self):
self.remaining_time = self.burst_time
# 如果没有指定 cpu_bursts使用 burst_time 作为单段CPU时间
if not self.cpu_bursts and self.burst_time > 0:
self.cpu_bursts = [self.burst_time]
@property
def total_cpu_time(self) -> int:
"""总CPU计算时间"""
return self.burst_time
@property
def total_io_time(self) -> int:
"""总IO操作时间"""
return sum(self.io_bursts) if self.io_bursts else 0
@property
def num_io_operations(self) -> int:
"""IO操作次数"""
return len(self.io_bursts)
def reset(self):
"""重置进程状态"""
self.remaining_time = self.burst_time
self.start_time = -1
self.completion_time = 0
self.waiting_time = 0
self.turnaround_time = 0
self.weighted_turnaround_time = 0.0
self.response_time = 0
self.io_blocked_time = 0
class ProcessScheduler:
"""进程调度器基类 (支持IO模拟)"""
# 进程状态
STATUS_NEW = 'NEW'
STATUS_READY = 'READY'
STATUS_RUNNING = 'RUNNING'
STATUS_IO_WAIT = 'IO_WAIT' # 等待IO完成
STATUS_TERMINATED = 'TERMINATED'
def __init__(self, processes: List[Process]):
self.processes = [p for p in processes]
self.results: List[Process] = []
self.gantt_chart: List[tuple] = [] # 甘特图记录 (时间, PID, 事件)
# IO 模拟相关
self.io_queue: List[Process] = [] # IO 等待队列 (等待IO完成的进程)
self.io_completion_events: List[tuple] = [] # IO 完成事件 (时间, 进程)
def init_process(self, p: Process, current_time: int):
"""初始化进程状态"""
p.status = self.STATUS_READY
p.current_cpu_idx = 0 # 当前执行的 cpu_burst 索引
p.current_io_idx = -1 # 当前等待的 io_burst 索引 (-1表示未进入IO)
p.remaining_cpu_time = sum(p.cpu_bursts) if p.cpu_bursts else p.burst_time
p.io_wait_time = 0 # 累计IO等待时间
if p.start_time == -1:
p.start_time = current_time
def execute_cpu(self, p: Process, time_slice: int, current_time: int):
"""执行一段 CPU burst"""
if p.current_cpu_idx >= len(p.cpu_bursts):
return 0 # 没有更多CPU burst
cpu_burst = p.cpu_bursts[p.current_cpu_idx]
execute_time = min(time_slice, cpu_burst)
p.remaining_cpu_time -= execute_time
# 记录甘特图
self.gantt_chart.append((current_time, p.pid, 'CPU', execute_time))
if execute_time >= cpu_burst:
# CPU burst 执行完毕
p.current_cpu_idx += 1
# 检查是否有对应的 IO burst
if p.current_cpu_idx - 1 < len(p.io_bursts):
# 进入 IO 等待
io_time = p.io_bursts[p.current_cpu_idx - 1]
p.current_io_idx = p.current_cpu_idx - 1
p.io_burst_remaining = io_time
p.status = self.STATUS_IO_WAIT
self.io_completion_events.append((current_time + io_time, p))
return execute_time
# 没有更多 burst进程结束
p.status = self.STATUS_TERMINATED
p.completion_time = current_time + execute_time
self.results.append(p)
return execute_time
else:
# 时间片用完但 CPU burst 还没完
return execute_time
def handle_io_completions(self, current_time: int):
"""处理当前时刻完成的 IO 事件"""
completed = []
for i, (io_time, p) in enumerate(self.io_completion_events):
if io_time <= current_time:
completed.append(i)
p.status = self.STATUS_READY
p.io_wait_time += io_time - (current_time - p.io_burst_remaining)
# 移除已处理的事件 (倒序删除避免索引问题)
for i in reversed(completed):
self.io_completion_events.pop(i)
return completed
def calculate_metrics(self, p: Process, current_time: int):
"""
计算进程性能指标 (支持IO模拟)
周转时间 (Turnaround Time): CT - AT
进程从提交到完成的总时间
带权周转时间 (Weighted Turnaround Time): TAT / BT
周转时间与服务时间的比值,衡量进程相对延迟
值越大说明等待时间相对服务时间越长
等待时间 (Waiting Time): TAT - BT
进程在就绪队列中等待的时间总和
注意: 这里只计算CPU等待时间不包括IO等待时间
响应时间 (Response Time): ST - AT
从提交到首次运行的时间
"""
if p.status == self.STATUS_TERMINATED:
p.completion_time = current_time if p.completion_time == 0 else p.completion_time
else:
p.completion_time = current_time
p.turnaround_time = p.completion_time - p.arrival_time
# 等待时间 = 周转时间 - 总CPU时间 - IO等待时间
p.waiting_time = p.turnaround_time - p.total_cpu_time - p.io_wait_time
if p.waiting_time < 0:
p.waiting_time = 0
p.response_time = p.start_time - p.arrival_time if p.start_time > 0 else 0
# 带权周转时间 = 周转时间 / 服务时间
p.weighted_turnaround_time = p.turnaround_time / p.burst_time if p.burst_time > 0 else 0
p.io_wait = p.io_wait_time
def print_results(self, algorithm_name: str) -> Dict:
"""打印调度结果 (支持IO模拟)"""
n = len(self.results)
if n == 0:
return {}
avg_waiting = sum(p.waiting_time for p in self.results) / n
avg_turnaround = sum(p.turnaround_time for p in self.results) / n
avg_weighted_turnaround = sum(p.weighted_turnaround_time for p in self.results) / n
avg_response = sum(p.response_time for p in self.results) / n
# 计算IO相关指标
total_io_time = sum(p.total_io_time for p in self.results)
total_io_wait = sum(p.io_wait for p in self.results)
avg_io_wait = total_io_wait / n if n > 0 else 0
io_bound_count = sum(1 for p in self.results if p.is_io_bound)
print(f"\n{'='*85}")
print(f"算法: {algorithm_name}")
print(f"{'='*85}")
print(f"{'PID':<6}{'类型':<6}{'起始':<6}{'到达':<6}{'服务':<6}{'IO次数':<6}{'IO时间':<8}"
f"{'周转':<6}{'CPU等待':<8}{'IO等待':<8}{'响应':<6}")
print("-" * 85)
for p in self.results:
proc_type = "IO" if p.is_io_bound else "CPU"
start_phase = "IO" if p.starts_with_io else "CPU"
print(f"{p.pid:<6}{proc_type:<6}{start_phase:<6}{p.arrival_time:<6}{p.burst_time:<6}"
f"{p.num_io_operations:<6}{p.total_io_time:<8}"
f"{p.turnaround_time:<6}{p.waiting_time:<8}{p.io_wait:<8}{p.response_time:<6}")
print("-" * 85)
print(f"{'平均等待时间:':<22} {avg_waiting:.2f}")
print(f"{'平均周转时间:':<22} {avg_turnaround:.2f}")
print(f"{'平均带权周转时间:':<22} {avg_weighted_turnaround:.2f}")
print(f"{'平均响应时间:':<22} {avg_response:.2f}")
# CPU 利用率计算 (考虑IO模拟)
total_cpu_time = sum(p.total_cpu_time for p in self.results) # 纯CPU时间
first_arrival = min(p.arrival_time for p in self.results)
last_completion = max(p.completion_time for p in self.results)
total_time = last_completion - first_arrival
# CPU利用率 = CPU执行时间 / 总时间
# 注意: IO期间CPU可以运行其他进程所以利用率应该更高
utilization = (total_cpu_time / total_time) * 100 if total_time > 0 else 0
# 计算CPU实际空闲时间
cpu_busy_time = len(self.gantt_chart) * 1 # 每个时间单位被计入
# 简化: 空闲时间 = 总时间 - CPU总执行时间
idle_time = total_time - total_cpu_time
print(f"\n{'='*85}")
print("性能统计")
print("-" * 85)
print(f"{'总执行时间:':<22} {total_time}")
print(f"{'总CPU时间:':<22} {total_cpu_time}")
print(f"{'CPU利用率:':<22} {utilization:.2f}%")
print(f"{'总IO时间:':<22} {total_io_time}")
print(f"{'总IO等待时间:':<22} {total_io_wait}")
print(f"{'平均IO等待:':<22} {avg_io_wait:.2f}")
print(f"{'IO密集型进程:':<22} {io_bound_count}/{n}")
return {
'avg_waiting': avg_waiting,
'avg_turnaround': avg_turnaround,
'avg_weighted_turnaround': avg_weighted_turnaround,
'avg_response': avg_response,
'cpu_utilization': utilization,
'total_time': total_time,
'total_cpu_time': total_cpu_time,
'total_io_time': total_io_time,
'total_io_wait': total_io_wait,
'avg_io_wait': avg_io_wait,
'io_bound_count': io_bound_count
}
def generate_random_processes(
n: int = 10,
seed: Optional[int] = 42,
arrival_range: tuple = (0, 50),
burst_range: tuple = (1, 20),
priority_range: tuple = (1, 10),
io_probability: float = 0.5, # 进程有IO操作的概率
io_count_range: tuple = (1, 4), # IO操作次数范围
io_time_range: tuple = (3, 15), # 每次IO时间范围
io_first_prob: float = 0.0, # 以IO开始的概率 (0.0=全部先CPU, 1.0=全部先IO)
) -> List[Process]:
"""
随机生成测试进程集 (支持IO模拟)
Args:
n: 进程数量
seed: 随机种子,确保可复现
arrival_range: 到达时间范围 (min, max)
burst_range: 服务时间范围 (min, max)
priority_range: 优先级范围 (min, max)
io_probability: 进程有IO操作的概率 (0.0-1.0)
io_count_range: IO操作次数范围 (min, max)
io_time_range: 每次IO时间范围 (min, max)
io_first_prob: 以IO开始的概率 (0.0=全部先CPU, 1.0=全部先IO, 0.5=随机)
Returns:
进程列表
示例:
# 生成10个进程50%有IO全部先CPU
processes = generate_random_processes(n=10, io_probability=0.5)
# 生成IO密集型场景30%以IO开始
processes = generate_random_processes(n=10, io_probability=0.8, io_first_prob=0.3)
"""
if seed is not None:
random.seed(seed)
processes = []
for i in range(n):
# 生成基本属性
arrival = random.randint(*arrival_range)
total_burst = random.randint(*burst_range)
priority = random.randint(*priority_range)
# 确定是否为IO密集型进程
is_io_bound = random.random() < io_probability
# 确定起始阶段
starts_with_io = random.random() < io_first_prob if io_first_prob > 0 else False
cpu_bursts = []
io_bursts = []
if is_io_bound:
# IO密集型: 短CPU计算 + 长IO等待
num_io = random.randint(*io_count_range)
if starts_with_io:
# 先IO再CPU: IO -> CPU -> IO -> CPU -> ...
for j in range(num_io):
# IO burst
io_time = random.randint(*io_time_range)
io_bursts.append(io_time)
# CPU burst
remaining = total_burst - sum(cpu_bursts)
max_cpu = min(3, remaining - (num_io - j - 1))
if max_cpu < 1:
max_cpu = 1
cpu_time = random.randint(1, max_cpu)
cpu_bursts.append(cpu_time)
# 最后一段CPU
remaining = total_burst - sum(cpu_bursts)
if remaining > 0:
cpu_bursts.append(remaining)
else:
# 先CPU再IO: CPU -> IO -> CPU -> IO -> ...
remaining = total_burst
for j in range(num_io + 1):
if j < num_io:
# CPU burst: 短时间 (1-3)
max_cpu = min(3, remaining - (num_io - j))
if max_cpu < 1:
max_cpu = 1
cpu_time = random.randint(1, max_cpu)
else:
# 最后一段CPU时间
cpu_time = remaining
if cpu_time > 0:
cpu_bursts.append(cpu_time)
remaining -= cpu_time
if j < num_io and remaining > 0:
# IO burst: 较长时间 (长IO等待)
io_time = random.randint(*io_time_range)
io_bursts.append(io_time)
else:
# CPU密集型: 长CPU计算 + 无/少IO
cpu_bursts = [total_burst]
p = Process(
pid=f'P{i+1}',
arrival_time=arrival,
burst_time=total_burst,
priority=priority,
cpu_bursts=cpu_bursts,
io_bursts=io_bursts,
is_io_bound=is_io_bound,
starts_with_io=starts_with_io
)
processes.append(p)
# 按到达时间排序
processes.sort(key=lambda p: p.arrival_time)
return processes
def print_processes(processes: List[Process], title: str = "测试数据"):
"""打印进程信息 (包含IO模拟信息)"""
print(f"\n{'='*80}")
print(f"{title}")
print(f"{'='*80}")
print(f"{'PID':<6}{'类型':<6}{'起始':<6}{'到达':<6}{'服务':<6}{'IO次数':<6}{'IO时间':<8}{'优先级':<6}")
print("-" * 80)
io_bound_count = 0
io_first_count = 0
for p in processes:
proc_type = "IO" if p.is_io_bound else "CPU"
start_phase = "IO" if p.starts_with_io else "CPU"
if p.is_io_bound:
io_bound_count += 1
if p.starts_with_io:
io_first_count += 1
io_time = p.total_io_time
print(f"{p.pid:<6}{proc_type:<6}{start_phase:<6}{p.arrival_time:<6}{p.burst_time:<6}"
f"{p.num_io_operations:<6}{io_time:<8}{p.priority:<6}")
print("-" * 80)
print(f"总进程数: {len(processes)}, CPU密集型: {len(processes)-io_bound_count}, IO密集型: {io_bound_count}")
print(f"起始阶段: 先CPU: {len(processes)-io_first_count}, 先IO: {io_first_count}")
# 详细显示有IO的进程的CPU和IO burst
io_processes = [p for p in processes if p.io_bursts]
if io_processes:
print(f"\n{'='*80}")
print("IO 操作详细 (执行模式)")
print("-" * 80)
for p in io_processes:
# 构建执行模式字符串
pattern = []
if p.starts_with_io:
# 先IO再CPU
for i, io in enumerate(p.io_bursts):
pattern.append(f"IO({io})")
if i < len(p.cpu_bursts):
pattern.append(f"CPU({p.cpu_bursts[i]})")
else:
# 先CPU再IO
for i, cpu in enumerate(p.cpu_bursts):
pattern.append(f"CPU({cpu})")
if i < len(p.io_bursts):
pattern.append(f"IO({p.io_bursts[i]})")
print(f"{p.pid}: {' -> '.join(pattern)}")
def print_comparison(results: Dict[str, Dict]):
"""打印算法比较结果 (包含IO指标)"""
print("\n" + "="*100)
print("算法性能比较")
print("="*100)
print(f"{'算法':<8}{'平均等待':<10}{'平均周转':<10}{'平均带权周转':<12}{'平均响应':<10}{'CPU利用率':<10}{'IO阻塞':<10}")
print("-" * 100)
for name, metrics in results.items():
print(f"{name:<8}{metrics['avg_waiting']:<10.2f}{metrics['avg_turnaround']:<10.2f}"
f"{metrics.get('avg_weighted_turnaround', 0):<12.2f}"
f"{metrics['avg_response']:<10.2f}"
f"{metrics['cpu_utilization']:<10.2f}%"
f"{metrics.get('avg_io_wait', 0):<10.2f}")