MAF Workflow 核心概念详解
本课概览
Microsoft Agent Framework (MAF)提供了一套强劲的Workflow(工作流)框架,用于编排和协调多个智能体(Agent)或处理组件的执行流程。
本课将以通俗易懂的方式,协助你理解 MAF Workflow 的核心概念:
|
概念 |
说明 |
类比 |
|---|---|---|
| Workflow |
工作流定义 |
流程图 |
| Executor |
执行器/处理节点 |
工人 |
| Edge |
连接边 |
➡️ 传送带 |
| SuperStep |
超步/批量处理 |
⏱️ 处理周期 |
| WorkflowContext |
工作流上下文 |
工作台 |
| WorkflowEvent |
工作流事件 |
通知 |
| Run |
运行实例 |
一次执行 |
| Checkpoint |
检查点 |
存档点 |

让我们逐一深入了解这些概念!
为什么需要 Workflow?
在构建 AI 智能体应用时,我们常常需要:
- 多步骤处理:用户请求需要经过多个处理环节
- 多智能体协作:不同的 Agent 各司其职,协同完成任务
- 条件分支:根据处理结果决定下一步走向
- 并行处理:某些任务可以同时进行以提高效率
- 状态管理:需要在处理过程中保存和恢复状态
传统方式的问题:
// ❌ 硬编码的流程,难以维护和扩展
var result1 = await agent1.ProcessAsync(input);
if (result1.Success)
{
var result2 = await agent2.ProcessAsync(result1.Data);
// ... 越来越复杂
}
使用 Workflow 的优势:
// ✅ 声明式定义,清晰可维护
var workflow = new WorkflowBuilder(startExecutor)
.AddEdge(executor1, executor2)
.AddEdge(executor2, executor3, condition: x => x.Success)
.Build;

一、核心概念:Executor(执行器)
什么是 Executor?
Executor(执行器)是 Workflow 中的最小工作单元,类似于:
|
类比 |
说明 |
|---|---|
|
工厂里的工人 |
每个工人负责一道工序 |
|
乐高积木块 |
每个积木有特定功能,组合成整体 |
|
电路中的元件 |
接收输入信号,输出处理结果 |

Executor 的核心特征
- 唯一标识(Id):每个 Executor 有一个唯一的 ID,用于在 Workflow 中引用
- 消息处理:接收特定类型的输入消息,处理后产生输出消息
- 路由配置:通过
ConfigureRoutes方法定义能处理哪些类型的消息 - 状态感知:可以通过
IWorkflowContext访问和修改工作流状态
️ Executor 的类型层次
MAF 提供了多种 Executor 类型,满足不同场景需求:
|
类型 |
用途 |
示例场景 |
|---|---|---|
Executor |
处理消息,无返回值 |
日志记录、通知发送 |
Executor |
处理消息,有返回值 |
数据转换、AI 调用 |
FunctionExecutor |
用委托函数快速创建 |
简单处理逻辑 |
StatefulExecutor |
需要维护状态的执行器 |
会话管理、计数器 |
Executor 源码解析
从源码中可以看到 Executor 的核心设计:
// 来自 Executor.cs
publicabstractclassExecutor : IIdentified
{
// 唯一标识符
publicstring Id { get; }
// 配置消息路由(子类必须实现)
protected abstract RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder);
// 执行消息处理
publicasync ValueTaskobject?> ExecuteAsync(
object message,
TypeId messageType,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 记录调用事件
await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message));
// 路由消息到正确的处理器
CallResult? result = awaitthis.Router.RouteMessageAsync(message, context, ...);
// 记录完成或失败事件
ExecutorEvent executionResult = result?.IsSuccess is not false
? new ExecutorCompletedEvent(this.Id, result?.Result)
: new ExecutorFailedEvent(this.Id, result.Exception);
await context.AddEventAsync(executionResult);
return result.Result;
}
}
关键点:
-
✅ 每个 Executor 有唯一 ID
- ✅ 通过
ConfigureRoutes声明能处理的消息类型 - ✅ 执行过程会产生事件(
ExecutorInvokedEvent、ExecutorCompletedEvent等)
二、核心概念:Edge(边)
➡️ 什么是 Edge?
Edge(边)是连接两个 Executor 的消息通道,类似于:
|
类比 |
说明 |
|---|---|
|
工厂传送带 |
把上一道工序的产品传送到下一道工序 |
|
水管 |
把水从一个容器引导到另一个容器 |
|
邮路 |
把信件从发件人送到收件人 |

Edge 的三种类型
MAF 支持三种类型的 Edge,适用于不同的流程模式:

|
类型 |
说明 |
使用场景 |
|---|---|---|
| Direct(直连) |
一对一连接 |
顺序处理流程 |
| FanOut(扇出) |
一对多连接 |
并行分发任务 |
| FanIn(扇入) |
多对一连接 |
汇聚多个结果 |
Edge 源码解析
从源码可以看到 Edge 的核心结构:
// 来自 Edge.cs
publicenum EdgeKind
{
Direct, // 直连:一对一
FanOut, // 扇出:一对多
FanIn // 扇入:多对一
}
publicsealedclassEdge
{
public EdgeKind Kind { get; init; } // 边的类型
public EdgeData Data { get; init; } // 边的具体数据
}
// 来自 DirectEdgeData.cs - 直连边的数据
publicsealedclassDirectEdgeData : EdgeData
{
publicstring SourceId { get; } // 源 Executor ID
publicstring SinkId { get; } // 目标 Executor ID
public Funcobject?, bool>? Condition; // 可选的条件判断
}
Direct Edge 示意图:

关键点:
-
✅ Edge 定义了消息从哪里来、到哪里去
- ✅ Direct Edge 支持 条件路由(只有满足条件的消息才传递)
- ✅ FanOut Edge 可以实现 广播或分区逻辑
三、核心概念:Workflow(工作流)
什么是 Workflow?
Workflow(工作流)是将多个 Executor 通过 Edge 连接起来的完整流程定义,类似于:
|
类比 |
说明 |
|---|---|
|
流程图 |
定义了从开始到结束的完整流程 |
|
生产线 |
多个工位通过传送带连接成完整生产线 |
|
乐谱 |
规定了演奏的顺序和节奏 |

Workflow 的核心属性
从源码中可以看到 Workflow 的核心结构:
// 来自 Workflow.cs
publicclassWorkflow
{
// 起始 Executor 的 ID
publicstring StartExecutorId { get; }
// 工作流名称(可选)
publicstring? Name { get; internal init; }
// 工作流描述(可选)
publicstring? Description { get; internal init; }
// Executor 绑定字典
internal Dictionarystring, ExecutorBinding> ExecutorBindings { get; init; }
// 边的集合(按源节点分组)
internal Dictionarystring, HashSet
// 输出 Executor 集合
internal HashSetstring> OutputExecutors { get; init; }
}
使用 WorkflowBuilder 构建工作流
MAF 采用 建造者模式(Builder Pattern)来构建 Workflow,这使得工作流的定义更加直观:
// 创建工作流示例
var workflow = new WorkflowBuilder(startExecutor) // 指定起点
.WithName("订单处理工作流") // 设置名称
.WithDescription("处理电商订单的完整流程") // 设置描述
.AddEdge(receiveOrder, validateOrder) // 添加边
.AddEdge(validateOrder, processPayment,
condition: x => x.IsValid) // 条件边
.AddEdge(processPayment, sendNotification)
.WithOutputFrom(sendNotification) // 指定输出节点
.Build; // 构建工作流

关键方法:
|
方法 |
说明 |
|---|---|
AddEdge(source, sink) |
添加直连边 |
AddEdge(..., condition) |
添加条件边 |
AddFanOut(source, sinks) |
添加扇出边 |
AddFanIn(sources, sink) |
添加扇入边 |
WithOutputFrom(executor) |
指定输出节点 |
BindExecutor(executor) |
绑定占位符执行器 |
Build |
构建最终的 Workflow |
四、核心概念:SuperStep(超步)
⏱️ 什么是 SuperStep?
SuperStep(超步)是 Workflow 执行的基本处理周期。可以类比为:
|
类比 |
说明 |
|---|---|
|
游戏中的”回合” |
每个回合内所有玩家同时行动 |
|
⏰ 工厂的”班次” |
每个班次内完成一批任务 |
|
海浪的”一波” |
一波消息被处理,然后产生下一波 |

SuperStep 的执行流程
每个 SuperStep 内部执行的步骤:

关键事件:
-
SuperStepStartedEvent:超步开始 -
SuperStepCompletedEvent:超步完成
// SuperStep 事件定义
public class SuperStepEvent(int stepNumber, object? data = null) : WorkflowEvent(data)
{
// 超步的序号(从 0 开始)
public int StepNumber => stepNumber;
}
五、核心概念:WorkflowContext(工作流上下文)
什么是 WorkflowContext?
WorkflowContext(工作流上下文)是 Executor 执行时的运行环境,类似于:
|
类比 |
说明 |
|---|---|
|
️ 工人的工作台 |
提供工具、材料和通信渠道 |
|
通信枢纽 |
允许各个工位之间传递信息 |
|
共享内存 |
存储和读取状态数据 |

IWorkflowContext 核心接口
// 来自 IWorkflowContext.cs
publicinterfaceIWorkflowContext
{
// 添加工作流事件(在当前 SuperStep 结束时触发)
ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);
// 发送消息给下游 Executor(在下一个 SuperStep 处理)
ValueTask SendMessageAsync(object message, string? targetId, CancellationToken cancellationToken = default);
// 输出工作流结果
ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default);
// 请求在当前 SuperStep 结束时停止工作流
ValueTask RequestHaltAsync;
// 读取状态
ValueTask
// 读取或初始化状态
ValueTask
// 更新状态(排队更新,在 SuperStep 结束时应用)
ValueTask QueueStateUpdateAsync
}
关键点:
- ✅ 消息传递:通过
SendMessageAsync在 Executor 之间传递消息 - ✅ 状态管理:支持读取、初始化和更新状态
- ✅ 事件通知:通过
AddEventAsync发出事件 - ✅ 流程控制:通过
RequestHaltAsync停止工作流
六、核心概念:WorkflowEvent(工作流事件)
什么是 WorkflowEvent?
WorkflowEvent(工作流事件)是工作流执行过程中产生的通知消息,类似于:
|
类比 |
说明 |
|---|---|
|
广播通知 |
向所有人广播系统状态变化 |
|
日志记录 |
记录系统执行过程中的关键节点 |
|
事件订阅 |
允许外部监听并响应特定事件 |

事件分类
|
事件层级 |
事件类型 |
说明 |
|---|---|---|
| 工作流级别 | WorkflowStartedEvent |
工作流开始执行 |
WorkflowOutputEvent |
工作流产生输出 |
|
WorkflowErrorEvent |
工作流发生错误 |
|
WorkflowWarningEvent |
工作流产生警告 |
|
| 超步级别 | SuperStepStartedEvent |
超步开始 |
SuperStepCompletedEvent |
超步完成 |
|
| 执行器级别 | ExecutorInvokedEvent |
Executor 被调用 |
ExecutorCompletedEvent |
Executor 完成处理 |
|
ExecutorFailedEvent |
Executor 处理失败 |
七、核心概念:Run(运行实例)
什么是 Run?
Run(运行实例)是 Workflow 的一次具体执行,类似于:
|
类比 |
说明 |
|---|---|
|
电影的一场放映 |
同一部电影可以放映多场 |
|
生产线的一个批次 |
同一条生产线可以生产多个批次 |
|
游戏的一局 |
同一个游戏可以玩多局 |

Run 的核心特性
// 来自 Run.cs
publicsealedclassRun : IAsyncDisposable
{
// 运行实例的唯一标识符
publicstring RunId => this._runHandle.RunId;
// 获取当前执行状态
public ValueTask GetStatusAsync(CancellationToken cancellationToken = default);
// 获取所有产生的事件
public IEnumerable
// 获取自上次访问后的新事件
public IEnumerable
// 恢复执行(带外部响应)
public async ValueTaskbool> ResumeAsync(IEnumerable
}
RunStatus(运行状态)
public enum RunStatus
{
NotStarted, // 尚未开始
Idle, // 空闲(已暂停,无待处理请求)
PendingRequests, // 等待外部响应
Ended, // 已结束
Running // 正在运行
}

八、核心概念:Checkpoint(检查点)
什么是 Checkpoint?
Checkpoint(检查点)是工作流在某个时刻的完整状态快照,类似于:
|
类比 |
说明 |
|---|---|
|
游戏存档 |
保存游戏进度,随时可以读档继续 |
|
照片 |
记录某一时刻的完整状态 |
|
书签 |
标记阅读进度,下次从这里继续 |

Checkpoint 的核心信息
// 来自 CheckpointInfo.cs
publicsealedclassCheckpointInfo
{
// 运行实例的唯一标识符
publicstring RunId { get; }
// 检查点的唯一标识符
publicstring CheckpointId { get; }
}
// 检查点的完整数据(来自 Checkpoint.cs)
internalsealedclassCheckpoint
{
publicint StepNumber { get; } // 超步编号
public WorkflowInfo Workflow { get; } // 工作流信息
public RunnerStateData RunnerData { get; } // 运行器状态
public Dictionary
public Dictionary
public CheckpointInfo? Parent { get; } // 父检查点
}
Checkpoint 的使用场景
|
场景 |
说明 |
|---|---|
| 故障恢复 |
系统崩溃后从最近的检查点恢复 |
| 长时间运行 |
分段执行,每段结束保存进度 |
| 人机交互 |
等待用户输入时保存状态 |
| 调试回放 |
从任意检查点重新执行 |
| 版本分支 |
从同一个检查点创建多个分支执行 |

九、核心概念关系图
概念之间的关系
让我们把所有核心概念联系起来,看看它们是如何协作的:

生命周期视角
从工作流的完整生命周期来看:

消息流视角
从消息在工作流中的流动来看:

关键理解:
- 消息驱动:Executor 之间通过消息传递数据
- 异步批处理:同一 SuperStep 内的 Executor 可以并行执行
- 边控制流向:Edge 决定消息从哪里到哪里
- 状态隔离:每个 SuperStep 结束时应用状态更新
十、实际应用示例
场景:电商订单处理工作流
让我们通过一个实际场景来理解这些概念的应用:

概念对应关系
|
概念 |
在此场景中的体现 |
|---|---|
| Workflow |
整个订单处理流程 |
| Executor |
每个处理步骤(接收、验证、支付等) |
| Edge |
步骤之间的连接(含条件判断) |
| SuperStep |
每一轮处理(如:超步1处理接收,超步2处理验证…) |
| WorkflowContext |
提供订单状态读写、消息发送能力 |
| WorkflowEvent |
每个步骤的开始/完成/失败事件 |
| Run |
一个具体订单的处理过程 |
| Checkpoint |
处理中途保存的状态(如支付完成后保存) |
场景:多智能体协作工作流
在 AI Agent 场景中,Workflow 可以用来编排多个 Agent 的协作:

Workflow 的优势:
|
优势 |
说明 |
|---|---|
| 灵活编排 |
可以轻松调整 Agent 之间的协作关系 |
| 状态管理 |
自动管理各 Agent 的状态和上下文 |
| ⏸️ 可中断/恢复 |
支持人机交互,随时暂停和恢复 |
| 可观测性 |
通过事件追踪整个执行过程 |
| ️ 容错能力 |
通过检查点支持故障恢复 |
十一、概念总结
核心概念速查表
|
概念 |
定义 |
关键类 |
核心职责 |
|---|---|---|---|
| Executor |
执行器/处理节点 |
Executor,Executor,FunctionExecutor |
处理消息,产生输出 |
| Edge |
连接边 |
Edge,EdgeData,DirectEdgeData |
定义消息流向和条件 |
| Workflow |
工作流定义 |
Workflow,WorkflowBuilder |
组织 Executor 和 Edge |
| SuperStep |
超步/批量处理周期 |
SuperStepEvent,SuperStepStartedEvent |
批量处理消息 |
| WorkflowContext |
工作流上下文 |
IWorkflowContext |
提供运行时服务 |
| WorkflowEvent |
工作流事件 |
WorkflowEvent,ExecutorEvent |
通知执行状态 |
| Run |
运行实例 |
Run,RunStatus |
管理一次执行 |
| Checkpoint |
检查点 |
CheckpointInfo,ICheckpointStore |
保存和恢复状态 |
记忆口诀
❝
“工作流程边连接,执行器来做处理”
“超步批量往前走,上下文中读写态”
“事件通知全过程,运行实例跑一次”
“检查点来存进度,随时恢复不怕挂”