学习主题:Plan-and-Execute模式任务分解实战
任务目标:实现一个计划生成器,将复杂需求拆解为有序子任务,并验证计划可行性

一、任务概述
基于昨日学习的Plan-and-Execute模式原理,今天将动手实现该模式的核心组件——计划生成器(Task Planner)。计划生成器负责:
- 需求解析:理解用户输入的复杂需求,明确核心目标与约束条件
- 任务分解:将全局目标拆解为原子级可执行的子任务
- 依赖分析:识别任务间的先后关系,构建有向无环图(DAG)
- 执行排序:基于依赖关系生成最优任务执行顺序
- 可行性验证:检查计划是否可执行,资源是否满足
任务场景示例:
- 用户输入:”我需要研究AI Agent市场,分析技术趋势和竞争格局,生成一份详细报告”
- 计划生成器执行流程:
- 解析需求:明确目标是市场分析报告,包含趋势和竞争分析
- 任务分解:生成4个子任务(数据收集→趋势分析→竞争分析→报告生成)
- 依赖分析:趋势分析依赖数据收集,竞争分析依赖数据收集,报告生成依赖前两者
- 执行排序:数据收集(并行)→ 趋势分析 + 竞争分析(并行)→ 报告生成
- 可行性验证:检查所需工具(网络搜索、数据分析、文档生成)是否可用
二、实现目标
完成本实战任务后,你将产出:
- 一个可运行的计划生成器Python模块:
- 实现任务分解算法,支持多维度拆解
- 构建依赖图并进行拓扑排序
- 提供计划可行性验证机制
- 掌握以下核心技能:
- 复杂需求的结构化解析方法
- 任务分解的多维度策略设计
- 依赖关系建模与环检测算法
- 计划可行性的多角度验证
- 通过验收测试:
- 3道选择题测试理论理解深度
- 1个扩展编码任务验证算法实现能力
三、技术要点
3.1 任务分解算法
关键分解维度:
- 功能维度:按业务功能划分(数据收集、分析处理、结果生成)
- 数据维度:按数据流向划分(输入→处理→输出)
- 时间维度:按时间顺序划分(前期准备、中期执行、后期交付)
- 资源维度:按资源需求划分(计算密集型、IO密集型、人工交互)
算法设计原则:
def decompose_task(goal: str) -> List[Task]:
"""任务分解算法框架
Args:
goal: 目标描述文本
Returns:
任务对象列表,包含描述、工具、依赖关系
Algorithm Steps:
1. 语义分析:提取关键实体和动作
2. 维度映射:将目标映射到多个分解维度
3. 原子化:生成不可再分的原子任务
4. 关系建立:识别任务间的依赖关系
5. 优化排序:基于依赖关系和资源约束优化执行顺序
"""
pass
3.2 依赖关系建模
依赖类型识别:
# 数据依赖:任务B需要任务A的输出作为输入
data_dependency = {"source": "task_A", "target": "task_B", "type": "data"}
# 控制依赖:任务B必须等待任务A完成后才能开始
control_dependency = {"source": "task_A", "target": "task_B", "type": "control"}
# 资源依赖:任务共享有限资源,需要串行执行
resource_dependency = {"source": "task_A", "target": "task_B", "type": "resource"}
依赖图构建与环检测:
def build_dependency_graph(tasks: List[Task]) -> Dict[str, List[str]]:
"""构建任务依赖图
Args:
tasks: 任务对象列表
Returns:
邻接表表明的依赖图
Key Features:
1. 支持多种依赖类型
2. 自动检测和报告依赖环
3. 提供可视化输出选项
"""
pass
def detect_cycles(graph: Dict[str, List[str]]) -> List[List[str]]:
"""检测依赖图中的环
Args:
graph: 依赖图邻接表
Returns:
发现的环列表
Algorithm:
使用深度优先搜索(DFS)检测有向图中的环
"""
pass
3.3 计划可行性验证
验证维度:
class PlanValidator:
"""计划可行性验证器"""
def validate_tools(self, tasks: List[Task], available_tools: Set[str]) -> bool:
"""验证所需工具是否可用"""
pass
def validate_resources(self, tasks: List[Task], resource_constraints: Dict) -> bool:
"""验证资源约束是否满足"""
pass
def validate_dependencies(self, tasks: List[Task]) -> bool:
"""验证依赖关系是否合理"""
pass
def validate_time_constraints(self, tasks: List[Task], max_time: float) -> bool:
"""验证时间约束是否满足"""
pass
验证结果报告:
class ValidationReport:
"""验证结果详细报告"""
def __init__(self):
self.passed = True
self.issues = []
self.suggestions = []
def add_issue(self, severity: str, description: str, suggestion: str = None):
"""记录验证问题"""
self.passed = False
self.issues.append({"severity": severity, "description": description})
if suggestion:
self.suggestions.append(suggestion)
def generate_summary(self) -> str:
"""生成验证摘要"""
pass
四、参考代码框架
以下是一个完整的计划生成器实现框架,你需要补充关键代码部分(标记为TODO的地方):
#!/usr/bin/env python3
"""
计划生成器实战实现
基于Plan-and-Execute模式,实现复杂需求的任务分解与计划生成
"""
import os
import json
from typing import List, Dict, Set, Optional, Tuple
from collections import defaultdict, deque
import re
# ==================== 1. 数据结构定义 ====================
class Task:
"""任务对象定义"""
def __init__(self, task_id: str, description: str, tool: str = None,
dependencies: List[str] = None, estimated_time: float = 0,
priority: str = "medium"):
self.id = task_id
self.description = description
self.tool = tool # 执行任务所需的工具
self.dependencies = dependencies or [] # 依赖的任务ID列表
self.estimated_time = estimated_time # 预估执行时间(分钟)
self.priority = priority # 优先级:high/medium/low
self.status = "pending" # 任务状态:pending/running/completed/failed
def to_dict(self) -> Dict:
"""转换为字典格式"""
return {
"id": self.id,
"description": self.description,
"tool": self.tool,
"dependencies": self.dependencies,
"estimated_time": self.estimated_time,
"priority": self.priority,
"status": self.status
}
def __repr__(self) -> str:
return f"Task(id={self.id}, desc={self.description[:30]}...)"
# ==================== 2. 计划生成器核心类 ====================
class TaskPlanner:
"""计划生成器:负责任务分解与计划生成"""
def __init__(self, available_tools: Set[str] = None):
self.available_tools = available_tools or set()
self.plan_cache = {} # 计划缓存,避免重复计算
def analyze_goal(self, goal: str) -> Dict:
"""目标分析:理解用户需求,提取关键信息
Args:
goal: 用户原始目标描述
Returns:
解析后的结构化目标信息
TODO: 实现语义分析,提取关键动作、对象、约束条件
"""
# 示例:简单关键词提取
analysis_result = {
"actions": [],
"objects": [],
"constraints": [],
"domain": "unknown"
}
# 提取动作关键词
action_keywords = ["研究", "分析", "生成", "制作", "计算", "查询", "处理"]
for keyword in action_keywords:
if keyword in goal:
analysis_result["actions"].append(keyword)
# 提取对象(名词短语)
# 简单实现:使用正则匹配
noun_pattern = r'([ws]+?)(?=s研究|s分析|s生成|s制作|s计算|s查询|s处理|$)'
matches = re.findall(noun_pattern, goal)
if matches:
analysis_result["objects"].extend([m.strip() for m in matches])
# 识别领域
domain_keywords = {
"AI Agent": ["AI Agent", "智能体", "Agent"],
"市场分析": ["市场", "竞争", "趋势"],
"技术研究": ["技术", "框架", "算法"]
}
for domain, keywords in domain_keywords.items():
if any(keyword in goal for keyword in keywords):
analysis_result["domain"] = domain
break
return analysis_result
def decompose_task(self, goal: str) -> List[Task]:
"""任务分解:将复杂目标拆解为原子级子任务
Args:
goal: 解析后的目标描述
Returns:
任务对象列表
TODO: 实现完整的多维度任务分解算法
"""
# 检查缓存
cache_key = hash(goal)
if cache_key in self.plan_cache:
return self.plan_cache[cache_key]
# 分析目标
analysis = self.analyze_goal(goal)
# 根据分析结果生成任务
tasks = []
# 示例:根据领域生成不同的任务分解
if analysis["domain"] == "AI Agent":
tasks = [
Task("step1", "收集AI Agent市场数据", "web_search", [], 10, "high"),
Task("step2", "分析技术发展趋势", "data_analysis", ["step1"], 15, "high"),
Task("step3", "研究竞争格局", "competitive_analysis", ["step1"], 12, "medium"),
Task("step4", "生成市场分析报告", "report_generation", ["step2", "step3"], 20, "high")
]
elif analysis["domain"] == "市场分析":
tasks = [
Task("step1", "识别目标市场范围", "market_scoping", [], 8, "high"),
Task("step2", "收集市场规模数据", "data_collection", ["step1"], 15, "high"),
Task("step3", "分析增长驱动因素", "growth_analysis", ["step2"], 12, "medium"),
Task("step4", "预测未来趋势", "forecasting", ["step3"], 10, "medium")
]
else:
# 通用分解策略
tasks = [
Task("step1", "理解需求并明确目标", "analyze", [], 5, "high"),
Task("step2", "收集相关信息和数据", "collect", ["step1"], 10, "high"),
Task("step3", "分析和处理信息", "process", ["step2"], 15, "medium"),
Task("step4", "生成最终结果", "generate", ["step3"], 10, "high")
]
# 缓存结果
self.plan_cache[cache_key] = tasks
return tasks
def build_dependency_graph(self, tasks: List[Task]) -> Dict[str, List[str]]:
"""构建任务依赖图
Args:
tasks: 任务对象列表
Returns:
邻接表表明的依赖图,key为源任务ID,value为依赖它的目标任务ID列表
TODO: 实现依赖图构建算法,注意处理多种依赖类型
"""
graph = defaultdict(list)
for task in tasks:
for dep in task.dependencies:
graph[dep].append(task.id)
return dict(graph)
def topological_sort(self, graph: Dict[str, List[str]]) -> List[str]:
"""拓扑排序:确定任务执行顺序
Args:
graph: 依赖图邻接表
Returns:
任务ID的有序列表,满足依赖关系
TODO: 实现Kahn算法进行拓扑排序,包含环检测
"""
# 计算所有节点
all_nodes = set(graph.keys())
for deps in graph.values():
all_nodes.update(deps)
# 计算入度
in_degree = {node: 0 for node in all_nodes}
for node, deps in graph.items():
for dep in deps:
in_degree[dep] += 1
# 初始化队列:入度为0的节点
queue = deque([node for node in all_nodes if in_degree[node] == 0])
sorted_order = []
while queue:
node = queue.popleft()
sorted_order.append(node)
# 减少后继节点的入度
for neighbor in graph.get(node, []):
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
# 检查是否有环
if len(sorted_order) != len(all_nodes):
print("警告:依赖图中存在环,返回部分排序结果")
return sorted_order
def validate_plan(self, tasks: List[Task]) -> Tuple[bool, List[str]]:
"""验证计划可行性
Args:
tasks: 任务对象列表
Returns:
(是否可行, 问题描述列表)
TODO: 实现多维度计划验证逻辑
"""
issues = []
# 检查工具可用性
for task in tasks:
if task.tool and task.tool not in self.available_tools:
issues.append(f"任务 {task.id} 需要工具 '{task.tool}',但该工具不可用")
# 检查依赖关系
task_ids = {task.id for task in tasks}
for task in tasks:
for dep in task.dependencies:
if dep not in task_ids:
issues.append(f"任务 {task.id} 依赖不存在的任务: {dep}")
# 检查估计时间合理性
for task in tasks:
if task.estimated_time <= 0:
issues.append(f"任务 {task.id} 的预估时间不合理: {task.estimated_time}")
return len(issues) == 0, issues
def generate_plan(self, goal: str) -> Dict:
"""生成完整计划:主入口函数
Args:
goal: 用户原始目标描述
Returns:
完整计划信息
Workflow:
1. 任务分解
2. 依赖图构建
3. 拓扑排序
4. 可行性验证
"""
print(f"开始生成计划: {goal}")
print("-" * 50)
# 1. 任务分解
tasks = self.decompose_task(goal)
print(f"分解出 {len(tasks)} 个任务")
# 2. 构建依赖图
dependency_graph = self.build_dependency_graph(tasks)
print(f"依赖图包含 {len(dependency_graph)} 个依赖关系")
# 3. 拓扑排序
execution_order = self.topological_sort(dependency_graph)
print(f"执行顺序: {execution_order}")
# 4. 可行性验证
is_valid, issues = self.validate_plan(tasks)
if not is_valid:
print("计划验证失败:")
for issue in issues:
print(f" - {issue}")
else:
print("计划验证通过")
# 构建结果
result = {
"goal": goal,
"tasks": [task.to_dict() for task in tasks],
"dependency_graph": dependency_graph,
"execution_order": execution_order,
"is_valid": is_valid,
"issues": issues
}
return result
# ==================== 3. 示例工具与运行演示 ====================
class DemoTools:
"""演示工具集"""
@staticmethod
def web_search(query: str) -> str:
"""网络搜索工具(模拟)"""
return f"搜索 '{query}' 的结果:模拟数据"
@staticmethod
def data_analysis(data_source: str) -> str:
"""数据分析工具(模拟)"""
return f"分析 '{data_source}' 的结果:趋势向上"
@staticmethod
def report_generation(content: str) -> str:
"""报告生成工具(模拟)"""
return f"生成的报告内容:{content}"
def run_demo():
"""运行演示"""
print("=" * 60)
print("计划生成器实战演示")
print("=" * 60)
# 初始化工具集
available_tools = {"web_search", "data_analysis", "report_generation"}
# 创建计划生成器
planner = TaskPlanner(available_tools)
# 示例目标
goals = [
"研究AI Agent市场,分析技术趋势和竞争格局,生成一份详细报告",
"分析电商平台的用户行为,找出转化率提升的关键因素",
"准备一份关于机器学习最新进展的技术分享PPT"
]
for i, goal in enumerate(goals, 1):
print(f"
示例 {i}: {goal}")
print("-" * 40)
# 生成计划
plan = planner.generate_plan(goal)
# 输出任务摘要
print("
任务摘要:")
for task in plan["tasks"]:
status_symbol = "✅" if task["status"] == "completed" else "⏳"
print(f" {status_symbol} {task['id']}: {task['description']}")
if plan["is_valid"]:
print("
计划可行,执行顺序:", " → ".join(plan["execution_order"]))
else:
print("
⚠️ 计划存在问题,需要调整")
print("-" * 40)
if __name__ == "__main__":
# 运行演示
try:
run_demo()
except Exception as e:
print(f"执行失败: {str(e)}")
print("提议检查代码逻辑和依赖项")
五、验收标准
5.1 理论测试(3道选择题)
请选择正确答案:
- Plan-and-Execute模式与ReAct模式的核心区别是什么?
- A. Plan-and-Execute使用更大的语言模型
- B. Plan-and-Execute采用“规划-执行”分离架构
- C. ReAct模式成本更低
- D. Plan-and-Execute只适用于简单任务
- 考察点:架构模式核心差异理解
- 任务分解时,以下哪个维度关注的是“数据如何在不同任务间流动”?
- A. 功能维度
- B. 数据维度
- C. 时间维度
- D. 资源维度
- 考察点:任务分解多维度策略掌握
- 拓扑排序在计划生成器中的主要作用是什么?
- A. 计算任务执行成本
- B. 确定满足依赖关系的任务执行顺序
- C. 评估任务优先级
- D. 分配计算资源
- 考察点:核心算法应用场景理解
5.2 实践测试(小型编码任务)
扩展任务:改善上述计划生成器,添加更智能的任务分解策略。
任务要求:
- 在TaskPlanner.analyze_goal()方法中实现更精细的语义分析,提取至少3个不同类型的实体
- 在TaskPlanner.decompose_task()方法中实现基于动作类型的分支策略(例如,“研究”类任务与“分析”类任务采用不同的分解模式)
- 在TaskPlanner.validate_plan()方法中添加资源冲突检测(假设每个任务需要特定的资源类型)
实现提示:
# 更精细的语义分析
def analyze_goal_advanced(self, goal: str) -> Dict:
"""进阶版目标分析"""
# 提取:动作类型、目标对象、约束条件、质量要求、时间框架
pass
# 基于动作类型的分支策略
def decompose_by_action_type(self, analysis: Dict) -> List[Task]:
"""根据动作类型选择分解策略"""
action_type = analysis.get("primary_action", "")
if action_type == "研究":
return self._decompose_research_task(analysis)
elif action_type == "分析":
return self._decompose_analysis_task(analysis)
# ...
# 资源冲突检测
def detect_resource_conflicts(self, tasks: List[Task]) -> List[str]:
"""检测任务间的资源冲突"""
# 假设资源类型:CPU、内存、网络、专用工具
pass
验收标准:
- analyze_goal()方法能提取3种以上实体类型
- decompose_task()方法能根据动作类型选择不同分解策略
- validate_plan()方法包含资源冲突检测逻辑
- 代码通过基本测试,能处理多样化的输入目标
六、调试提议
6.1 常见问题排查
问题1:任务分解结果不合理
# 解决方案:添加调试输出,分析语义提取结果
def decompose_task(self, goal: str):
analysis = self.analyze_goal(goal)
print(f"DEBUG - 分析结果: {analysis}") # 查看提取的实体
# 根据分析结果调整分解策略
问题2:依赖图中存在环
# 解决方案:实现环检测算法并输出详细信息
def detect_cycles_detailed(self, graph: Dict):
"""详细环检测"""
visited = set()
path = []
cycles = []
def dfs(node):
visited.add(node)
path.append(node)
for neighbor in graph.get(node, []):
if neighbor in path: # 发现环
cycle_start = path.index(neighbor)
cycles.append(path[cycle_start:])
elif neighbor not in visited:
dfs(neighbor)
path.pop()
for node in graph:
if node not in visited:
dfs(node)
return cycles
问题3:计划验证过于严格或宽松
# 解决方案:调整验证阈值,添加配置选项
class PlanValidatorConfig:
"""计划验证配置"""
def __init__(self):
self.require_tool_availability = True
self.max_task_time = 120 # 最大单任务时间(分钟)
self.min_task_time = 1 # 最小单任务时间
self.allow_missing_dependencies = False
6.2 分步调试策略
- 第一步:测试目标分析模块
- # 单独测试语义分析
planner = TaskPlanner()
analysis = planner.analyze_goal(“研究AI Agent市场趋势”)
print(f”分析结果: {analysis}”) - 第二步:测试任务分解算法
- # 测试分解算法
tasks = planner.decompose_task(“研究AI Agent市场趋势”)
for task in tasks:
print(f”任务: {task.id} – {task.description}”)
print(f” 依赖: {task.dependencies}”) - 第三步:验证完整工作流
- # 运行完整生成流程
plan = planner.generate_plan(“研究AI Agent市场趋势”)
print(f”计划可行性: {plan['is_valid']}”)
print(f”执行顺序: {plan['execution_order']}”)
6.3 性能优化提示
- 缓存优化:
- # 使用LRU缓存避免重复计算
from functools import lru_cache@lru_cache(maxsize=100)
def decompose_task_cached(self, goal: str) -> List[Task]:
pass - 并行处理:
- # 对独立任务进行并行验证
from concurrent.futures import ThreadPoolExecutordef validate_tasks_parallel(self, tasks: List[Task]):
with ThreadPoolExecutor() as executor:
results = list(executor.map(self.validate_single_task, tasks))
return all(results) - 增量更新:
- # 支持增量式计划更新,避免完全重新生成
def update_plan(self, plan: Dict, new_constraint: str):
“””基于新约束更新现有计划”””
# 只重新验证受影响的部分
pass
七、学习总结
完成本实战任务后,请检查以下学习成果:
✅ 核心技能掌握
- 理解Plan-and-Execute模式中计划生成器的核心作用
- 实现多维度任务分解算法
- 构建依赖图并进行拓扑排序
- 设计多角度计划可行性验证机制
✅ 代码质量要求
- 模块化设计,职责清晰
- 错误处理机制完善
- 可扩展性强,支持新维度和策略
- 注释完整,文档清晰
✅ 问题解决能力
- 能独立调试任务分解逻辑
- 能根据需求扩展新的验证维度
- 能优化算法性能和准确性
下一步行动
- 运行参考框架:执行提供的代码框架,理解计划生成器的工作流程
- 完成验收测试:回答选择题,实现进阶任务分解策略
- 记录学习卡片:总结今日实战的核心收获与困难
- 预习明日主题:了解反思型架构(Reflexion)的基本原理
© 版权声明
文章版权归作者所有,未经允许请勿转载。
相关文章
暂无评论...


