Claude Opus 4 7 在「实现多线程任务调度器」的评测结果

这是该 AI 模型在此测试用例上的详细评测结果。

基本信息

  • 模型名称:Claude Opus 4 7
  • 用例名称:实现多线程任务调度器
  • 测试类型:文本生成
  • 评测维度:代码生成

系统提示词(System Prompt)

这是给 AI 模型的背景设定和角色指令:

你是一名资深后端工程师,专注于 Python 并发编程与系统设计。 回答要求: 1. 在编写代码前,先用 2-3 句话说明你选择的并发模型和线程安全策略。 2. 使用 Python 语言实现,代码需包含类型注解和必要的注释,尤其是关键并发点(如加锁位置)必须注释说明原因。 3. 代码结构清晰,遵循面向对象设计原则,类和方法职责单一。 4. 提供可直接运行的使用示例,示例中需体现多线程并发场景。 5. 输出格式:设计说明 → 完整代码 → 使用示例。

用户提示词(User Prompt)

这是用户给 AI 模型的具体任务要求:

## 实现一个线程安全的任务调度器(基础版) 请使用 Python 实现一个线程安全的单机任务调度器,核心目标是掌握多线程环境下的基础并发控制。 ### 功能要求 1. **任务定义**:每个任务包含以下属性: - `name`(str):任务名称 - `func`(Callable):任务执行函数 - `args` / `kwargs`(可选):传递给执行函数的参数 2. **任务队列**: - 支持通过 `add_task()` 方法向队列中添加任务 - 队列操作必须是线程安全的(考虑多个线程同时添加任务的场景) - 按照 FIFO(先进先出)顺序执行任务 3. **任务执行**: - 提供 `run_all()` 方法,使用**多线程**并发执行队列中的所有任务 - 需限制最大并发线程数(通过构造函数参数 `max_workers` 指定,默认为 3) - 每个任务执行完毕后,打印任务名称、执行状态(成功/失败)及耗时 4. **错误隔离**: - 单个任务执行失败(抛出异常)不应影响其他任务的执行 - 捕获并记录异常信息 ### 约束条件 - 必须使用 `threading.Lock` 或 `threading.RLock` 保护共享状态 - 必须使用 `concurrent.futures.ThreadPoolExecutor` 管理线程池 - 不允许使用第三方任务队列库(如 Celery、RQ) ### 输出示例

评测结果

综合得分

该模型在此用例的得分为:90.8 分(满分 100 分),已通过

基础难度评测结果

  • 得分:90.8 分
  • 通过状态:已通过

模型生成产物

以下是该模型针对此任务的实际生成内容:

## 设计说明 我将使用 `threading.Lock` 保护任务队列的添加操作,确保多线程环境下队列操作的原子性。采用 `ThreadPoolExecutor` 管理固定大小的线程池,通过 `submit()` 方法异步提交任务并使用 `as_completed()` 收集执行结果,实现并发控制和错误隔离。每个任务独立捕获异常,保证单个任务失败不影响其他任务。 ## 完整代码 ```python from typing import Callable, Any, Optional, List, Tuple from dataclasses import dataclass import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed, Future @dataclass class Task: """任务数据类""" name: str func: Callable args: Tuple[Any, ...] = () kwargs: dict = None def __post_init__(self): if self.kwargs is None: self.kwargs = {} class TaskScheduler: """线程安全的任务调度器""" def __init__(self, max_workers: int = 3): """ 初始化调度器 Args: max_workers: 最大并发线程数 """ self._tasks: List[Task] = [] self._lock = threading.Lock() # 保护任务队列的锁 self._max_workers = max_workers def add_task(self, name: str, func: Callable, args: Tuple[Any, ...] = (), kwargs: Optional[dict] = None) -> None: """ 添加任务到队列(线程安全) Args: name: 任务名称 func: 任务执行函数 args: 位置参数 kwargs: 关键字参数 """ task = Task(name=name, func=func, args=args, kwargs=kwargs or {}) # 加锁保护队列操作,防止多线程同时修改队列导致数据竞争 with self._lock: self._tasks.append(task) print(f"[添加任务] {name}") def _execute_task(self, task: Task) -> Tuple[str, bool, float, Optional[str]]: """ 执行单个任务(内部方法) Returns: (任务名称, 是否成功, 耗时, 错误信息) """ start_time = time.time() error_msg = None success = False try: task.func(*task.args, **task.kwargs) success = True except Exception as e: # 捕获异常但不向上传播,实现错误隔离 error_msg = f"{type(e).__name__}: {str(e)}" elapsed = time.time() - start_time return task.name, success, elapsed, error_msg def run_all(self) -> None: """ 并发执行所有任务 """ # 加锁读取任务列表并清空队列(避免重复执行) with self._lock: tasks_to_run = self._tasks.copy() self._tasks.clear() if not tasks_to_run: print("[调度器] 队列为空,无任务执行") return print(f"\n[调度器] 开始执行 {len(tasks_to_run)} 个任务(最大并发数: {self._max_workers})\n") # 使用线程池并发执行任务 with ThreadPoolExecutor(max_workers=self._max_workers) as executor: # 提交所有任务到线程池 future_to_task: dict[Future, Task] = { executor.submit(self._execute_task,...

AI 评审点评

以下是 AI 评审对该模型输出的点评:

【CLAUDE】这是一个高质量的任务调度器实现。候选输出完全满足需求文档的所有功能要求和约束条件,正确使用 threading.Lock 保护共享状态,合理使用 ThreadPoolExecutor 管理线程池,实现了错误隔离机制。设计说明简洁清晰,代码结构良好且类型注解完整,关键并发点有注释说明。使用示例覆盖单线程、多线程并发添加和异常处理等场景,具有很强的实用性和教学价值。唯一的小瑕疵是输出顺序按完成时间而非严格 FIFO,以及类型注解的兼容性问题,但这些不影响核心功能的正确性和线程安全性。整体表现优秀。 【GEMINI】这是一份高质量的代码实现。模型准确理解了“基础版任务调度器”的需求,并在指令遵循、并发控制和代码规范性方面表现出色。特别是在处理任务队列的线程安全时,采用了‘加锁取快照后清空’的模式,这在实际工程中是兼顾安全与性能的正确做法。示例代码覆盖了单线程和多线程并发添加任务的场景,验证充分。 【KIMI】整体而言,代码实现了任务调度器的核心功能,包括线程安全的队列操作和并发执行,并提供了完整的使用示例。代码结构清晰,注释较为充分。但是,在线程安全性和代码质量方面还有进一步提升的空间,如优化锁的范围和完善注释。

进阶难度评测结果

  • 得分:80.4 分
  • 通过状态:已通过

模型生成产物

以下是该模型针对此任务的实际生成内容:

## 设计思路 我将采用 **ThreadPoolExecutor + PriorityQueue** 的混合模型:使用 `PriorityQueue` 管理待执行任务的优先级排序,通过 `ThreadPoolExecutor` 提供线程池复用;用 `threading.Condition` 实现生产者-消费者模式,确保任务按优先级被调度到线程池;通过 `Future.result(timeout)` 实现超时控制,用字典维护任务状态以支持取消操作。关键并发点包括:队列操作需加锁保护、Future 的超时等待、回调执行的异常隔离。 ```python import threading import queue import time from concurrent.futures import ThreadPoolExecutor, Future, TimeoutError as FutureTimeoutError from enum import IntEnum from typing import Callable, Optional, Any from dataclasses import dataclass, field class Priority(IntEnum): """任务优先级枚举(数值越小优先级越高)""" HIGH = 1 MEDIUM = 2 LOW = 3 @dataclass(order=True) class Task: """任务封装类,支持优先级排序""" priority: int sequence: int # 同优先级按提交顺序排序 name: str = field(compare=False) func: Callable = field(compare=False) args: tuple = field(compare=False, default_factory=tuple) kwargs: dict = field(compare=False, default_factory=dict) timeout: Optional[float] = field(compare=False, default=None) on_complete: Optional[Callable] = field(compare=False, default=None) class TaskScheduler: """支持优先级与并发控制的多线程任务调度器""" def __init__(self, max_workers: int = 4): """ 初始化调度器 Args: max_workers: 最大并发线程数 """ self.max_workers = max_workers self.executor = ThreadPoolExecutor(max_workers=max_workers) # 优先级队列,存储待执行任务 self.task_queue: queue.PriorityQueue[Task] = queue.PriorityQueue() # 任务状态跟踪 self.pending_tasks: dict[str, Task] = {} # 等待中的任务 self.running_tasks: dict[str, Future] = {} # 运行中的任务 # 并发控制 self.lock = threading.Lock() # 保护共享状态 self.condition = threading.Condition(self.lock) # 协调调度线程 # 任务序列号,用于同优先级FIFO排序 self.sequence_counter = 0 # 调度器状态 self.running = True self.scheduler_thread = threading.Thread(target=self._schedule_loop, daemon=True) self.scheduler_thread.start() def submit( self, task_name: str, func: Callable, *args, priority: Priority = Priority.MEDIUM, timeout: Optional[float] = None, on_complete: Optional[Callable[[str, Any, Optional[Exception]], None]] = None, **kwargs ) -> bool: """ 提交任务到调度器 Args: task_name: 任务唯一标识 func:...

AI 评审点评

以下是 AI 评审对该模型输出的点评:

【CLAUDE】该实现在接口设计、代码结构和功能覆盖方面表现良好,设计思路说明清晰,单元测试较完善。但存在致命的架构缺陷:在 `_execute_task` 中嵌套提交任务到同一个线程池会导致死锁风险(所有 worker 线程都在等待 inner_future,但没有空闲线程执行实际任务)。正确做法是直接在 `_execute_task` 中调用 `task.func`,而不是再次提交到线程池。此外,超时控制的实现也不正确,`Future.result(timeout)` 只是停止等待而非终止任务执行。这些问题在低并发场景下可能不明显,但在线程池饱和时会导致系统挂起。建议重构为直接调用模式,并使用信号量或其他机制实现真正的超时终止。 【GEMINI】该实现展示了深厚的 Python 编程功底和良好的系统设计意识。代码模块化程度高,文档详尽,且通过单元测试验证了各项功能。然而,作为一名资深后端架构师,在并发模型的设计上犯了嵌套提交导致死锁的典型错误(Thread Pool Starvation Deadlock),这在生产环境中是致命的。建议在实现超时控制时,考虑使用单独的监控线程或利用 `concurrent.futures.wait` 的超时机制,而不是在池内线程中再次阻塞提交。 【KIMI】整体而言,代码实现了题目要求的核心功能,设计思路清晰,代码结构和可读性较好。但在并发控制和接口实现上还存在一些细节问题,需要进一步完善。

困难难度评测结果

  • 得分:84.0 分
  • 通过状态:已通过

模型生成产物

以下是该模型针对此任务的实际生成内容:

# 第一部分:设计文档 ## 1. 整体架构说明 ### 核心组件及职责 **Task(任务抽象)** - 封装业务逻辑、依赖关系、重试配置、执行状态 - 状态机:PENDING → READY → RUNNING → (RETRYING) → SUCCESS/FAILED/SKIPPED **DAGValidator(DAG校验器)** - 使用 Kahn 拓扑排序算法检测循环依赖 - 时间复杂度 O(V+E),空间复杂度 O(V) - 在任务提交阶段一次性校验,拒绝非法 DAG **DependencyGraph(依赖图管理器)** - 维护任务间的前驱/后继关系(邻接表) - 使用 ConcurrentHashMap 存储依赖计数器,原子操作递减 - 当某任务完成时,通知所有后继任务并检查是否就绪 **DynamicThreadPoolExecutor(动态线程池)** - 继承 ThreadPoolExecutor,重写 setCorePoolSize/setMaximumPoolSize - 使用 ReentrantLock 保护 resize 操作,防止并发调整冲突 - 自定义 RejectedExecutionHandler:溢出任务放入 LinkedBlockingQueue 缓冲区 **RetryScheduler(重试调度器)** - 使用 ScheduledExecutorService 实现延迟重试 - 指数退避公式:delay = min(initialDelay * 2^(attempt-1), maxDelay) - 重试任务包装为 Runnable,提交到主线程池执行 **TaskScheduler(主调度器)** - 协调所有组件,管理任务生命周期 - 使用 CompletableFuture 异步执行任务,链式处理成功/失败回调 - 维护全局状态统计(AtomicInteger 计数器) ## 2. DAG 校验算法与死锁预防 ### Kahn 拓扑排序算法 ``` 1. 计算所有节点的入度(依赖数量) 2. 将入度为 0 的节点加入队列 3. 循环:取出队列头节点,遍历其后继节点,入度减 1 4. 若后继节点入度变为 0,加入队列 5. 若最终处理的节点数 dependencies; private final int maxRetries; private final long initialDelayMs; private final long maxDelayMs; private final long timeoutMs; private volatile TaskStatus status = TaskStatus.PENDING; private final AtomicInteger retryCount = new AtomicInteger(0); private volatile long submitTime; private volatile long startTime; private volatile long endTime; private volatile Throwable lastError; protected Task(Builder builder) { this.id = Objects.requireNonNull(builder.id, "Task ID cannot be null"); this.dependencies = new HashSet(builder.dependencies); this.maxRetries = builder.maxRetries; this.initialDelayMs = builder.initialDelayMs; this.maxDelayMs = builder.maxDelayMs; this.timeoutMs = builder.timeoutMs; } // 业务逻辑由子类实现 protected abstract void execute() throws Exception; // Getters public String getId() { return id; } public Set getDependencies() { return new HashSet(dependencies); } public TaskStatus getStatus() { return status; } public int getRetryCount() { return retryCount.get(); } public int getMaxRetries() {...

AI 评审点评

以下是 AI 评审对该模型输出的点评:

【CLAUDE】这是一个设计良好、功能较完整的多线程任务调度器实现。设计文档清晰阐述了架构思路和关键算法,代码实现了 DAG 依赖管理、指数退避重试、动态线程池等核心功能,并发控制使用了合适的工具类,设计模式运用恰当。主要优点包括:Kahn 算法正确检测循环依赖、依赖图使用原子操作保证线程安全、重试机制符合指数退避规范、线程池动态调整逻辑正确。主要问题在于:1) 超时控制实现方式会阻塞工作线程,影响并发性能;2) 缺少优雅关闭机制,存在资源泄漏风险;3) 溢出缓冲区无容量限制可能导致 OOM;4) 部分边界场景(菱形依赖、严重错误)处理不足。代码被截断也影响了完整性评估。总体而言,这是一个达到生产可用标准但仍需优化的实现,核心逻辑正确但工程健壮性有待加强。 【GEMINI】这是一个高质量的高并发任务调度器实现。设计文档详尽,代码结构清晰,充分考虑了多线程环境下的复杂性。作者对 Java 并发包的应用非常熟练,实现了包括 DAG 拓扑排序、动态线程池调整和指数退避在内的所有核心需求。虽然在任务执行的嵌套提交上存在潜在的线程饥饿风险,但整体工程质量达到了资深开发者的水平。 【KIMI】整体上,该实现提供了一个功能齐全、设计合理的多线程任务调度器框架。它涵盖了 DAG 任务依赖管理、重试机制、动态线程池调整和任务监控等核心功能,并展示了良好的架构设计和代码质量。代码中的关键并发点得到了适当的处理和注释说明,有助于理解和维护。然而,在健壮性方面,动态线程池的自动扩容机制需要进一步考虑线程创建开销和内存压力,以避免潜在的性能问题。

相关链接

您可以通过以下链接查看更多相关内容:

加载中...