AI Agent开发技术: 实现Plan-and-Execute模式 (langgraph)

内容分享2小时前发布
0 0 0
全能 AI 聚合平台 免费

一站式接入主流 AI 大模型,支持对话 · 生图 · 生视频,即开即用

ChatGPT Claude Gemini Grok DeepSeek 通义千问 Ollama
AI对话 AI生图 AI视频
免费使用 →

学习主题:Plan-and-Execute模式任务分解实战

任务目标:实现一个计划生成器,将复杂需求拆解为有序子任务,并验证计划可行性

AI Agent开发技术: 实现Plan-and-Execute模式 (langgraph)


一、任务概述

基于昨日学习的Plan-and-Execute模式原理,今天将动手实现该模式的核心组件——计划生成器(Task Planner)。计划生成器负责:

  1. 需求解析:理解用户输入的复杂需求,明确核心目标与约束条件
  2. 任务分解:将全局目标拆解为原子级可执行的子任务
  3. 依赖分析:识别任务间的先后关系,构建有向无环图(DAG)
  4. 执行排序:基于依赖关系生成最优任务执行顺序
  5. 可行性验证:检查计划是否可执行,资源是否满足

任务场景示例

  • 用户输入:”我需要研究AI Agent市场,分析技术趋势和竞争格局,生成一份详细报告”
  • 计划生成器执行流程:
  • 解析需求:明确目标是市场分析报告,包含趋势和竞争分析
  • 任务分解:生成4个子任务(数据收集→趋势分析→竞争分析→报告生成)
  • 依赖分析:趋势分析依赖数据收集,竞争分析依赖数据收集,报告生成依赖前两者
  • 执行排序:数据收集(并行)→ 趋势分析 + 竞争分析(并行)→ 报告生成
  • 可行性验证:检查所需工具(网络搜索、数据分析、文档生成)是否可用

二、实现目标

完成本实战任务后,你将产出:

  1. 一个可运行的计划生成器Python模块
  2. 实现任务分解算法,支持多维度拆解
  3. 构建依赖图并进行拓扑排序
  4. 提供计划可行性验证机制
  5. 掌握以下核心技能
  6. 复杂需求的结构化解析方法
  7. 任务分解的多维度策略设计
  8. 依赖关系建模与环检测算法
  9. 计划可行性的多角度验证
  10. 通过验收测试
  11. 3道选择题测试理论理解深度
  12. 1个扩展编码任务验证算法实现能力

三、技术要点

3.1 任务分解算法

关键分解维度:

  • 功能维度:按业务功能划分(数据收集、分析处理、结果生成)
  • 数据维度:按数据流向划分(输入→处理→输出)
  • 时间维度:按时间顺序划分(前期准备、中期执行、后期交付)
  • 资源维度:按资源需求划分(计算密集型、IO密集型、人工交互)

算法设计原则:

 def decompose_task(goal: str) -> List[Task]:
     """任务分解算法框架
     
     Args:
         goal: 目标描述文本
         
     Returns:
         任务对象列表,包含描述、工具、依赖关系
         
     Algorithm Steps:
         1. 语义分析:提取关键实体和动作
         2. 维度映射:将目标映射到多个分解维度
         3. 原子化:生成不可再分的原子任务
         4. 关系建立:识别任务间的依赖关系
         5. 优化排序:基于依赖关系和资源约束优化执行顺序
     """
     pass

3.2 依赖关系建模

依赖类型识别:

 # 数据依赖:任务B需要任务A的输出作为输入
 data_dependency = {"source": "task_A", "target": "task_B", "type": "data"}
 
 # 控制依赖:任务B必须等待任务A完成后才能开始
 control_dependency = {"source": "task_A", "target": "task_B", "type": "control"}
 
 # 资源依赖:任务共享有限资源,需要串行执行
 resource_dependency = {"source": "task_A", "target": "task_B", "type": "resource"}

依赖图构建与环检测:

 def build_dependency_graph(tasks: List[Task]) -> Dict[str, List[str]]:
     """构建任务依赖图
     
     Args:
         tasks: 任务对象列表
         
     Returns:
         邻接表表明的依赖图
         
     Key Features:
         1. 支持多种依赖类型
         2. 自动检测和报告依赖环
         3. 提供可视化输出选项
     """
     pass
 
 def detect_cycles(graph: Dict[str, List[str]]) -> List[List[str]]:
     """检测依赖图中的环
     
     Args:
         graph: 依赖图邻接表
         
     Returns:
         发现的环列表
         
     Algorithm:
         使用深度优先搜索(DFS)检测有向图中的环
     """
     pass

3.3 计划可行性验证

验证维度:

 class PlanValidator:
     """计划可行性验证器"""
     
     def validate_tools(self, tasks: List[Task], available_tools: Set[str]) -> bool:
         """验证所需工具是否可用"""
         pass
     
     def validate_resources(self, tasks: List[Task], resource_constraints: Dict) -> bool:
         """验证资源约束是否满足"""
         pass
     
     def validate_dependencies(self, tasks: List[Task]) -> bool:
         """验证依赖关系是否合理"""
         pass
     
     def validate_time_constraints(self, tasks: List[Task], max_time: float) -> bool:
         """验证时间约束是否满足"""
         pass

验证结果报告:

 class ValidationReport:
     """验证结果详细报告"""
     
     def __init__(self):
         self.passed = True
         self.issues = []
         self.suggestions = []
     
     def add_issue(self, severity: str, description: str, suggestion: str = None):
         """记录验证问题"""
         self.passed = False
         self.issues.append({"severity": severity, "description": description})
         if suggestion:
             self.suggestions.append(suggestion)
     
     def generate_summary(self) -> str:
         """生成验证摘要"""
         pass

四、参考代码框架

以下是一个完整的计划生成器实现框架,你需要补充关键代码部分(标记为TODO的地方):

 #!/usr/bin/env python3
 """
 计划生成器实战实现
 基于Plan-and-Execute模式,实现复杂需求的任务分解与计划生成
 """
 
 import os
 import json
 from typing import List, Dict, Set, Optional, Tuple
 from collections import defaultdict, deque
 import re
 
 # ==================== 1. 数据结构定义 ====================
 class Task:
     """任务对象定义"""
     
     def __init__(self, task_id: str, description: str, tool: str = None,
                  dependencies: List[str] = None, estimated_time: float = 0,
                  priority: str = "medium"):
         self.id = task_id
         self.description = description
         self.tool = tool  # 执行任务所需的工具
         self.dependencies = dependencies or []  # 依赖的任务ID列表
         self.estimated_time = estimated_time  # 预估执行时间(分钟)
         self.priority = priority  # 优先级:high/medium/low
         self.status = "pending"  # 任务状态:pending/running/completed/failed
         
     def to_dict(self) -> Dict:
         """转换为字典格式"""
         return {
             "id": self.id,
             "description": self.description,
             "tool": self.tool,
             "dependencies": self.dependencies,
             "estimated_time": self.estimated_time,
             "priority": self.priority,
             "status": self.status
         }
     
     def __repr__(self) -> str:
         return f"Task(id={self.id}, desc={self.description[:30]}...)"
 
 # ==================== 2. 计划生成器核心类 ====================
 class TaskPlanner:
     """计划生成器:负责任务分解与计划生成"""
     
     def __init__(self, available_tools: Set[str] = None):
         self.available_tools = available_tools or set()
         self.plan_cache = {}  # 计划缓存,避免重复计算
     
     def analyze_goal(self, goal: str) -> Dict:
         """目标分析:理解用户需求,提取关键信息
         
         Args:
             goal: 用户原始目标描述
             
         Returns:
             解析后的结构化目标信息
             
         TODO: 实现语义分析,提取关键动作、对象、约束条件
         """
         # 示例:简单关键词提取
         analysis_result = {
             "actions": [],
             "objects": [],
             "constraints": [],
             "domain": "unknown"
         }
         
         # 提取动作关键词
         action_keywords = ["研究", "分析", "生成", "制作", "计算", "查询", "处理"]
         for keyword in action_keywords:
             if keyword in goal:
                 analysis_result["actions"].append(keyword)
         
         # 提取对象(名词短语)
         # 简单实现:使用正则匹配
         noun_pattern = r'([ws]+?)(?=s研究|s分析|s生成|s制作|s计算|s查询|s处理|$)'
         matches = re.findall(noun_pattern, goal)
         if matches:
             analysis_result["objects"].extend([m.strip() for m in matches])
         
         # 识别领域
         domain_keywords = {
             "AI Agent": ["AI Agent", "智能体", "Agent"],
             "市场分析": ["市场", "竞争", "趋势"],
             "技术研究": ["技术", "框架", "算法"]
         }
         
         for domain, keywords in domain_keywords.items():
             if any(keyword in goal for keyword in keywords):
                 analysis_result["domain"] = domain
                 break
         
         return analysis_result
     
     def decompose_task(self, goal: str) -> List[Task]:
         """任务分解:将复杂目标拆解为原子级子任务
         
         Args:
             goal: 解析后的目标描述
             
         Returns:
             任务对象列表
             
         TODO: 实现完整的多维度任务分解算法
         """
         # 检查缓存
         cache_key = hash(goal)
         if cache_key in self.plan_cache:
             return self.plan_cache[cache_key]
         
         # 分析目标
         analysis = self.analyze_goal(goal)
         
         # 根据分析结果生成任务
         tasks = []
         
         # 示例:根据领域生成不同的任务分解
         if analysis["domain"] == "AI Agent":
             tasks = [
                 Task("step1", "收集AI Agent市场数据", "web_search", [], 10, "high"),
                 Task("step2", "分析技术发展趋势", "data_analysis", ["step1"], 15, "high"),
                 Task("step3", "研究竞争格局", "competitive_analysis", ["step1"], 12, "medium"),
                 Task("step4", "生成市场分析报告", "report_generation", ["step2", "step3"], 20, "high")
             ]
         elif analysis["domain"] == "市场分析":
             tasks = [
                 Task("step1", "识别目标市场范围", "market_scoping", [], 8, "high"),
                 Task("step2", "收集市场规模数据", "data_collection", ["step1"], 15, "high"),
                 Task("step3", "分析增长驱动因素", "growth_analysis", ["step2"], 12, "medium"),
                 Task("step4", "预测未来趋势", "forecasting", ["step3"], 10, "medium")
             ]
         else:
             # 通用分解策略
             tasks = [
                 Task("step1", "理解需求并明确目标", "analyze", [], 5, "high"),
                 Task("step2", "收集相关信息和数据", "collect", ["step1"], 10, "high"),
                 Task("step3", "分析和处理信息", "process", ["step2"], 15, "medium"),
                 Task("step4", "生成最终结果", "generate", ["step3"], 10, "high")
             ]
         
         # 缓存结果
         self.plan_cache[cache_key] = tasks
         
         return tasks
     
     def build_dependency_graph(self, tasks: List[Task]) -> Dict[str, List[str]]:
         """构建任务依赖图
         
         Args:
             tasks: 任务对象列表
             
         Returns:
             邻接表表明的依赖图,key为源任务ID,value为依赖它的目标任务ID列表
             
         TODO: 实现依赖图构建算法,注意处理多种依赖类型
         """
         graph = defaultdict(list)
         
         for task in tasks:
             for dep in task.dependencies:
                 graph[dep].append(task.id)
         
         return dict(graph)
     
     def topological_sort(self, graph: Dict[str, List[str]]) -> List[str]:
         """拓扑排序:确定任务执行顺序
         
         Args:
             graph: 依赖图邻接表
             
         Returns:
             任务ID的有序列表,满足依赖关系
             
         TODO: 实现Kahn算法进行拓扑排序,包含环检测
         """
         # 计算所有节点
         all_nodes = set(graph.keys())
         for deps in graph.values():
             all_nodes.update(deps)
         
         # 计算入度
         in_degree = {node: 0 for node in all_nodes}
         for node, deps in graph.items():
             for dep in deps:
                 in_degree[dep] += 1
         
         # 初始化队列:入度为0的节点
         queue = deque([node for node in all_nodes if in_degree[node] == 0])
         sorted_order = []
         
         while queue:
             node = queue.popleft()
             sorted_order.append(node)
             
             # 减少后继节点的入度
             for neighbor in graph.get(node, []):
                 in_degree[neighbor] -= 1
                 if in_degree[neighbor] == 0:
                     queue.append(neighbor)
         
         # 检查是否有环
         if len(sorted_order) != len(all_nodes):
             print("警告:依赖图中存在环,返回部分排序结果")
         
         return sorted_order
     
     def validate_plan(self, tasks: List[Task]) -> Tuple[bool, List[str]]:
         """验证计划可行性
         
         Args:
             tasks: 任务对象列表
             
         Returns:
             (是否可行, 问题描述列表)
             
         TODO: 实现多维度计划验证逻辑
         """
         issues = []
         
         # 检查工具可用性
         for task in tasks:
             if task.tool and task.tool not in self.available_tools:
                 issues.append(f"任务 {task.id} 需要工具 '{task.tool}',但该工具不可用")
         
         # 检查依赖关系
         task_ids = {task.id for task in tasks}
         for task in tasks:
             for dep in task.dependencies:
                 if dep not in task_ids:
                     issues.append(f"任务 {task.id} 依赖不存在的任务: {dep}")
         
         # 检查估计时间合理性
         for task in tasks:
             if task.estimated_time <= 0:
                 issues.append(f"任务 {task.id} 的预估时间不合理: {task.estimated_time}")
         
         return len(issues) == 0, issues
     
     def generate_plan(self, goal: str) -> Dict:
         """生成完整计划:主入口函数
         
         Args:
             goal: 用户原始目标描述
             
         Returns:
             完整计划信息
             
         Workflow:
             1. 任务分解
             2. 依赖图构建
             3. 拓扑排序
             4. 可行性验证
         """
         print(f"开始生成计划: {goal}")
         print("-" * 50)
         
         # 1. 任务分解
         tasks = self.decompose_task(goal)
         print(f"分解出 {len(tasks)} 个任务")
         
         # 2. 构建依赖图
         dependency_graph = self.build_dependency_graph(tasks)
         print(f"依赖图包含 {len(dependency_graph)} 个依赖关系")
         
         # 3. 拓扑排序
         execution_order = self.topological_sort(dependency_graph)
         print(f"执行顺序: {execution_order}")
         
         # 4. 可行性验证
         is_valid, issues = self.validate_plan(tasks)
         
         if not is_valid:
             print("计划验证失败:")
             for issue in issues:
                 print(f"  - {issue}")
         else:
             print("计划验证通过")
         
         # 构建结果
         result = {
             "goal": goal,
             "tasks": [task.to_dict() for task in tasks],
             "dependency_graph": dependency_graph,
             "execution_order": execution_order,
             "is_valid": is_valid,
             "issues": issues
         }
         
         return result
 
 # ==================== 3. 示例工具与运行演示 ====================
 class DemoTools:
     """演示工具集"""
     
     @staticmethod
     def web_search(query: str) -> str:
         """网络搜索工具(模拟)"""
         return f"搜索 '{query}' 的结果:模拟数据"
     
     @staticmethod
     def data_analysis(data_source: str) -> str:
         """数据分析工具(模拟)"""
         return f"分析 '{data_source}' 的结果:趋势向上"
     
     @staticmethod
     def report_generation(content: str) -> str:
         """报告生成工具(模拟)"""
         return f"生成的报告内容:{content}"
 
 def run_demo():
     """运行演示"""
     print("=" * 60)
     print("计划生成器实战演示")
     print("=" * 60)
     
     # 初始化工具集
     available_tools = {"web_search", "data_analysis", "report_generation"}
     
     # 创建计划生成器
     planner = TaskPlanner(available_tools)
     
     # 示例目标
     goals = [
         "研究AI Agent市场,分析技术趋势和竞争格局,生成一份详细报告",
         "分析电商平台的用户行为,找出转化率提升的关键因素",
         "准备一份关于机器学习最新进展的技术分享PPT"
     ]
     
     for i, goal in enumerate(goals, 1):
         print(f"
 示例 {i}: {goal}")
         print("-" * 40)
         
         # 生成计划
         plan = planner.generate_plan(goal)
         
         # 输出任务摘要
         print("
任务摘要:")
         for task in plan["tasks"]:
             status_symbol = "✅" if task["status"] == "completed" else "⏳"
             print(f"  {status_symbol} {task['id']}: {task['description']}")
         
         if plan["is_valid"]:
             print("
 计划可行,执行顺序:", " → ".join(plan["execution_order"]))
         else:
             print("
⚠️ 计划存在问题,需要调整")
         
         print("-" * 40)
 
 if __name__ == "__main__":
     # 运行演示
     try:
         run_demo()
     except Exception as e:
         print(f"执行失败: {str(e)}")
         print("提议检查代码逻辑和依赖项")

五、验收标准

5.1 理论测试(3道选择题)

请选择正确答案

  1. Plan-and-Execute模式与ReAct模式的核心区别是什么?
  2. A. Plan-and-Execute使用更大的语言模型
  3. B. Plan-and-Execute采用“规划-执行”分离架构
  4. C. ReAct模式成本更低
  5. D. Plan-and-Execute只适用于简单任务
  6. 考察点:架构模式核心差异理解
  7. 任务分解时,以下哪个维度关注的是“数据如何在不同任务间流动”?
  8. A. 功能维度
  9. B. 数据维度
  10. C. 时间维度
  11. D. 资源维度
  12. 考察点:任务分解多维度策略掌握
  13. 拓扑排序在计划生成器中的主要作用是什么?
  14. A. 计算任务执行成本
  15. B. 确定满足依赖关系的任务执行顺序
  16. C. 评估任务优先级
  17. D. 分配计算资源
  18. 考察点:核心算法应用场景理解

5.2 实践测试(小型编码任务)

扩展任务:改善上述计划生成器,添加更智能的任务分解策略。

任务要求

  1. 在TaskPlanner.analyze_goal()方法中实现更精细的语义分析,提取至少3个不同类型的实体
  2. 在TaskPlanner.decompose_task()方法中实现基于动作类型的分支策略(例如,“研究”类任务与“分析”类任务采用不同的分解模式)
  3. 在TaskPlanner.validate_plan()方法中添加资源冲突检测(假设每个任务需要特定的资源类型)

实现提示

 # 更精细的语义分析
 def analyze_goal_advanced(self, goal: str) -> Dict:
     """进阶版目标分析"""
     # 提取:动作类型、目标对象、约束条件、质量要求、时间框架
     pass
 
 # 基于动作类型的分支策略
 def decompose_by_action_type(self, analysis: Dict) -> List[Task]:
     """根据动作类型选择分解策略"""
     action_type = analysis.get("primary_action", "")
     if action_type == "研究":
         return self._decompose_research_task(analysis)
     elif action_type == "分析":
         return self._decompose_analysis_task(analysis)
     # ...
 
 # 资源冲突检测
 def detect_resource_conflicts(self, tasks: List[Task]) -> List[str]:
     """检测任务间的资源冲突"""
     # 假设资源类型:CPU、内存、网络、专用工具
     pass

验收标准

  • analyze_goal()方法能提取3种以上实体类型
  • decompose_task()方法能根据动作类型选择不同分解策略
  • validate_plan()方法包含资源冲突检测逻辑
  • 代码通过基本测试,能处理多样化的输入目标

六、调试提议

6.1 常见问题排查

问题1:任务分解结果不合理

 # 解决方案:添加调试输出,分析语义提取结果
 def decompose_task(self, goal: str):
     analysis = self.analyze_goal(goal)
     print(f"DEBUG - 分析结果: {analysis}")  # 查看提取的实体
     # 根据分析结果调整分解策略

问题2:依赖图中存在环

 # 解决方案:实现环检测算法并输出详细信息
 def detect_cycles_detailed(self, graph: Dict):
     """详细环检测"""
     visited = set()
     path = []
     cycles = []
     
     def dfs(node):
         visited.add(node)
         path.append(node)
         for neighbor in graph.get(node, []):
             if neighbor in path:  # 发现环
                 cycle_start = path.index(neighbor)
                 cycles.append(path[cycle_start:])
             elif neighbor not in visited:
                 dfs(neighbor)
         path.pop()
     
     for node in graph:
         if node not in visited:
             dfs(node)
     
     return cycles

问题3:计划验证过于严格或宽松

 # 解决方案:调整验证阈值,添加配置选项
 class PlanValidatorConfig:
     """计划验证配置"""
     def __init__(self):
         self.require_tool_availability = True
         self.max_task_time = 120  # 最大单任务时间(分钟)
         self.min_task_time = 1    # 最小单任务时间
         self.allow_missing_dependencies = False

6.2 分步调试策略

  1. 第一步:测试目标分析模块
  2. # 单独测试语义分析
    planner = TaskPlanner()
    analysis = planner.analyze_goal(“研究AI Agent市场趋势”)
    print(f”分析结果: {analysis}”)
  3. 第二步:测试任务分解算法
  4. # 测试分解算法
    tasks = planner.decompose_task(“研究AI Agent市场趋势”)
    for task in tasks:
    print(f”任务: {task.id} – {task.description}”)
    print(f” 依赖: {task.dependencies}”)
  5. 第三步:验证完整工作流
  6. # 运行完整生成流程
    plan = planner.generate_plan(“研究AI Agent市场趋势”)
    print(f”计划可行性: {plan['is_valid']}”)
    print(f”执行顺序: {plan['execution_order']}”)

6.3 性能优化提示

  1. 缓存优化
  2. # 使用LRU缓存避免重复计算
    from functools import lru_cache

    @lru_cache(maxsize=100)
    def decompose_task_cached(self, goal: str) -> List[Task]:
    pass

  3. 并行处理
  4. # 对独立任务进行并行验证
    from concurrent.futures import ThreadPoolExecutor

    def validate_tasks_parallel(self, tasks: List[Task]):
    with ThreadPoolExecutor() as executor:
    results = list(executor.map(self.validate_single_task, tasks))
    return all(results)

  5. 增量更新
  6. # 支持增量式计划更新,避免完全重新生成
    def update_plan(self, plan: Dict, new_constraint: str):
    “””基于新约束更新现有计划”””
    # 只重新验证受影响的部分
    pass

七、学习总结

完成本实战任务后,请检查以下学习成果:

✅ 核心技能掌握

  • 理解Plan-and-Execute模式中计划生成器的核心作用
  • 实现多维度任务分解算法
  • 构建依赖图并进行拓扑排序
  • 设计多角度计划可行性验证机制

✅ 代码质量要求

  • 模块化设计,职责清晰
  • 错误处理机制完善
  • 可扩展性强,支持新维度和策略
  • 注释完整,文档清晰

✅ 问题解决能力

  • 能独立调试任务分解逻辑
  • 能根据需求扩展新的验证维度
  • 能优化算法性能和准确性


下一步行动

  1. 运行参考框架:执行提供的代码框架,理解计划生成器的工作流程
  2. 完成验收测试:回答选择题,实现进阶任务分解策略
  3. 记录学习卡片:总结今日实战的核心收获与困难
  4. 预习明日主题:了解反思型架构(Reflexion)的基本原理
© 版权声明

相关文章

暂无评论

none
暂无评论...