DC娱乐网

NET+AI | Workflow | 一文理清工作流核心概念(1)

MAF Workflow 核心概念详解 本课概览Microsoft Agent Framework (MAF)提供了一套

MAF Workflow 核心概念详解

本课概览

Microsoft Agent Framework (MAF)提供了一套强大的Workflow(工作流)框架,用于编排和协调多个智能体(Agent)或处理组件的执行流程。

本课将以通俗易懂的方式,帮助你理解 MAF Workflow 的核心概念:

概念

说明

类比

Workflow

工作流定义

流程图

Executor

执行器/处理节点

工人

Edge

连接边

➡️ 传送带

SuperStep

超步/批量处理

⏱️ 处理周期

WorkflowContext

工作流上下文

工作台

WorkflowEvent

工作流事件

通知

Run

运行实例

一次执行

Checkpoint

检查点

存档点

让我们逐一深入了解这些概念!

为什么需要 Workflow?

在构建 AI 智能体应用时,我们经常需要:

多步骤处理:用户请求需要经过多个处理环节 多智能体协作:不同的 Agent 各司其职,协同完成任务 条件分支:根据处理结果决定下一步走向 并行处理:某些任务可以同时进行以提高效率 状态管理:需要在处理过程中保存和恢复状态

传统方式的问题:

// ❌ 硬编码的流程,难以维护和扩展var result1 = await agent1.ProcessAsync(input);if (result1.Success){var result2 = await agent2.ProcessAsync(result1.Data);// ... 越来越复杂}

使用 Workflow 的优势:

// ✅ 声明式定义,清晰可维护var workflow = new WorkflowBuilder(startExecutor).AddEdge(executor1, executor2).AddEdge(executor2, executor3, condition: x => x.Success).Build;

一、核心概念:Executor(执行器) 什么是 Executor?

Executor(执行器)是 Workflow 中的最小工作单元,类似于:

类比

说明

‍ 工厂里的工人

每个工人负责一道工序

乐高积木块

每个积木有特定功能,组合成整体

电路中的元件

接收输入信号,输出处理结果

Executor 的核心特征 唯一标识(Id):每个 Executor 有一个唯一的 ID,用于在 Workflow 中引用 消息处理:接收特定类型的输入消息,处理后产生输出消息 路由配置:通过ConfigureRoutes方法定义能处理哪些类型的消息 状态感知:可以通过IWorkflowContext访问和修改工作流状态

️ Executor 的类型层次

MAF 提供了多种 Executor 类型,满足不同场景需求:

类型

用途

示例场景

Executor

处理消息,无返回值

日志记录、通知发送

Executor

处理消息,有返回值

数据转换、AI 调用

FunctionExecutor

用委托函数快速创建

简单处理逻辑

StatefulExecutor

需要维护状态的执行器

会话管理、计数器

Executor 源码解析

从源码中可以看到 Executor 的核心设计:

// 来自 Executor.cspublicabstractclassExecutor : IIdentified{// 唯一标识符publicstring Id { get; }// 配置消息路由(子类必须实现)protected abstract RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder);// 执行消息处理publicasync ValueTaskobject?> ExecuteAsync(object message,TypeId messageType,IWorkflowContext context,CancellationToken cancellationToken = default){// 记录调用事件await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message));// 路由消息到正确的处理器CallResult? result = awaitthis.Router.RouteMessageAsync(message, context, ...);// 记录完成或失败事件ExecutorEvent executionResult = result?.IsSuccess is not false? new ExecutorCompletedEvent(this.Id, result?.Result): new ExecutorFailedEvent(this.Id, result.Exception);await context.AddEventAsync(executionResult);return result.Result;}}

关键点:

✅ 每个 Executor 有唯一 ID

✅ 通过 ConfigureRoutes声明能处理的消息类型 ✅ 执行过程会产生事件( ExecutorInvokedEvent、ExecutorCompletedEvent等)

二、核心概念:Edge(边)➡️ 什么是 Edge?

Edge(边)是连接两个 Executor 的消息通道,类似于:

类比

说明

工厂传送带

把上一道工序的产品传送到下一道工序

水管

把水从一个容器引导到另一个容器

邮路

把信件从发件人送到收件人

Edge 的三种类型

MAF 支持三种类型的 Edge,适用于不同的流程模式:

类型

说明

使用场景

Direct(直连)

一对一连接

顺序处理流程

FanOut(扇出)

一对多连接

并行分发任务

FanIn(扇入)

多对一连接

汇聚多个结果

Edge 源码解析

从源码可以看到 Edge 的核心结构:

// 来自 Edge.cspublicenum EdgeKind{Direct, // 直连:一对一FanOut, // 扇出:一对多FanIn // 扇入:多对一}publicsealedclassEdge{public EdgeKind Kind { get; init; } // 边的类型public EdgeData Data { get; init; } // 边的具体数据}// 来自 DirectEdgeData.cs - 直连边的数据publicsealedclassDirectEdgeData : EdgeData{publicstring SourceId { get; } // 源 Executor IDpublicstring SinkId { get; } // 目标 Executor IDpublic Funcobject?, bool>? Condition; // 可选的条件判断}

Direct Edge 示意图:

关键点:

✅ Edge 定义了消息从哪里来、到哪里去

✅ Direct Edge 支持 条件路由(只有满足条件的消息才传递) ✅ FanOut Edge 可以实现 广播或分区逻辑

三、核心概念:Workflow(工作流) 什么是 Workflow?

Workflow(工作流)是将多个 Executor 通过 Edge 连接起来的完整流程定义,类似于:

类比

说明

流程图

定义了从开始到结束的完整流程

生产线

多个工位通过传送带连接成完整生产线

乐谱

规定了演奏的顺序和节奏

Workflow 的核心属性

从源码中可以看到 Workflow 的核心结构:

// 来自 Workflow.cspublicclassWorkflow{// 起始 Executor 的 IDpublicstring StartExecutorId { get; }// 工作流名称(可选)publicstring? Name { get; internal init; }// 工作流描述(可选)publicstring? Description { get; internal init; }// Executor 绑定字典internal Dictionarystring, ExecutorBinding> ExecutorBindings { get; init; }// 边的集合(按源节点分组)internal Dictionarystring, HashSet// 输出 Executor 集合internal HashSetstring> OutputExecutors { get; init; }}

使用 WorkflowBuilder 构建工作流

MAF 采用 建造者模式(Builder Pattern)来构建 Workflow,这使得工作流的定义更加直观:

// 创建工作流示例var workflow = new WorkflowBuilder(startExecutor) // 指定起点.WithName("订单处理工作流") // 设置名称.WithDescription("处理电商订单的完整流程") // 设置描述.AddEdge(receiveOrder, validateOrder) // 添加边.AddEdge(validateOrder, processPayment,condition: x => x.IsValid) // 条件边.AddEdge(processPayment, sendNotification).WithOutputFrom(sendNotification) // 指定输出节点.Build; // 构建工作流

关键方法:

方法

说明

AddEdge(source, sink)

添加直连边

AddEdge(..., condition)

添加条件边

AddFanOut(source, sinks)

添加扇出边

AddFanIn(sources, sink)

添加扇入边

WithOutputFrom(executor)

指定输出节点

BindExecutor(executor)

绑定占位符执行器

Build

构建最终的 Workflow

四、核心概念:SuperStep(超步)⏱️ 什么是 SuperStep?

SuperStep(超步)是 Workflow 执行的基本处理周期。可以类比为:

类比

说明

游戏中的"回合"

每个回合内所有玩家同时行动

⏰ 工厂的"班次"

每个班次内完成一批任务

海浪的"一波"

一波消息被处理,然后产生下一波

SuperStep 的执行流程

每个 SuperStep 内部执行的步骤:

关键事件:

SuperStepStartedEvent:超步开始 SuperStepCompletedEvent:超步完成// SuperStep 事件定义public SuperStepEvent(int stepNumber, object? data = null) : WorkflowEvent(data){// 超步的序号(从 0 开始)public int StepNumber => stepNumber;}

五、核心概念:WorkflowContext(工作流上下文) 什么是 WorkflowContext?

WorkflowContext(工作流上下文)是 Executor 执行时的运行环境,类似于:

类比

说明

️ 工人的工作台

提供工具、材料和通信渠道

通信枢纽

允许各个工位之间传递信息

共享内存

存储和读取状态数据

IWorkflowContext 核心接口// 来自 IWorkflowContext.cspublicinterfaceIWorkflowContext{// 添加工作流事件(在当前 SuperStep 结束时触发)ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);// 发送消息给下游 Executor(在下一个 SuperStep 处理)ValueTask SendMessageAsync(object message, string? targetId, CancellationToken cancellationToken = default);// 输出工作流结果ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default);// 请求在当前 SuperStep 结束时停止工作流ValueTask RequestHaltAsync;// 读取状态ValueTask// 读取或初始化状态ValueTask// 更新状态(排队更新,在 SuperStep 结束时应用)ValueTask QueueStateUpdateAsync}

关键点:

✅ 消息传递:通过SendMessageAsync在 Executor 之间传递消息 ✅ 状态管理:支持读取、初始化和更新状态 ✅ 事件通知:通过AddEventAsync发出事件 ✅ 流程控制:通过RequestHaltAsync停止工作流

六、核心概念:WorkflowEvent(工作流事件) 什么是 WorkflowEvent?

WorkflowEvent(工作流事件)是工作流执行过程中产生的通知消息,类似于:

类比

说明

广播通知

向所有人广播系统状态变化

日志记录

记录系统执行过程中的关键节点

事件订阅

允许外部监听并响应特定事件

事件分类

事件层级

事件类型

说明

工作流级别WorkflowStartedEvent

工作流开始执行

WorkflowOutputEvent

工作流产生输出

WorkflowErrorEvent

工作流发生错误

WorkflowWarningEvent

工作流产生警告

超步级别SuperStepStartedEvent

超步开始

SuperStepCompletedEvent

超步完成

执行器级别ExecutorInvokedEvent

Executor 被调用

ExecutorCompletedEvent

Executor 完成处理

ExecutorFailedEvent

Executor 处理失败

七、核心概念:Run(运行实例) 什么是 Run?

Run(运行实例)是 Workflow 的一次具体执行,类似于:

类比

说明

电影的一场放映

同一部电影可以放映多场

生产线的一个批次

同一条生产线可以生产多个批次

游戏的一局

同一个游戏可以玩多局

Run 的核心特性// 来自 Run.cspublicsealedclassRun : IAsyncDisposable{// 运行实例的唯一标识符publicstring RunId => this._runHandle.RunId;// 获取当前执行状态public ValueTask GetStatusAsync(CancellationToken cancellationToken = default);// 获取所有产生的事件public IEnumerable// 获取自上次访问后的新事件public IEnumerable// 恢复执行(带外部响应)public async ValueTaskbool> ResumeAsync(IEnumerable}

RunStatus(运行状态)public enum RunStatus{NotStarted, // 尚未开始Idle, // 空闲(已暂停,无待处理请求)PendingRequests, // 等待外部响应Ended, // 已结束Running // 正在运行}

八、核心概念:Checkpoint(检查点) 什么是 Checkpoint?

Checkpoint(检查点)是工作流在某个时刻的完整状态快照,类似于:

类比

说明

游戏存档

保存游戏进度,随时可以读档继续

照片

记录某一时刻的完整状态

书签

标记阅读进度,下次从这里继续

Checkpoint 的核心信息// 来自 CheckpointInfo.cspublicsealedclassCheckpointInfo{// 运行实例的唯一标识符publicstring RunId { get; }// 检查点的唯一标识符publicstring CheckpointId { get; }}// 检查点的完整数据(来自 Checkpoint.cs)internalsealedclassCheckpoint{publicint StepNumber { get; } // 超步编号public WorkflowInfo Workflow { get; } // 工作流信息public RunnerStateData RunnerData { get; } // 运行器状态public Dictionarypublic Dictionarypublic CheckpointInfo? Parent { get; } // 父检查点}

Checkpoint 的使用场景

场景

说明

故障恢复

系统崩溃后从最近的检查点恢复

长时间运行

分段执行,每段结束保存进度

人机交互

等待用户输入时保存状态

调试回放

从任意检查点重新执行

版本分支

从同一个检查点创建多个分支执行

九、核心概念关系图 概念之间的关系

让我们把所有核心概念联系起来,看看它们是如何协作的:

生命周期视角

从工作流的完整生命周期来看:

消息流视角

从消息在工作流中的流动来看:

关键理解:

消息驱动:Executor 之间通过消息传递数据 异步批处理:同一 SuperStep 内的 Executor 可以并行执行 边控制流向:Edge 决定消息从哪里到哪里 状态隔离:每个 SuperStep 结束时应用状态更新

十、实际应用示例 场景:电商订单处理工作流

让我们通过一个实际场景来理解这些概念的应用:

概念对应关系

概念

在此场景中的体现

Workflow

整个订单处理流程

Executor

每个处理步骤(接收、验证、支付等)

Edge

步骤之间的连接(含条件判断)

SuperStep

每一轮处理(如:超步1处理接收,超步2处理验证...)

WorkflowContext

提供订单状态读写、消息发送能力

WorkflowEvent

每个步骤的开始/完成/失败事件

Run

一个具体订单的处理过程

Checkpoint

处理中途保存的状态(如支付完成后保存)

场景:多智能体协作工作流

在 AI Agent 场景中,Workflow 可以用来编排多个 Agent 的协作:

Workflow 的优势:

优势

说明

灵活编排

可以轻松调整 Agent 之间的协作关系

状态管理

自动管理各 Agent 的状态和上下文

⏸️ 可中断/恢复

支持人机交互,随时暂停和恢复

可观测性

通过事件追踪整个执行过程

️ 容错能力

通过检查点支持故障恢复

十一、概念总结 核心概念速查表

概念

定义

关键类

核心职责

Executor

执行器/处理节点

Executor,Executor,FunctionExecutor

处理消息,产生输出

Edge

连接边

Edge,EdgeData,DirectEdgeData

定义消息流向和条件

Workflow

工作流定义

Workflow,WorkflowBuilder

组织 Executor 和 Edge

SuperStep

超步/批量处理周期

SuperStepEvent,SuperStepStartedEvent

批量处理消息

WorkflowContext

工作流上下文

IWorkflowContext

提供运行时服务

WorkflowEvent

工作流事件

WorkflowEvent,ExecutorEvent

通知执行状态

Run

运行实例

Run,RunStatus

管理一次执行

Checkpoint

检查点

CheckpointInfo,ICheckpointStore

保存和恢复状态

记忆口诀 ❝

"工作流程边连接,执行器来做处理"

"超步批量往前走,上下文中读写态"

"事件通知全过程,运行实例跑一次"

"检查点来存进度,随时恢复不怕挂"