postgraduate-prep/experiment/code/base.py

150 lines
4.8 KiB
Python

#!/usr/bin/env python3
"""
进程调度算法模块 - 公共基础类
"""
import random
from dataclasses import dataclass, field
from typing import List, Dict, Optional
@dataclass
class Process:
"""进程数据结构"""
pid: str
arrival_time: int = 0
burst_time: int = 0
priority: int = 0 # 数值越小优先级越高
remaining_time: int = field(default=0)
start_time: int = -1
completion_time: int = 0
waiting_time: int = 0
turnaround_time: int = 0
response_time: int = 0
def __post_init__(self):
self.remaining_time = self.burst_time
def reset(self):
"""重置进程状态"""
self.remaining_time = self.burst_time
self.start_time = -1
self.completion_time = 0
self.waiting_time = 0
self.turnaround_time = 0
self.response_time = 0
class ProcessScheduler:
"""进程调度器基类"""
def __init__(self, processes: List[Process]):
self.processes = [p for p in processes]
self.results: List[Process] = []
def calculate_metrics(self, p: Process, current_time: int):
"""计算进程性能指标"""
p.completion_time = current_time
p.turnaround_time = p.completion_time - p.arrival_time
p.waiting_time = p.turnaround_time - p.burst_time
p.response_time = p.start_time - p.arrival_time
def print_results(self, algorithm_name: str) -> Dict:
"""打印调度结果"""
n = len(self.results)
avg_waiting = sum(p.waiting_time for p in self.results) / n
avg_turnaround = sum(p.turnaround_time for p in self.results) / n
avg_response = sum(p.response_time for p in self.results) / n
print(f"\n{'='*60}")
print(f"算法: {algorithm_name}")
print(f"{'='*60}")
print(f"{'PID':<8}{'到达':<8}{'服务':<8}{'开始':<8}{'完成':<8}"
f"{'等待':<8}{'周转':<8}{'响应':<8}")
print("-" * 60)
for p in self.results:
print(f"{p.pid:<8}{p.arrival_time:<8}{p.burst_time:<8}"
f"{p.start_time:<8}{p.completion_time:<8}"
f"{p.waiting_time:<8}{p.turnaround_time:<8}"
f"{p.response_time:<8}")
print("-" * 60)
print(f"{'平均等待时间:':<15} {avg_waiting:.2f}")
print(f"{'平均周转时间:':<15} {avg_turnaround:.2f}")
print(f"{'平均响应时间:':<15} {avg_response:.2f}")
# CPU 利用率
total_burst = sum(p.burst_time for p in self.results)
total_time = self.results[-1].completion_time - self.results[0].arrival_time
utilization = (total_burst / total_time) * 100 if total_time > 0 else 0
print(f"{'CPU 利用率:':<15} {utilization:.2f}%")
return {
'avg_waiting': avg_waiting,
'avg_turnaround': avg_turnaround,
'avg_response': avg_response,
'cpu_utilization': utilization
}
def generate_random_processes(
n: int = 10,
seed: Optional[int] = 42,
arrival_range: tuple = (0, 50),
burst_range: tuple = (1, 20),
priority_range: tuple = (1, 10)
) -> List[Process]:
"""
随机生成测试进程集
Args:
n: 进程数量
seed: 随机种子,确保可复现
arrival_range: 到达时间范围 (min, max)
burst_range: 服务时间范围 (min, max)
priority_range: 优先级范围 (min, max)
Returns:
进程列表
"""
if seed is not None:
random.seed(seed)
processes = []
for i in range(n):
p = Process(
pid=f'P{i+1}',
arrival_time=random.randint(*arrival_range),
burst_time=random.randint(*burst_range),
priority=random.randint(*priority_range)
)
processes.append(p)
# 按到达时间排序
processes.sort(key=lambda p: p.arrival_time)
return processes
def print_processes(processes: List[Process], title: str = "测试数据"):
"""打印进程信息"""
print(f"\n{'='*60}")
print(f"{title}")
print(f"{'='*60}")
print(f"{'PID':<8}{'到达时间':<10}{'服务时间':<10}{'优先级':<10}")
print("-" * 60)
for p in processes:
print(f"{p.pid:<8}{p.arrival_time:<10}{p.burst_time:<10}{p.priority:<10}")
def print_comparison(results: Dict[str, Dict]):
"""打印算法比较结果"""
print("\n" + "="*60)
print("算法性能比较")
print("="*60)
print(f"{'算法':<15}{'平均等待':<12}{'平均周转':<12}{'平均响应':<12}{'CPU利用率':<12}")
print("-" * 60)
for name, metrics in results.items():
print(f"{name:<15}{metrics['avg_waiting']:<12.2f}{metrics['avg_turnaround']:<12.2f}"
f"{metrics['avg_response']:<12.2f}{metrics['cpu_utilization']:<12.2f}%")