Qwen: Qwen3.5-9B 在「实现多线程任务调度器」的评测结果
这是该 AI 模型在此测试用例上的详细评测结果。
基本信息
- 模型名称:Qwen: Qwen3.5-9B
- 用例名称:实现多线程任务调度器
- 测试类型:文本生成
- 评测维度:代码生成
系统提示词(System Prompt)
这是给 AI 模型的背景设定和角色指令:
你是一名资深后端工程师,专注于 Python 并发编程与系统设计。 回答要求: 1. 在编写代码前,先用 2-3 句话说明你选择的并发模型和线程安全策略。 2. 使用 Python 语言实现,代码需包含类型注解和必要的注释,尤其是关键并发点(如加锁位置)必须注释说明原因。 3. 代码结构清晰,遵循面向对象设计原则,类和方法职责单一。 4. 提供可直接运行的使用示例,示例中需体现多线程并发场景。 5. 输出格式:设计说明 → 完整代码 → 使用示例。
用户提示词(User Prompt)
这是用户给 AI 模型的具体任务要求:
## 实现一个线程安全的任务调度器(基础版) 请使用 Python 实现一个线程安全的单机任务调度器,核心目标是掌握多线程环境下的基础并发控制。 ### 功能要求 1. **任务定义**:每个任务包含以下属性: - `name`(str):任务名称 - `func`(Callable):任务执行函数 - `args` / `kwargs`(可选):传递给执行函数的参数 2. **任务队列**: - 支持通过 `add_task()` 方法向队列中添加任务 - 队列操作必须是线程安全的(考虑多个线程同时添加任务的场景) - 按照 FIFO(先进先出)顺序执行任务 3. **任务执行**: - 提供 `run_all()` 方法,使用**多线程**并发执行队列中的所有任务 - 需限制最大并发线程数(通过构造函数参数 `max_workers` 指定,默认为 3) - 每个任务执行完毕后,打印任务名称、执行状态(成功/失败)及耗时 4. **错误隔离**: - 单个任务执行失败(抛出异常)不应影响其他任务的执行 - 捕获并记录异常信息 ### 约束条件 - 必须使用 `threading.Lock` 或 `threading.RLock` 保护共享状态 - 必须使用 `concurrent.futures.ThreadPoolExecutor` 管理线程池 - 不允许使用第三方任务队列库(如 Celery、RQ) ### 输出示例
评测结果
综合得分
该模型在此用例的得分为:76.3 分(满分 100 分),已通过
基础难度评测结果
- 得分:76.3 分
- 通过状态:已通过
模型生成产物
以下是该模型针对此任务的实际生成内容:
### 设计说明 本调度器采用 `ThreadPoolExecutor` 统一创建和管理线程生命周期,利用 `threading.Lock` 对内部共享的任务列表 `_task_queue` 进行显式加锁保护,确保多线程调用 `add_task` 时的原子性与线程安全。在任务执行层面,通过 `run_all` 方法先获取任务快照再进行并发提交,并在每个执行函数内部包裹 `try-except` 块以隔离异常,保证单个任务失败不会阻塞或中断其他任务的运行。 ### 完整代码 ```python import threading import time from typing import Callable, Optional, Dict, Any, List from concurrent.futures import ThreadPoolExecutor, Future # 定义任务类型 Task = Dict[str, Any] class Task: """ 任务模型类,包含任务名称、执行函数及参数。 作为纯数据结构,不包含并发逻辑,确保职责单一。 """ def __init__(self, name: str, func: Callable, args: tuple = (), kwargs: Optional[Dict[str, Any]] = None): self.name = name self.func = func self.args = args self.kwargs = kwargs or {} class TaskScheduler: """ 线程安全的任务调度器。 负责任务存储、并发执行及错误隔离。 """ def __init__(self, max_workers: int = 3): """ :param max_workers: 线程池最大并发数,默认 3 """ self._max_workers = max_workers # 使用 Lock 保护任务队列 _task_queue 的读写安全 # 原因:多线程环境下,多个线程同时 append 到列表可能引发数据竞争或状态不一致 self._lock = threading.Lock() self._task_queue: List[Task] = [] def add_task(self, name: str, func: Callable, args: tuple = (), kwargs: Optional[Dict[str, Any]] = None) -> None: """ 向调度器添加任务。 :param name: 任务名称 :param func: 任务执行函数 :param args: 位置参数 :param kwargs: 关键字参数 """ task = Task(name=name, func=func, args=args, kwargs=kwargs) # 关键并发点:使用 with 语句自动获取锁,确保队列操作原子性 with self._lock: self._task_queue.append(task) def run_all(self) -> None: """ 并发执行队列中的所有任务。 1. 先加锁复制任务列表,防止执行期间添加新任务导致竞态条件。 2. 利用 ThreadPoolExecutor 管理线程并发。 3. 内部执行函数捕获异常,实现错误隔离。 """ # 获取当前任务快照,确保在 executor 提交期间队列状态稳定 with self._lock: tasks_snapshot = list(self._task_queue) self._task_queue.clear() # 执行后清空队列,避免重复执行 # 使用 ThreadPoolExecutor 管理线程池 with ThreadPoolExecutor(max_workers=self._max_workers) as executor: futures = [] # 按照 FIFO 顺序提交任务(提交顺序与快照顺序一致) # 注意:并发环境下任务完成顺序不保证严格 FIFO,但提交顺序固定 for task in tasks_snapshot: # 提交任务到线程池 future = executor.submit(self._execute_task_wrapper, task) futures.append(future) # 等待所有任务完成(可选,通常 run_all 会阻塞直到完成) for future in futures: future.result() @staticmethod...
AI 评审点评
以下是 AI 评审对该模型输出的点评:
【CLAUDE】该实现基本完成了任务调度器的核心需求,ThreadPoolExecutor 和 Lock 的使用方向正确,错误隔离机制有效。但存在一个严重代码错误(Task 类型别名被同名 class 覆盖),以及若干细节问题(无意义的 sleep、示例输出不准确、代码冗余),整体达到及格线以上但质量有待提升。 【GEMINI】该模型出色地完成了任务调度器的设计与实现。代码不仅满足了所有功能性要求,而且在并发安全性和代码可读性方面表现优异。模型对 `threading.Lock` 的使用非常规范,错误隔离机制实现得当,是一个高质量的工程实现示例。 【KIMI】核心功能基本实现,但存在关键缺陷:1) 任务定义中同时使用了 `Task` 类型别名(`Dict[str, Any]`)和 `Task` 类,导致类型冲突;2) `run_all` 方法中 `future.result()` 会阻塞等待每个任务完成,虽然保证了执行完成,但异常处理已在 `_execute_task_wrapper` 中完成,`future.result()` 仍会抛出异常导致主线程异常(虽然示例中未触发,但设计有隐患);3) 输出示例中任务名称为「T1-T4」但实际打印的是「Task 1」等,与代码逻辑不符;4) FIFO 执行顺序在并发场景下无法保证,但题目仅要求「按照 FIFO 顺序执行任务」理解为提交顺序即可,此点可接受。
进阶难度评测结果
- 得分:64.0 分
- 通过状态:已通过
模型生成产物
以下是该模型针对此任务的实际生成内容:
### 设计思路 本调度器采用**优先级队列(PriorityQueue)** 结合 **固定线程池模型** 实现任务调度。具体设计如下: 1. **并发模型**:使用 `threading.Thread` 管理 `max_workers` 个工作线程,每个空闲线程阻塞在 `queue.PriorityQueue` 上等待任务,确保并发数严格受控。 2. **优先级调度**:利用 `heapq` 实现的 `queue.PriorityQueue` 存储任务,通过 `(优先级负值, 时间戳, 任务对象)` 元组保证高优先级优先,同优先级按提交顺序 FIFO。 3. **超时与取消**:使用 `concurrent.futures.Future` 包装任务执行逻辑,配合 `wait(timeout)` 实现超时控制;维护全局 `CancelledTasks` 集合,工作线程获取任务后检查集合状态,实现取消等待中任务。 ```python import threading import queue import heapq import time import concurrent.futures from dataclasses import dataclass, field from typing import Callable, Any, Optional, Tuple from collections import defaultdict import traceback # ========================================== # 1. 任务定义与优先级常量 # ========================================== class Task: """ 任务对象,包含执行逻辑、参数、超时控制及回调。 优先级常量:数字越小优先级越高(3 高,2 中,1 低), 但在 PriorityQueue 中需存入负值以实现小值优先。 """ PRIORITY_HIGH = 3 PRIORITY_MEDIUM = 2 PRIORITY_LOW = 1 def __init__(self, name: str, func: Callable, args: tuple = (), kwargs: dict = None, priority: int = PRIORITY_MEDIUM, timeout: Optional[float] = None, callback: Optional[Callable] = None): self.name = name self.func = func self.args = args self.kwargs = kwargs or {} self.priority = priority self.timeout = timeout self.callback = callback self.timestamp = time.time() # 用于同优先级 FIFO self.result: Any = None self.error: Optional[Exception] = None self.is_done = False self.is_cancelled = False def __repr__(self): return f"Task(name={self.name}, pri={self.priority})" def __hash__(self): return hash(self.name) def __eq__(self, other): return isinstance(other, Task) and self.name == other.name # ========================================== 2. 优先级任务队列 # ========================================== class PriorityQueue: """ 线程安全的优先级队列。 存储元组:(优先级, 时间戳, 任务对象)。 优先级数值越小越优先,因此存储 -priority。 """ def __init__(self): self._queue = queue.PriorityQueue() self._lock = threading.Lock() def put(self, task: Task): # 将优先级取反,确保高优先级(3)在堆顶 priority = -task.priority self._queue.put((priority,...
AI 评审点评
以下是 AI 评审对该模型输出的点评:
【GEMINI】该实现虽然在接口定义和设计思路上符合要求,但代码质量存在严重问题。最核心的问题是工作线程池从未启动,导致整个调度器处于不可用状态。此外,代码中存在多处拼写错误、缺少导入以及重复调用回调的逻辑错误。并发控制机制也因在工作线程内再次开启新线程而失效。建议在实现并发系统时,务必确保线程生命周期的管理逻辑完整,并进行基本的运行测试。 【KIMI】代码整体实现了任务调度器的核心功能,设计思路清晰,代码结构良好。但在任务取消和并发安全方面存在一些实现细节问题,需要进一步完善。
困难难度评测结果
- 得分:42.0 分
- 通过状态:未通过
模型生成产物
以下是该模型针对此任务的实际生成内容:
# 高性能 DAG 任务调度器框架设计与实现 ## 第一部分:设计文档 ### 1. 整体架构说明 本调度器采用分层架构设计,各核心组件职责如下: | 组件 | 职责 | 技术选型 | |------|------|----------| | `DagValidator` | DAG合法性校验、拓扑排序、环检测 | Kahn算法 + ConcurrentHashMap | | `TaskManager` | 任务状态管理、依赖关系维护、就绪队列调度 | TaskState + BlockingQueue | | `TaskExecutor` | 任务执行逻辑、重试控制、中断处理 | CompletableFuture + 指数退避 | | `ThreadPoolManager` | 动态线程池管理、扩容缩容、拒绝策略 | ThreadPoolExecutor 包装器 | | `Scheduler` | 顶层协调器、生命周期监听、统计信息聚合 | 组合模式 | ### 2. DAG校验算法选择及死锁预防策略 **拓扑排序校验:** - 采用 **Kahn算法**(基于入度)进行拓扑排序 - 使用 `ConcurrentHashMap` 存储任务依赖关系 - 在任务提交时进行锁保护校验,确保线程安全 **死锁预防策略:** 1. 使用 `ReentrantLock` 公平锁而非 `synchronized` 2. 避免嵌套锁,采用单锁保护DAG结构 3. 设置锁超时机制(60秒)防止死锁 4. 使用单例模式管理DAG状态,减少锁竞争 ### 3. 线程池动态调整实现思路 **实现思路:** - 使用 `ThreadPoolExecutor` 包装器封装核心线程数和最大线程数 - 所有调整操作通过 `ReentrantLock` 保护 - 队列深度监控使用 `AtomicInteger` 实现无锁计数 - 队列积压阈值触发自动扩容 **潜在风险及应对:** | 风险 | 应对措施 | |------|----------| | 调整期间任务丢失 | 使用溢出缓冲区 + 告警日志 | | 线程创建失败 | 捕获异常并记录详细错误 | | 调整导致性能抖动 | 设置最小阈值限制缩容 | | 死锁风险 | 使用公平锁,设置锁超时 | ### 4. 关键设计模式说明 | 模式 | 应用场景 | 原因 | |------|----------|------| | **策略模式** | 重试策略、拒绝策略 | 方便扩展不同策略 | | **观察者模式** | 生命周期监听器 | 解耦事件通知 | | **工厂模式** | 任务创建 | 统一管理任务实例 | | **责任链模式** | 依赖处理 | 简化依赖执行流程 | | **模板方法模式** | 任务执行框架 | 统一执行流程 | ### 5. 核心数据结构选型理由 | 数据结构 | 选型理由 | |----------|----------| | `ConcurrentHashMap` | 高并发场景下线程安全 | | `BlockingQueue` | 支持阻塞等待,适合队列积压监控 | | `AtomicInteger` | 无锁计数,性能优异 | | `ReentrantLock` | 支持公平锁和锁超时 | | `CompletableFuture` | 异步任务执行和组合 | --- ## 第二部分:核心代码实现 ```java package com.example.scheduler.core; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.function.*; /** * 任务状态枚举 * 任务可能处于以下状态 */ public enum TaskStatus { PENDING("PENDING", 0), // 等待调度 RUNNING("RUNNING", 1), // 正在执行 SUCCESS("SUCCESS", 2), // 执行成功 FAILED("FAILED", 3), // 执行失败 SKIPPED("SKIPPED", 4), // 被跳过(依赖失败) RETRYING("RETRYING", 5), // 重试中 CANCELLED("CANCELLED", 6); // 已取消 private final String code; private final int priority; TaskStatus(String code, int priority) {...
AI 评审点评
以下是 AI 评审对该模型输出的点评:
【CLAUDE】该回答在设计文档部分展示了一定的架构思维,涵盖了Kahn算法、动态线程池、观察者模式等合理设计方向,表格形式的设计说明较为清晰。然而核心代码实现存在大量致命问题:多处编译错误(final字段赋值、接口实例化、受检异常处理缺失)导致代码无法运行;最核心的功能——当前置依赖完成后自动触发下游任务就绪——完全缺失或逻辑反向;TaskDefinition中的executor Supplier从未被实际调用,任务执行是空实现;资源泄漏问题(每次重试new ScheduledExecutorService)严重;多个public class置于同一文件违反Java基础规范。整体而言,代码更像是架构草图而非可运行的实现,与题目要求的「完整可运行的核心代码」差距显著,属于不及格水平。 【GEMINI】该候选方案在设计文档层面表现良好,展示了架构师的思维深度。但在代码实现阶段,存在严重的逻辑缺失(如任务调度流未打通)和明显的语法错误(如 `BlockingQueue` 实例化缺失泛型),导致代码无法直接运行。此外,对于并发场景下的状态一致性处理不够严谨,未能完全满足高性能调度器的生产级要求。 【KIMI】该实现存在严重的功能缺陷和代码质量问题,核心DAG调度逻辑完全错误,大量编译错误导致代码不可运行。虽然文档部分对架构有一定描述,但实现与文档严重不符。指数退避、动态线程池、依赖触发等核心功能均未正确实现,资源泄漏和中断处理错误明显。建议重新审视DAG的方向性、完善任务状态机转换、修复编译错误,并确保核心功能可验证运行。
相关链接
您可以通过以下链接查看更多相关内容: