#!/usr/bin/env python3 """ 进程调度算法模拟器 - 主程序 运行方式: python main.py # 默认小数据集测试 python main.py -n 100 # 生成 100 个随机进程 python main.py -n 1000 --seed 42 # 固定种子 42 python main.py -a P1,P2 # 指定运行的算法 python main.py --demo # 使用默认演示数据 参数说明: -n, --num: 进程数量 (默认 10) -s, --seed: 随机种子 (默认 42, 设为 None 则随机) -a, --algo: 运行的算法 (默认全部) --demo: 使用预设演示数据 """ import argparse import copy from typing import List, Dict, Optional from base import ( Process, ProcessScheduler, generate_random_processes, print_processes, print_comparison ) from fcfs import FCFSScheduler from sjf import SJFScheduler from rr import RoundRobinScheduler from priority import PriorityScheduler from mlfq import MLFQScheduler from hrrn import HRRNScheduler # 预设演示数据 (包含IO模拟) DEMO_PROCESSES = [ # CPU密集型进程 Process(pid='P1', arrival_time=0, burst_time=7, priority=3, cpu_bursts=[7], io_bursts=[], is_io_bound=False), # IO密集型进程: CPU -> IO -> CPU -> IO -> CPU Process(pid='P2', arrival_time=2, burst_time=8, priority=1, cpu_bursts=[3, 2, 3], io_bursts=[5, 4], is_io_bound=True), Process(pid='P3', arrival_time=4, burst_time=1, priority=4, cpu_bursts=[1], io_bursts=[], is_io_bound=False), # IO密集型进程 Process(pid='P4', arrival_time=5, burst_time=6, priority=2, cpu_bursts=[2, 2, 2], io_bursts=[6, 5], is_io_bound=True), Process(pid='P5', arrival_time=6, burst_time=2, priority=3, cpu_bursts=[2], io_bursts=[], is_io_bound=False), ] def get_algorithm_config() -> Dict: """算法配置:名称与调度器类的映射""" return { 'FCFS': FCFSScheduler, 'SJF': lambda p: SJFScheduler(p, preemptive=False), 'SRTF': lambda p: SJFScheduler(p, preemptive=True), 'Priority': lambda p: PriorityScheduler(p, preemptive=False), 'PriorityP': lambda p: PriorityScheduler(p, preemptive=True), 'RR': lambda p: RoundRobinScheduler(p, time_slice=4), 'MLFQ': lambda p: MLFQScheduler(p), 'HRRN': HRRNScheduler, } def run_all_algorithms(processes: List[Process], algorithms: Optional[List[str]] = None): """运行所有/指定调度算法并比较""" # 深拷贝进程列表 (使用 copy.deepcopy 确保 cpu_bursts/io_bursts 等列表字段独立) original_processes = [copy.deepcopy(p) for p in processes] # 默认运行所有算法 if algorithms is None: algorithms = list(get_algorithm_config().keys()) results = {} scheduler_map = get_algorithm_config() for algo_name in algorithms: if algo_name not in scheduler_map: print(f"警告: 未知算法 '{algo_name}', 跳过") continue # 每次重新创建进程列表 (深拷贝确保各算法互不影响) process_list = [copy.deepcopy(p) for p in original_processes] # 获取调度器 scheduler_class = scheduler_map[algo_name] scheduler = scheduler_class(process_list) # 运行调度 metrics = scheduler.schedule() results[algo_name] = metrics # 打印比较结果 print_comparison(results) # 打印最优指标提示 print("\n" + "="*70) print("指标分析") print("="*70) metrics_names = [ ('avg_waiting', '平均等待时间'), ('avg_turnaround', '平均周转时间'), ('avg_weighted_turnaround', '平均带权周转时间'), ('avg_response', '平均响应时间'), ] for metric_key, metric_name in metrics_names: values = {name: m[metric_key] for name, m in results.items()} best_algo = min(values, key=values.get) print(f"{metric_name}: {best_algo} ({values[best_algo]:.2f}) 最优") return results def main(): parser = argparse.ArgumentParser( description='进程调度算法模拟器', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__ ) parser.add_argument('-n', '--num', type=int, default=10, help='随机生成的进程数量 (默认: 10)') parser.add_argument('-s', '--seed', type=int, default=42, help='随机种子 (默认: 42, 设为 None 则每次不同)') parser.add_argument('-a', '--algo', type=str, default=None, help='运行的算法,逗号分隔 (如: FCFS,SJF,RR)') parser.add_argument('--demo', action='store_true', help='使用预设演示数据') parser.add_argument('--arrival', type=str, default='0,50', help='到达时间范围 (格式: min,max)') parser.add_argument('--burst', type=str, default='1,20', help='服务时间范围 (格式: min,max)') parser.add_argument('--priority', type=str, default='1,10', help='优先级范围 (格式: min,max)') parser.add_argument('--io-prob', type=float, default=0.5, help='进程有IO操作的概率 0.0-1.0 (默认: 0.5)') parser.add_argument('--io-count', type=str, default='1,4', help='IO操作次数范围 (格式: min,max, 默认: 1,4)') parser.add_argument('--io-time', type=str, default='3,15', help='每次IO时间范围 (格式: min,max, 默认: 3,15)') parser.add_argument('--io-first', type=float, default=0.0, help='以IO开始的概率 0.0-1.0 (默认: 0.0, 即全部先CPU)') args = parser.parse_args() # 解析算法列表 algorithms = None if args.algo: algorithms = [a.strip() for a in args.algo.split(',')] # 生成测试数据 if args.demo: processes = DEMO_PROCESSES print_processes(processes, "演示数据 (5 个进程)") else: # 解析范围 arrival_range = tuple(map(int, args.arrival.split(','))) burst_range = tuple(map(int, args.burst.split(','))) priority_range = tuple(map(int, args.priority.split(','))) io_count_range = tuple(map(int, args.io_count.split(','))) io_time_range = tuple(map(int, args.io_time.split(','))) seed = args.seed if args.seed != -1 else None processes = generate_random_processes( n=args.num, seed=seed, arrival_range=arrival_range, burst_range=burst_range, priority_range=priority_range, io_probability=args.io_prob, io_count_range=io_count_range, io_time_range=io_time_range, io_first_prob=args.io_first ) title = f"随机测试数据 (n={args.num}, seed={args.seed if seed else 'None'})" print_processes(processes, title) # 运行算法 print(f"\n运行算法: {algorithms if algorithms else '全部'}") run_all_algorithms(processes, algorithms) if __name__ == "__main__": main()