
1. 方案概述
以DeepResearch为例,如何才能构建一个强大、稳定且可扩展的商用应用,如何使用 库来编排一个循环、有状态的工作流。并且,这些难点要怎么解决呢
LangGraph
模糊指令下如何通过多轮追问明确需求长短记忆如何分别管理子任务失败如何回溯LLM API 限流和报错如何处理如何防护恶意指令的攻击工具调用如何监控和优化效率
2. 核心架构设计
我们将构建一个基于状态图(StateGraph)的 Agent。核心状态 在图的节点之间传递和修改。
AgentState
# 核心状态定义
from typing import List, TypedDict, Optional
class AgentState(TypedDict):
original_question: str # 用户的原始问题
clarified_question: str # 经过澄清后的明确问题
is_clarified: bool # 问题是否已明确
plan: Optional[List[str]] # LLM生成的任务规划
executed_steps: List[str] # 已执行的步骤记录
current_task_id: int # 当前正在执行的任务ID
results: dict # 每个步骤的结果
error_log: Optional[str] # 错误日志
error_count: int # 错误计数
short_term_memory: List[str] # 短期记忆(对话历史)
核心图节点(Nodes)与边(Edges):
: 入口节点。
start: 澄清指令节点,判断用户输入是否模糊。
clarify_instruction: 任务分解节点,将明确的指令分解为可执行的子任务。
decompose_task: 工具执行节点,调用各种研究工具(如搜索、网页浏览、数据分析)。
execute_tools: 反思与决策节点,根据工具执行结果判断下一步行动。
reflect_and_decide: 最终答案生成节点。
generate_final_answer: 错误处理节点。
handle_error: 出口节点。
end
流程图:
->
start –(模糊)–>
clarify_instruction
(等待用户输入)
–(清晰)–>
clarify_instruction
decompose_task
->
decompose_task
execute_tools
–(成功)–>
execute_tools
reflect_and_decide
–(失败)–>
execute_tools
handle_error
–(可重试)–>
handle_error (重新规划)
decompose_task
–(不可重试)–>
handle_error
end
–(未完成)–>
reflect_and_decide (执行下一任务)
execute_tools
–(已完成)–>
reflect_and_decide
generate_final_answer
->
generate_final_answer
end
3. 核心问题与解决方案思考
3.1 模糊指令下如何通过多轮追问明确需求
方案:
我们引入一个 节点和一个条件边。该节点使用一个专门的 LLM Chain 来判断用户问题的清晰度。如果模糊,它会生成追问问题,并等待用户输入;如果清晰,则流向任务分解。
clarify_instruction
def clarification_node(state: AgentState):
print(">> 节点: Clarification (指令澄清)")
prompt = f"""The user's query is: '{state['original_question']}'. Is this query clear enough for a research agent to execute?
If not, ask one single clarifying question. If it is clear, do not ask a question.
Respond in JSON with keys 'is_clear' (boolean) and 'question' (string, or null).
This is a vague query: '研究一下苹果'
This is a clear query: '研究一下苹果公司2024年第一季度的财报'
User query to analyze: {state['original_question']}
"""
# response = robust_llm_call([HumanMessage(content=prompt)], llm) # 真实LLM调用
response = mock_llm([HumanMessage(content=prompt)]) # 模拟LLM调用
data = json.loads(response.content)
state['is_clarified'] = data['is_clear']
if not data['is_clear']:
print(f"-> 指令模糊,需要追问: {data['question']}")
# 在真实应用中,这里会暂停并等待用户输入
# 为方便演示,我们自动提供一个清晰的回答
user_clarification = "我的意思是苹果公司"
print(f"-> (模拟)用户回应: {user_clarification}")
state['original_question'] += " " + user_clarification
state['is_clarified'] = True # 假设用户澄清后问题变清晰
state['clarified_question'] = state['original_question']
else:
print("-> 指令清晰,继续执行")
state['clarified_question'] = state['original_question']
return state
说明:
我们通过一个专门的节点 来分析用户意图。
clarify_instruction_node 函数作为条件路由,将清晰的指令导向后续任务,模糊的指令则可以暂停等待用户输入,从而实现多轮追问。
decide_after_clarification
3.2 长短记忆如何分别管理
方案:
短期记忆 (Short-Term Memory): 直接在 中管理,通常是一个消息列表(
AgentState 或
List[BaseMessage])。它记录了当前会话的所有交互,为上下文理解提供支持。长期记忆 (Long-Term Memory): 使用外部向量数据库(如 ChromaDB, FAISS, Weaviate)。每次成功完成一次复杂的 research 任务后,将“问题-最终答案-关键洞察”作为一个文档,进行 Embedding 后存入向量库。在任务开始前,可以先用用户的问题查询向量库,寻找相似的历史成功案例作为参考。
List[str]
class VectorStore:
def __init__(self):
self._storage = {}
print("--- 记忆库[长期]: 初始化成功 ---")
def add_memory(self, key: str, value: str):
print(f"--- 记忆库[长期]: 正在存储关于 '{key}' 的记忆 ---")
self._storage[key] = value
def recall_memory(self, query: str) -> Optional[str]:
# 简单模拟基于关键词的召回
for key, value in self._storage.items():
if key in query:
print(f"--- 记忆库[长期]: 从查询 '{query}' 中召回了关于 '{key}' 的记忆 ---")
return value
return None
说明:
短期记忆通过 中的
AgentState 字段在图的每次流转中维护。长期记忆则通过调用外部函数/节点与向量数据库交互,实现了知识的持久化和复用。
short_term_memory
3.3 子任务失败如何回溯
方案:
利用 LangGraph 的状态和条件边实现回溯。在 中添加
AgentState 和
error_log。当工具执行节点 (
error_count) 发生错误时:
execute_tools
用 捕获异常。将错误信息写入
try...except,并增加
error_log。通过条件边判断
error_count。如果小于最大重试次数,则路由到
error_count 节点。
handle_error 节点可以分析错误原因,并决定是重新规划(回到
handle_error 节点)还是尝试不同工具(回到
decompose_task 但传入不同参数)。如果超过次数,则终止任务。
execute_tools
def error_handler_node(state: AgentState):
print(">> 节点: Error Handler (错误处理)")
state['error_count'] += 1
error = state['error_log']
print(f"-> 错误计数: {state['error_count']}. 错误信息: {error}")
# 核心问题解决方案:回溯逻辑
if state['error_count'] > 2:
print("!! 错误次数过多,任务终止 !!")
return "end"
else:
print("-> 尝试重新规划以绕过错误")
# 清空旧计划,准备重新规划
state['plan'] = None
state['results'] = []
return "retry"
说明:
这是 LangGraph 处理失败的核心优势。通过在状态中记录错误并使用条件边,我们可以构建出复杂的、具备韧性的工作流,而不是遇到错误就立即崩溃。
3.4 LLM API 限流和报错如何处理
方案:
这通常在代码层面而不是图的结构层面解决。使用带有**指数退避重试(Exponential Backoff and Retry)**机制的装饰器或库来包装所有的 LLM 和外部 API 调用。 是一个出色的 Python 库。
tenacity
@retry(wait=wait_exponential(multiplier=1, min=2, max=6), stop=stop_after_attempt(3))
def robust_llm_call(messages: List[BaseMessage], llm, **kwargs):
"""一个包装了指数退避重试的LLM调用函数"""
return llm.invoke(messages, **kwargs)
说明:
通过 装饰器,API 的瞬时错误(如速率限制、网络波动)可以被自动、优雅地处理,大大增强了 Agent 的稳定性,而无需在 LangGraph 的业务逻辑中添加复杂的重试代码。
tenacity
3.5 恶意指令防护
方案:
建立一个“防火墙”节点 () 作为图的入口点。
guardrail_node
输入检测: 使用关键词过滤、正则表达式或更高级的 LLM 分类器(如 OpenAI Moderation API)来检测恶意输入(如:提示词注入、非法请求等)。工具使用权限控制: 确保 Agent 调用的工具权限受限。例如,执行代码的工具应在安全的 Docker 容器中运行,文件系统访问应严格限制。输出审查: 在生成最终答案前,可以增加一个审查步骤,确保输出内容不包含有害信息。
def guardrail_node(state: AgentState):
print(">> 节点: Guardrail (恶意指令防护)")
question = state['original_question']
if "ignore all previous instructions" in question.lower() or "忽略所有指令" in question:
print("!! 检测到恶意指令,任务终止 !!")
state['is_safe'] = False
else:
state['is_safe'] = True
return state
说明:
安全是商用产品的生命线。一个前置的 可以有效地将大部分恶意请求拦截在系统之外。
guardrail_node
3.6 工具调用监控和效率优化
方案:
监控 (Monitoring): 使用 。它是 LangChain 的配套可观测性平台,可以自动追踪 LangGraph 的每一次运行,可视化展示每个节点的输入输出、耗时、Token 消耗和工具调用详情。这对于调试、分析性能瓶颈和审计至关重要。效率优化 (Optimization):
LangSmith
缓存 (Caching): 对确定性强且重复调用频繁的工具(如数据库查询、简单的 API 调用)启用缓存。LangChain 内置了多种缓存后端(内存、SQLite、Redis)。工具并行化 (Parallel Execution): 对于互不依赖的子任务,可以考虑并行执行。LangGraph 的 功能(通过
Dynamic Graph 的
add_node 参数)可以支持动态地并行执行节点。
Managed
说明:
商业化运营离不开强大的监控。LangSmith 是与 LangGraph 集成最紧密的工具,只需设置几个环境变量即可开启。缓存则是成本和性能优化的关键手段。
4. 小结一下
我们尝试利用 LangGraph 的核心优势,系统性地解决商用 Agent 在指令理解、记忆管理、错误处理、API 稳定性、安全性及可观测性等方面的关键挑战。通过精心设计的状态图和模块化的代码实现,我们可以构建一个既强大又可靠的 DeepResearch Agent。



