Claude Opus 4 7 在「实现拓扑排序任务调度器」的评测结果
这是该 AI 模型在此测试用例上的详细评测结果。
基本信息
- 模型名称:Claude Opus 4 7
- 用例名称:实现拓扑排序任务调度器
- 测试类型:文本生成
- 评测维度:代码生成
系统提示词(System Prompt)
这是给 AI 模型的背景设定和角色指令:
你是一名资深后端算法工程师,专注于图算法与任务调度领域。 回答要求: 1. 代码实现清晰简洁,优先保证逻辑正确性,使用 Python 或其他主流语言均可 2. 必须包含完整可运行的代码,并附有简要的算法思路说明 3. 对关键步骤(如入度计算、队列处理、环路检测)添加注释 4. 提供至少两个测试用例验证:一个正常有序图,一个含环图 5. 输出结果需明确区分「成功排序」与「存在循环依赖」两种情况
用户提示词(User Prompt)
这是用户给 AI 模型的具体任务要求:
## 任务:实现基础拓扑排序算法 请实现一个拓扑排序函数,用于确定一组任务的合法执行顺序。 ### 背景说明 在项目管理中,某些任务必须在其他任务完成后才能开始(依赖关系)。 拓扑排序可以找到一个满足所有依赖关系的执行顺序。 若任务之间存在循环依赖(如 A 依赖 B,B 又依赖 A),则无法排序。 ### 输入格式 - `tasks`:任务 ID 列表,例如 `[「A」, 「B」, 「C」, 「D」]` - `dependencies`:依赖关系列表,每项为 `(先决任务, 后续任务)` 的元组, 表示「先决任务必须在后续任务之前完成」, 例如 `[(「A」, 「B」), (「A」, 「C」), (「B」, 「D」)]` 表示 A→B→D,A→C ### 输出格式 - 若排序成功:返回/打印一个合法的任务执行顺序列表,例如 `[「A」, 「B」, 「C」, 「D」]` - 若存在循环依赖:返回/打印明确的错误提示,例如 `「Error: 存在循环依赖,无法完成拓扑排序」` ### 功能要求 1. 使用 **Kahn 算法(BFS 入度法)** 或 **DFS 后序遍历法** 实现拓扑排序 2. 正确处理以下情况: - 标准有向无环图(DAG):输出合法执行顺序 - 含环图:检测并报告循环依赖错误 - 孤立节点(无任何依赖关系的任务):应包含在输出中 - 空任务列表:返回空列表 3. 函数签名建议:`topological_sort(tasks: list, dependencies: list) -> list` ### 测试用例 **用例 1(正常 DAG)**
评测结果
综合得分
该模型在此用例的得分为:94.2 分(满分 100 分),已通过
基础难度评测结果
- 得分:94.2 分
- 通过状态:已通过
模型生成产物
以下是该模型针对此任务的实际生成内容:
```python from collections import deque, defaultdict def topological_sort(tasks, dependencies): """ 使用 Kahn 算法(BFS 入度法)实现拓扑排序 参数: tasks: 任务 ID 列表 dependencies: 依赖关系列表,格式为 [(先决任务, 后续任务), ...] 返回: 成功: 返回合法的任务执行顺序列表 失败: 返回错误信息字符串 """ # 边界情况:空任务列表 if not tasks: return [] # 初始化数据结构 in_degree = {task: 0 for task in tasks} # 每个任务的入度 graph = defaultdict(list) # 邻接表:任务 -> 依赖它的后续任务列表 # 构建图和计算入度 for pre, post in dependencies: graph[pre].append(post) in_degree[post] += 1 # 将所有入度为 0 的任务加入队列(可以立即执行的任务) queue = deque([task for task in tasks if in_degree[task] == 0]) result = [] # BFS 处理 while queue: # 取出一个入度为 0 的任务 current = queue.popleft() result.append(current) # 遍历所有依赖当前任务的后续任务 for neighbor in graph[current]: in_degree[neighbor] -= 1 # 移除当前任务后,后续任务入度减 1 if in_degree[neighbor] == 0: # 入度变为 0,可以执行 queue.append(neighbor) # 检测循环依赖:如果处理的任务数少于总任务数,说明存在环 if len(result) != len(tasks): return "Error: 存在循环依赖,无法完成拓扑排序" return result # ========== 测试用例 ========== # 测试用例 1:正常 DAG print("=== 测试用例 1:正常有向无环图 ===") tasks1 = ['A', 'B', 'C', 'D'] dependencies1 = [('A', 'B'), ('A', 'C'), ('B', 'D'), ('C', 'D')] result1 = topological_sort(tasks1, dependencies1) print(f"任务列表: {tasks1}") print(f"依赖关系: {dependencies1}") print(f"执行顺序: {result1}") print() # 测试用例 2:含环图 print("=== 测试用例 2:存在循环依赖 ===") tasks2 = ['A', 'B', 'C'] dependencies2 = [('A', 'B'), ('B', 'C'), ('C', 'A')] # A→B→C→A 形成环 result2 = topological_sort(tasks2, dependencies2) print(f"任务列表: {tasks2}") print(f"依赖关系: {dependencies2}") print(f"结果: {result2}") print() # 测试用例 3:包含孤立节点 print("=== 测试用例 3:包含孤立节点 ===") tasks3 = ['A', 'B', 'C', 'D', 'E'] dependencies3 = [('A', 'B'), ('C', 'D')] # E 是孤立节点 result3 = topological_sort(tasks3, dependencies3) print(f"任务列表: {tasks3}") print(f"依赖关系: {dependencies3}") print(f"执行顺序: {result3}") print() # 测试用例 4:空任务列表 print("=== 测试用例 4:空任务列表 ===") tasks4 = [] dependencies4 = [] result4 = topological_sort(tasks4, dependencies4) print(f"任务列表: {tasks4}") print(f"执行顺序: {result4}") print() # 测试用例 5:复杂 DAG...
AI 评审点评
以下是 AI 评审对该模型输出的点评:
【CLAUDE】这是一份高质量的拓扑排序实现。代码完全满足题目的所有功能要求,使用标准Kahn算法实现,逻辑清晰正确,测试覆盖全面。算法实现规范,关键步骤注释到位,复杂度分析准确。代码风格优秀,函数签名和文档完整,变量命名清晰。提供的5个测试用例超出要求,涵盖了正常DAG、循环依赖、孤立节点、空列表和复杂依赖等多种场景,充分验证了实现的正确性。整体表现优异,是一份可直接用于生产环境的高质量代码。 【GEMINI】这是一个模范的算法实现方案。代码严谨、注释清晰、测试充分,完美解决了拓扑排序及循环依赖检测的任务需求,展现了资深后端算法工程师的专业水平。 【KIMI】整体而言,这段代码实现了Kahn算法(BFS入度法)进行拓扑排序,正确处理了各类边界情况。代码风格良好,关键步骤有注释说明,测试用例覆盖全面。在算法实现规范性和代码质量方面表现优秀。
进阶难度评测结果
- 得分:91.0 分
- 通过状态:已通过
模型生成产物
以下是该模型针对此任务的实际生成内容:
我会为你设计一个完整的任务调度系统,支持并行执行和最优执行计划计算。 ```python from collections import defaultdict, deque from typing import List, Dict, Set, Tuple import json class Task: """任务节点定义""" def __init__(self, task_id: str, duration: int, description: str = ""): self.task_id = task_id self.duration = duration # 任务执行时长 self.description = description def __repr__(self): return f"Task({self.task_id}, duration={self.duration})" class TaskScheduler: """ 基于拓扑排序的任务调度器 核心数据结构: - tasks: Dict[str, Task] - 任务ID到任务对象的映射 - graph: Dict[str, List[str]] - 邻接表表示的依赖图(A->B表示A依赖B完成后才能执行) - in_degree: Dict[str, int] - 每个任务的入度(依赖数量) 算法选择理由: - 拓扑排序(Kahn算法):O(V+E)时间复杂度,适合DAG的依赖解析 - 关键路径法(CPM):计算最早/最晚开始时间,识别关键路径 - 层次分组:基于拓扑层次识别可并行执行的任务组 """ def __init__(self): self.tasks: Dict[str, Task] = {} self.graph: Dict[str, List[str]] = defaultdict(list) # task_id -> [dependent_task_ids] self.reverse_graph: Dict[str, List[str]] = defaultdict(list) # 反向图,用于关键路径计算 self.in_degree: Dict[str, int] = defaultdict(int) def add_task(self, task: Task): """ 添加任务节点 时间复杂度:O(1) """ if task.task_id in self.tasks: raise ValueError(f"任务 {task.task_id} 已存在") self.tasks[task.task_id] = task if task.task_id not in self.in_degree: self.in_degree[task.task_id] = 0 def add_dependency(self, task_id: str, depends_on: str): """ 添加依赖关系:task_id 依赖于 depends_on 即 depends_on 必须在 task_id 之前完成 时间复杂度:O(1) """ if task_id not in self.tasks: raise ValueError(f"任务 {task_id} 不存在") if depends_on not in self.tasks: raise ValueError(f"依赖任务 {depends_on} 不存在") self.graph[depends_on].append(task_id) self.reverse_graph[task_id].append(depends_on) self.in_degree[task_id] += 1 def _detect_cycle(self) -> bool: """ 检测循环依赖 时间复杂度:O(V+E) """ visited = set() rec_stack = set() def dfs(node: str) -> bool: visited.add(node) rec_stack.add(node) for neighbor in self.graph[node]: if neighbor not in visited: if dfs(neighbor): return True elif neighbor in rec_stack: return True rec_stack.remove(node) return False for task_id in self.tasks: if task_id not in visited: if...
AI 评审点评
以下是 AI 评审对该模型输出的点评:
【CLAUDE】这是一个高质量的任务调度系统实现。算法选择恰当(Kahn 拓扑排序 + CPM 关键路径),实现正确且高效。功能完整,覆盖所有核心需求和边界情况,异常处理规范。代码结构优秀,文档详尽,注释清晰,具有良好的工程实践价值。输出格式兼顾结构化数据和人类可读性,实用性强。整体达到资深工程师水准,可直接用于生产环境参考。 【GEMINI】这是一份非常优秀的 AI 生成结果。作为资深后端架构师的角色定位明确,代码实现专业且健壮。虽然在关键路径的算法细节(LST 计算锚点)上存在细微逻辑瑕疵,但整体功能、并行调度逻辑和工程质量均处于高水平,完全能够解决用户提出的任务调度需求。 【KIMI】整体上,代码实现了题目要求的任务调度系统功能,算法选择合理,代码结构清晰。在正确性、完整性和代码质量方面都表现不错。不过在异常处理和代码注释方面还有改进空间。
困难难度评测结果
- 得分:84.9 分
- 通过状态:已通过
模型生成产物
以下是该模型针对此任务的实际生成内容:
我将实现一个生产级的动态 DAG 任务调度引擎,包含完整的状态管理、资源调度和条件分支功能。 ```python from dataclasses import dataclass, field from enum import Enum from typing import Dict, List, Set, Optional, Callable, Any from collections import deque, defaultdict import time import json # ==================== 枚举定义 ==================== class TaskStatus(Enum): """任务状态枚举""" PENDING = "pending" # 初始状态 READY = "ready" # 依赖满足,等待资源 RUNNING = "running" # 执行中 SUCCESS = "success" # 成功完成 FAILED = "failed" # 失败(含重试耗尽) SKIPPED = "skipped" # 条件不满足被跳过 class DependencyCondition(Enum): """依赖触发条件""" ON_SUCCESS = "on_success" # 前置任务成功时触发 ON_FAILURE = "on_failure" # 前置任务失败时触发 ON_COMPLETE = "on_complete" # 无论成功失败均触发 # ==================== 数据类定义 ==================== @dataclass class ResourceRequirement: """资源需求""" cpu_cores: int = 1 memory_mb: int = 512 def __post_init__(self): if self.cpu_cores bool: """检查是否有足够资源""" return (self.available_cpu >= req.cpu_cores and self.available_memory >= req.memory_mb) def allocate(self, req: ResourceRequirement) -> bool: """分配资源""" if not self.can_allocate(req): return False self.available_cpu -= req.cpu_cores self.available_memory -= req.memory_mb # 更新峰值 used_cpu = self.total_cpu - self.available_cpu used_memory = self.total_memory - self.available_memory self.peak_cpu_usage = max(self.peak_cpu_usage, used_cpu) self.peak_memory_usage = max(self.peak_memory_usage, used_memory) return True def release(self, req: ResourceRequirement): """释放资源""" self.available_cpu = min(self.total_cpu, self.available_cpu + req.cpu_cores) self.available_memory = min(self.total_memory, self.available_memory + req.memory_mb) # ==================== 调度引擎核心 ==================== class DAGScheduler: """ 动态 DAG 任务调度引擎 核心职责: 1. 维护任务 DAG 并保证无环性 2. 根据依赖关系和资源约束调度任务 3. 处理条件分支和失败重试 4. 提供执行计划和监控指标 """ def __init__(self, total_cpu: int = 8, total_memory: int = 16384): self.tasks: Dict[str, Task] = {} self.resource_pool = ResourcePool(total_cpu, total_memory) # 依赖关系图(邻接表) self.downstream_map: Dict[str, Set[str]] =...
AI 评审点评
以下是 AI 评审对该模型输出的点评:
【CLAUDE】这是一个结构完整、工程化程度较高的任务调度引擎实现,代码组织清晰,类型注解和文档完善,功能覆盖了题目要求的所有核心点。主要优点在于模块化设计、资源管理机制和执行计划生成。然而在核心调度逻辑上存在明显缺陷:同步执行模型导致无法真正实现并行调度,重试机制的阻塞式 sleep 和资源泄漏问题会导致生产环境故障,条件依赖的 SKIPPED 状态传播不完整。这些问题使得代码虽然「看起来完整」但实际运行时会出现资源死锁、调度停滞等严重问题。建议重构为异步调度模型(如使用线程池或协程),修复资源释放逻辑,并补充针对边界情况的单元测试。总体而言,这是一个及格偏上的实现,展示了良好的代码组织能力,但核心算法的正确性需要改进才能达到「生产可用」的标准。 【GEMINI】这是一个非常高质量的实现。作者不仅完成了基础的拓扑排序调度,还深入实现了资源约束下的并行度模拟、复杂的条件分支逻辑以及生产环境所需的监控指标(如关键路径分析)。代码体现了高级后端架构师的水平,逻辑严密,算法扎实,完全符合‘生产可用’的设计初衷。 【KIMI】整体来看,这个实现已经非常全面和完善,涵盖了题目要求的所有核心功能点,代码结构清晰,模块化设计合理。在核心功能实现正确性、功能完整性、代码质量等方面都表现良好。但是,还可以在边界情况处理、注释和命名优化等方面进一步完善,提升代码的健壮性和可维护性。
相关链接
您可以通过以下链接查看更多相关内容: