postgraduate-prep/experiment/code/base.py

174 lines
6.0 KiB
Python

#!/usr/bin/env python3
"""
进程调度算法模块 - 公共基础类
"""
import random
from dataclasses import dataclass, field
from typing import List, Dict, Optional
@dataclass
class Process:
"""进程数据结构"""
pid: str
arrival_time: int = 0
burst_time: int = 0
priority: int = 0 # 数值越小优先级越高
remaining_time: int = field(default=0)
start_time: int = -1
completion_time: int = 0
waiting_time: int = 0
turnaround_time: int = 0
weighted_turnaround_time: float = 0.0 # 带权周转时间 = 周转时间/服务时间
response_time: int = 0
def __post_init__(self):
self.remaining_time = self.burst_time
def reset(self):
"""重置进程状态"""
self.remaining_time = self.burst_time
self.start_time = -1
self.completion_time = 0
self.waiting_time = 0
self.turnaround_time = 0
self.weighted_turnaround_time = 0.0
self.response_time = 0
class ProcessScheduler:
"""进程调度器基类"""
def __init__(self, processes: List[Process]):
self.processes = [p for p in processes]
self.results: List[Process] = []
def calculate_metrics(self, p: Process, current_time: int):
"""
计算进程性能指标
周转时间 (Turnaround Time): CT - AT
进程从提交到完成的总时间
带权周转时间 (Weighted Turnaround Time): TAT / BT
周转时间与服务时间的比值,衡量进程相对延迟
值越大说明等待时间相对服务时间越长
等待时间 (Waiting Time): TAT - BT
进程在就绪队列中等待的时间总和
响应时间 (Response Time): ST - AT
从提交到首次运行的时间
"""
p.completion_time = current_time
p.turnaround_time = p.completion_time - p.arrival_time
p.waiting_time = p.turnaround_time - p.burst_time
p.response_time = p.start_time - p.arrival_time
# 带权周转时间 = 周转时间 / 服务时间
p.weighted_turnaround_time = p.turnaround_time / p.burst_time if p.burst_time > 0 else 0
def print_results(self, algorithm_name: str) -> Dict:
"""打印调度结果"""
n = len(self.results)
avg_waiting = sum(p.waiting_time for p in self.results) / n
avg_turnaround = sum(p.turnaround_time for p in self.results) / n
avg_weighted_turnaround = sum(p.weighted_turnaround_time for p in self.results) / n
avg_response = sum(p.response_time for p in self.results) / n
print(f"\n{'='*70}")
print(f"算法: {algorithm_name}")
print(f"{'='*70}")
print(f"{'PID':<6}{'到达':<6}{'服务':<6}{'开始':<6}{'完成':<6}"
f"{'等待':<6}{'周转':<6}{'带权周转':<10}{'响应':<6}")
print("-" * 70)
for p in self.results:
print(f"{p.pid:<6}{p.arrival_time:<6}{p.burst_time:<6}"
f"{p.start_time:<6}{p.completion_time:<6}"
f"{p.waiting_time:<6}{p.turnaround_time:<6}"
f"{p.weighted_turnaround_time:<10.2f}"
f"{p.response_time:<6}")
print("-" * 70)
print(f"{'平均等待时间:':<18} {avg_waiting:.2f}")
print(f"{'平均周转时间:':<18} {avg_turnaround:.2f}")
print(f"{'平均带权周转时间:':<18} {avg_weighted_turnaround:.2f}")
print(f"{'平均响应时间:':<18} {avg_response:.2f}")
# CPU 利用率
total_burst = sum(p.burst_time for p in self.results)
total_time = self.results[-1].completion_time - self.results[0].arrival_time
utilization = (total_burst / total_time) * 100 if total_time > 0 else 0
print(f"{'CPU 利用率:':<18} {utilization:.2f}%")
return {
'avg_waiting': avg_waiting,
'avg_turnaround': avg_turnaround,
'avg_weighted_turnaround': avg_weighted_turnaround,
'avg_response': avg_response,
'cpu_utilization': utilization
}
def generate_random_processes(
n: int = 10,
seed: Optional[int] = 42,
arrival_range: tuple = (0, 50),
burst_range: tuple = (1, 20),
priority_range: tuple = (1, 10)
) -> List[Process]:
"""
随机生成测试进程集
Args:
n: 进程数量
seed: 随机种子,确保可复现
arrival_range: 到达时间范围 (min, max)
burst_range: 服务时间范围 (min, max)
priority_range: 优先级范围 (min, max)
Returns:
进程列表
"""
if seed is not None:
random.seed(seed)
processes = []
for i in range(n):
p = Process(
pid=f'P{i+1}',
arrival_time=random.randint(*arrival_range),
burst_time=random.randint(*burst_range),
priority=random.randint(*priority_range)
)
processes.append(p)
# 按到达时间排序
processes.sort(key=lambda p: p.arrival_time)
return processes
def print_processes(processes: List[Process], title: str = "测试数据"):
"""打印进程信息"""
print(f"\n{'='*60}")
print(f"{title}")
print(f"{'='*60}")
print(f"{'PID':<8}{'到达时间':<10}{'服务时间':<10}{'优先级':<10}")
print("-" * 60)
for p in processes:
print(f"{p.pid:<8}{p.arrival_time:<10}{p.burst_time:<10}{p.priority:<10}")
def print_comparison(results: Dict[str, Dict]):
"""打印算法比较结果"""
print("\n" + "="*90)
print("算法性能比较")
print("="*90)
print(f"{'算法':<10}{'平均等待':<12}{'平均周转':<12}{'平均带权周转':<14}{'平均响应':<12}{'CPU利用率':<12}")
print("-" * 90)
for name, metrics in results.items():
print(f"{name:<10}{metrics['avg_waiting']:<12.2f}{metrics['avg_turnaround']:<12.2f}"
f"{metrics.get('avg_weighted_turnaround', 0):<14.2f}"
f"{metrics['avg_response']:<12.2f}{metrics['cpu_utilization']:<12.2f}%")