节点 #

目录 #

  1. 简介
  2. Node 结构体详解
  3. 节点注册与添加
  4. 节点执行机制
  5. 并行执行环境下的节点行为
  6. 节点级别的重试机制
  7. 错误处理与中断信号
  8. 节点与检查点的交互
  9. 实际应用示例
  10. 最佳实践与设计原则

简介 #

在 langgraphgo 中,节点(Node)是构建工作流的基本执行单元。每个节点代表一个独立的功能模块,负责处理特定的业务逻辑。节点通过函数的形式承载业务逻辑,接收上下文和状态作为输入,返回更新后的状态和错误信息。

节点的设计遵循函数式编程的原则,具有以下核心特征:

Node 结构体详解 #

Node 结构体是 langgraphgo 中节点的核心数据结构,定义了节点的基本属性和行为。

classDiagram
class Node {
+string Name
+func(ctx Context, state interface) (interface, error) Function
+GetName() string
+GetFunction() Function
+Execute(ctx Context, state interface) (interface, error)
}
class MessageGraph {
+map[string]Node nodes
+AddNode(name string, fn Function)
+Invoke(ctx Context, state interface) (interface, error)
}
class StateGraph {
+map[string]Node nodes
+*RetryPolicy retryPolicy
+AddNode(name string, fn Function)
+executeNodeWithRetry(ctx Context, node Node, state interface) (interface, error)
}
MessageGraph --> Node : "包含"
StateGraph --> Node : "包含"
StateGraph --> RetryPolicy : "使用"

图表来源

核心字段说明 #

Name 字段 #

Function 字段 #

节来源

节点注册与添加 #

langgraphgo 提供了两种主要的图类型来管理节点:MessageGraph 和 StateGraph。它们都提供了添加节点的方法,但各自有不同的特性和用途。

MessageGraph.AddNode 方法 #

MessageGraph 是最基础的图类型,适用于简单的消息传递模式。

sequenceDiagram
participant Client as "客户端代码"
participant MG as "MessageGraph"
participant Node as "Node实例"
Client->>MG : AddNode(name, function)
MG->>Node : 创建Node结构体
Node->>Node : 设置Name字段
Node->>Node : 设置Function字段
MG->>MG : 将Node存储到nodes映射中
MG-->>Client : 添加完成

图表来源

StateGraph.AddNode 方法 #

StateGraph 提供了更丰富的功能,包括重试机制和状态管理。

sequenceDiagram
participant Client as "客户端代码"
participant SG as "StateGraph"
participant Node as "Node实例"
Client->>SG : AddNode(name, function)
SG->>Node : 创建Node结构体
Node->>Node : 设置Name字段
Node->>Node : 设置Function字段
SG->>SG : 将Node存储到nodes映射中
SG-->>Client : 添加完成

图表来源

添加节点的具体实践 #

在 examples/basic_example/main.go 中展示了节点添加的实际应用:

节来源

节点执行机制 #

节点的执行是通过图的编译和运行阶段完成的。在执行过程中,节点按照预定义的顺序和规则进行调用。

flowchart TD
Start([开始执行]) --> CheckEntry{检查入口节点}
CheckEntry --> |有效| GetNodes[获取当前节点列表]
CheckEntry --> |无效| Error[返回错误]
GetNodes --> FilterActive[过滤活跃节点]
FilterActive --> ParallelExec[并行执行节点]
ParallelExec --> ExecuteNode[执行单个节点]
ExecuteNode --> CheckResult{检查执行结果}
CheckResult --> |成功| ProcessResult[处理结果]
CheckResult --> |失败| HandleError[处理错误]
ProcessResult --> CheckCommand{检查是否为Command}
CheckCommand --> |是| ProcessCommand[处理Command]
CheckCommand --> |否| MergeState[合并状态]
ProcessCommand --> DetermineNext[确定下一节点]
MergeState --> DetermineNext
DetermineNext --> CheckInterrupt{检查中断}
CheckInterrupt --> |有中断| Interrupt[触发中断]
CheckInterrupt --> |无中断| CheckMore{还有节点?}
CheckMore --> |是| GetNodes
CheckMore --> |否| Complete[执行完成]
HandleError --> Error
Interrupt --> Error
Complete --> End([结束])

图表来源

执行流程详解 #

  1. 入口验证: 确保图的入口节点已设置
  2. 节点过滤: 移除标记为 END 的节点
  3. 并行执行: 使用 goroutine 并行执行所有活跃节点
  4. 结果处理: 处理节点返回的结果和命令
  5. 状态合并: 根据图的策略合并多个节点的状态
  6. 路由决策: 基于静态边或条件边确定下一组节点

节来源

并行执行环境下的节点行为 #

langgraphgo 支持节点的并行执行,这是提高工作流性能的关键特性。

sequenceDiagram
participant Main as "主执行线程"
participant WG as "WaitGroup"
participant Node1 as "节点1"
participant Node2 as "节点2"
participant Node3 as "节点3"
Main->>WG : Add(3)
Main->>Node1 : 启动goroutine
Main->>Node2 : 启动goroutine
Main->>Node3 : 启动goroutine
par 并行执行
Node1->>Node1 : 执行业务逻辑
Node1->>WG : Done()
and
Node2->>Node2 : 执行业务逻辑
Node2->>WG : Done()
and
Node3->>Node3 : 执行业务逻辑
Node3->>WG : Done()
end
Main->>WG : Wait()
WG-->>Main : 所有节点完成
Main->>Main : 处理并行结果

图表来源

并行执行的特点 #

  1. 独立性: 每个节点在独立的 goroutine 中执行
  2. 隔离性: 节点间不共享状态,避免竞态条件
  3. 容错性: 单个节点的失败不会影响其他节点
  4. 性能: 充分利用多核 CPU 资源

注意事项 #

节来源

节点级别的重试机制 #

StateGraph 提供了内置的重试机制,可以在节点执行失败时自动重试。

flowchart TD
Start([开始执行节点]) --> Execute[执行节点函数]
Execute --> CheckResult{检查执行结果}
CheckResult --> |成功| Success[返回结果]
CheckResult --> |失败| CheckRetry{检查重试配置}
CheckRetry --> |无重试配置| Fail[返回错误]
CheckRetry --> |有重试配置| CheckRetryable{检查是否可重试}
CheckRetryable --> |不可重试| Fail
CheckRetryable --> |可重试| CheckAttempts{检查重试次数}
CheckAttempts --> |达到上限| Fail
CheckAttempts --> |未达到上限| WaitDelay[等待退避延迟]
WaitDelay --> Execute
Success --> End([结束])
Fail --> End

图表来源

RetryPolicy 配置 #

RetryPolicy 定义了重试行为的各个方面:

字段 类型 默认值 说明
MaxRetries int 1 最大重试次数
BackoffStrategy BackoffStrategy FixedBackoff 退避策略
RetryableErrors []string nil 可重试的错误模式

退避策略 #

  1. FixedBackoff: 固定时间间隔
  2. ExponentialBackoff: 指数退避(1s, 2s, 4s, 8s…)
  3. LinearBackoff: 线性退避(1s, 2s, 3s, 4s…)

节来源

错误处理与中断信号 #

langgraphgo 提供了完善的错误处理和中断机制,确保工作流的可靠性和可控性。

错误类型 #

classDiagram
class Error {
<<abstract>>
}
class NodeInterrupt {
+string Node
+interface Value
+Error() string
}
class GraphInterrupt {
+string Node
+interface State
+[]string NextNodes
+interface InterruptValue
+Error() string
}
class StandardError {
+string Message
+Error() string
}
Error <|-- NodeInterrupt
Error <|-- GraphInterrupt
Error <|-- StandardError

图表来源

中断机制 #

NodeInterrupt #

GraphInterrupt #

中断处理流程 #

flowchart TD
NodeExec[节点执行] --> CheckInterrupt{检查中断条件}
CheckInterrupt --> |节点中断| CreateNodeInt[创建NodeInterrupt]
CheckInterrupt --> |图中断Before| CreateGraphIntBefore[创建GraphInterrupt Before]
CheckInterrupt --> |图中断After| CreateGraphIntAfter[创建GraphInterrupt After]
CheckInterrupt --> |无中断| ContinueExec[继续执行]
CreateNodeInt --> PropagateInt[传播中断]
CreateGraphIntBefore --> PropagateInt
CreateGraphIntAfter --> PropagateInt
PropagateInt --> HandleInt[处理中断]
HandleInt --> Resume[恢复执行]
ContinueExec --> Success[执行成功]
Resume --> Success

图表来源

节来源

节点与检查点的交互 #

检查点机制为节点提供了状态持久化的支持,确保在故障恢复时能够从正确的状态继续执行。

检查点生命周期 #

stateDiagram-v2
[*] --> Created : 节点开始执行
Created --> Executing : 开始执行
Executing --> CheckpointSaved : 自动保存检查点
Executing --> ManualCheckpoint : 手动保存检查点
CheckpointSaved --> Success : 执行成功
ManualCheckpoint --> Success
Executing --> Failed : 执行失败
Failed --> [*] : 错误处理
Success --> [*] : 正常完成

检查点配置 #

配置项 类型 说明
Store CheckpointStore 存储后端(内存、PostgreSQL、SQLite、Redis)
AutoSave bool 是否自动保存检查点
SaveInterval time.Duration 自动保存间隔
MaxCheckpoints int 最大检查点数量

检查点与节点的交互 #

  1. 自动保存: 在节点执行完成后自动保存检查点
  2. 手动保存: 节点可以主动请求保存检查点
  3. 恢复执行: 从检查点恢复时重新执行节点
  4. 状态查询: 获取特定检查点的状态信息

节来源

实际应用示例 #

基础节点示例 #

在 basic_example 中展示了最基本的节点定义和使用:

节来源

带重试的节点 #

使用 StateGraph 的重试机制:

sequenceDiagram
participant Client as "客户端"
participant SG as "StateGraph"
participant Node as "重试节点"
participant Policy as "RetryPolicy"
Client->>SG : 添加带重试的节点
SG->>Node : 包装重试逻辑
Client->>SG : 执行节点
SG->>Node : 第一次执行
Node->>Node : 执行业务逻辑
Node-->>SG : 执行失败
SG->>Policy : 检查重试条件
Policy-->>SG : 允许重试
SG->>Node : 第二次执行
Node-->>SG : 执行成功
SG-->>Client : 返回结果

带中断的节点 #

处理需要人工干预的场景:

节来源

最佳实践与设计原则 #

节点设计原则 #

  1. 单一职责: 每个节点只负责一个明确的功能
  2. 幂等性: 节点应该能够安全地重复执行
  3. 无副作用: 节点不应该修改外部状态
  4. 快速执行: 节点应该尽快完成执行

错误处理最佳实践 #

  1. 明确错误分类: 区分可重试和不可重试的错误
  2. 有意义的错误信息: 提供有助于诊断问题的错误信息
  3. 适当的重试策略: 根据错误类型选择合适的重试策略
  4. 超时控制: 为长时间运行的节点设置合理的超时

性能优化建议 #

  1. 合理使用并行: 在不影响状态安全的前提下使用并行执行
  2. 避免阻塞操作: 节点中避免使用阻塞的 I/O 操作
  3. 状态优化: 设计轻量级的状态结构
  4. 资源管理: 及时释放不需要的资源

可维护性考虑 #

  1. 清晰的命名: 使用描述性的节点名称
  2. 文档化: 为复杂节点提供详细的文档
  3. 测试覆盖: 确保节点有足够的测试覆盖率
  4. 监控支持: 在节点中添加适当的日志和指标

通过遵循这些原则和最佳实践,可以构建出健壮、高效且易于维护的节点系统,充分发挥 langgraphgo 工作流框架的优势。