节点 #
本文档中引用的文件
目录 #
简介 #
在 langgraphgo 中,节点(Node)是构建工作流的基本执行单元。每个节点代表一个独立的功能模块,负责处理特定的业务逻辑。节点通过函数的形式承载业务逻辑,接收上下文和状态作为输入,返回更新后的状态和错误信息。
节点的设计遵循函数式编程的原则,具有以下核心特征:
- 无状态性:节点本身不维护状态,所有状态都通过输入参数传递
- 纯函数性:相同的输入总是产生相同的输出
- 可组合性:多个节点可以组合成复杂的工作流
- 可扩展性:支持重试、超时、熔断等多种增强功能
Node 结构体详解 #
Node 结构体是 langgraphgo 中节点的核心数据结构,定义了节点的基本属性和行为。
classDiagram
class Node {
+string Name
+func(ctx Context, state interface) (interface, error) Function
+GetName() string
+GetFunction() Function
+Execute(ctx Context, state interface) (interface, error)
}
class MessageGraph {
+map[string]Node nodes
+AddNode(name string, fn Function)
+Invoke(ctx Context, state interface) (interface, error)
}
class StateGraph {
+map[string]Node nodes
+*RetryPolicy retryPolicy
+AddNode(name string, fn Function)
+executeNodeWithRetry(ctx Context, node Node, state interface) (interface, error)
}
MessageGraph --> Node : "包含"
StateGraph --> Node : "包含"
StateGraph --> RetryPolicy : "使用"
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L52-L59)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L11-L32)
核心字段说明 #
Name 字段 #
- 类型:
string - 作用: 作为节点的唯一标识符,在图中必须保持唯一性
- 用途: 用于节点间的连接、路由决策和调试追踪
- 命名规范: 建议使用描述性的名称,如
"data_validation"、"email_processor"
Function 字段 #
- 类型:
func(ctx Context, state interface{}) (interface{}, error) - 作用: 承载节点的业务逻辑,是节点的核心执行函数
- 参数:
ctx: 上下文对象,提供取消信号和超时控制state: 当前状态,可以是任意类型,通常为结构体或映射
- 返回值:
interface{}: 更新后的状态error: 执行过程中产生的错误
节来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L52-L59)
节点注册与添加 #
langgraphgo 提供了两种主要的图类型来管理节点:MessageGraph 和 StateGraph。它们都提供了添加节点的方法,但各自有不同的特性和用途。
MessageGraph.AddNode 方法 #
MessageGraph 是最基础的图类型,适用于简单的消息传递模式。
sequenceDiagram
participant Client as "客户端代码"
participant MG as "MessageGraph"
participant Node as "Node实例"
Client->>MG : AddNode(name, function)
MG->>Node : 创建Node结构体
Node->>Node : 设置Name字段
Node->>Node : 设置Function字段
MG->>MG : 将Node存储到nodes映射中
MG-->>Client : 添加完成
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L103-L109)
StateGraph.AddNode 方法 #
StateGraph 提供了更丰富的功能,包括重试机制和状态管理。
sequenceDiagram
participant Client as "客户端代码"
participant SG as "StateGraph"
participant Node as "Node实例"
Client->>SG : AddNode(name, function)
SG->>Node : 创建Node结构体
Node->>Node : 设置Name字段
Node->>Node : 设置Function字段
SG->>SG : 将Node存储到nodes映射中
SG-->>Client : 添加完成
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L58-L64)
添加节点的具体实践 #
在 examples/basic_example/main.go 中展示了节点添加的实际应用:
节来源
- [main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/basic_example/main.go#L27-L30)
节点执行机制 #
节点的执行是通过图的编译和运行阶段完成的。在执行过程中,节点按照预定义的顺序和规则进行调用。
flowchart TD
Start([开始执行]) --> CheckEntry{检查入口节点}
CheckEntry --> |有效| GetNodes[获取当前节点列表]
CheckEntry --> |无效| Error[返回错误]
GetNodes --> FilterActive[过滤活跃节点]
FilterActive --> ParallelExec[并行执行节点]
ParallelExec --> ExecuteNode[执行单个节点]
ExecuteNode --> CheckResult{检查执行结果}
CheckResult --> |成功| ProcessResult[处理结果]
CheckResult --> |失败| HandleError[处理错误]
ProcessResult --> CheckCommand{检查是否为Command}
CheckCommand --> |是| ProcessCommand[处理Command]
CheckCommand --> |否| MergeState[合并状态]
ProcessCommand --> DetermineNext[确定下一节点]
MergeState --> DetermineNext
DetermineNext --> CheckInterrupt{检查中断}
CheckInterrupt --> |有中断| Interrupt[触发中断]
CheckInterrupt --> |无中断| CheckMore{还有节点?}
CheckMore --> |是| GetNodes
CheckMore --> |否| Complete[执行完成]
HandleError --> Error
Interrupt --> Error
Complete --> End([结束])
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L175-L491)
执行流程详解 #
- 入口验证: 确保图的入口节点已设置
- 节点过滤: 移除标记为 END 的节点
- 并行执行: 使用 goroutine 并行执行所有活跃节点
- 结果处理: 处理节点返回的结果和命令
- 状态合并: 根据图的策略合并多个节点的状态
- 路由决策: 基于静态边或条件边确定下一组节点
节来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L224-L491)
并行执行环境下的节点行为 #
langgraphgo 支持节点的并行执行,这是提高工作流性能的关键特性。
sequenceDiagram
participant Main as "主执行线程"
participant WG as "WaitGroup"
participant Node1 as "节点1"
participant Node2 as "节点2"
participant Node3 as "节点3"
Main->>WG : Add(3)
Main->>Node1 : 启动goroutine
Main->>Node2 : 启动goroutine
Main->>Node3 : 启动goroutine
par 并行执行
Node1->>Node1 : 执行业务逻辑
Node1->>WG : Done()
and
Node2->>Node2 : 执行业务逻辑
Node2->>WG : Done()
and
Node3->>Node3 : 执行业务逻辑
Node3->>WG : Done()
end
Main->>WG : Wait()
WG-->>Main : 所有节点完成
Main->>Main : 处理并行结果
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L250-L316)
并行执行的特点 #
- 独立性: 每个节点在独立的 goroutine 中执行
- 隔离性: 节点间不共享状态,避免竞态条件
- 容错性: 单个节点的失败不会影响其他节点
- 性能: 充分利用多核 CPU 资源
注意事项 #
- 状态安全: 如果状态是可变对象,需要确保线程安全
- 资源竞争: 避免多个节点同时访问共享资源
- 错误传播: 需要正确处理和传播节点执行错误
节来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L250-L316)
节点级别的重试机制 #
StateGraph 提供了内置的重试机制,可以在节点执行失败时自动重试。
flowchart TD
Start([开始执行节点]) --> Execute[执行节点函数]
Execute --> CheckResult{检查执行结果}
CheckResult --> |成功| Success[返回结果]
CheckResult --> |失败| CheckRetry{检查重试配置}
CheckRetry --> |无重试配置| Fail[返回错误]
CheckRetry --> |有重试配置| CheckRetryable{检查是否可重试}
CheckRetryable --> |不可重试| Fail
CheckRetryable --> |可重试| CheckAttempts{检查重试次数}
CheckAttempts --> |达到上限| Fail
CheckAttempts --> |未达到上限| WaitDelay[等待退避延迟]
WaitDelay --> Execute
Success --> End([结束])
Fail --> End
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L299-L338)
RetryPolicy 配置 #
RetryPolicy 定义了重试行为的各个方面:
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| MaxRetries | int | 1 | 最大重试次数 |
| BackoffStrategy | BackoffStrategy | FixedBackoff | 退避策略 |
| RetryableErrors | []string | nil | 可重试的错误模式 |
退避策略 #
- FixedBackoff: 固定时间间隔
- ExponentialBackoff: 指数退避(1s, 2s, 4s, 8s…)
- LinearBackoff: 线性退避(1s, 2s, 3s, 4s…)
节来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L34-L48)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L299-L338)
错误处理与中断信号 #
langgraphgo 提供了完善的错误处理和中断机制,确保工作流的可靠性和可控性。
错误类型 #
classDiagram
class Error {
<<abstract>>
}
class NodeInterrupt {
+string Node
+interface Value
+Error() string
}
class GraphInterrupt {
+string Node
+interface State
+[]string NextNodes
+interface InterruptValue
+Error() string
}
class StandardError {
+string Message
+Error() string
}
Error <|-- NodeInterrupt
Error <|-- GraphInterrupt
Error <|-- StandardError
图表来源
- [errors.go](https://github.com/smallnest/langgraphgo/blob/main/graph/errors.go#L5-L10)
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L24-L41)
中断机制 #
NodeInterrupt #
- 触发时机: 节点主动请求中断
- 用途: 用于需要外部输入或人工干预的场景
- 处理方式: 返回 NodeInterrupt 错误,包含中断值
GraphInterrupt #
- 触发时机: 图级别中断(InterruptBefore/InterruptAfter)
- 用途: 控制整个工作流的执行流程
- 处理方式: 返回 GraphInterrupt 错误,包含当前状态和下一节点
中断处理流程 #
flowchart TD
NodeExec[节点执行] --> CheckInterrupt{检查中断条件}
CheckInterrupt --> |节点中断| CreateNodeInt[创建NodeInterrupt]
CheckInterrupt --> |图中断Before| CreateGraphIntBefore[创建GraphInterrupt Before]
CheckInterrupt --> |图中断After| CreateGraphIntAfter[创建GraphInterrupt After]
CheckInterrupt --> |无中断| ContinueExec[继续执行]
CreateNodeInt --> PropagateInt[传播中断]
CreateGraphIntBefore --> PropagateInt
CreateGraphIntAfter --> PropagateInt
PropagateInt --> HandleInt[处理中断]
HandleInt --> Resume[恢复执行]
ContinueExec --> Success[执行成功]
Resume --> Success
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L320-L341)
- [interrupt_test.go](https://github.com/smallnest/langgraphgo/blob/main/graph/interrupt_test.go#L30-L63)
节来源
- [errors.go](https://github.com/smallnest/langgraphgo/blob/main/graph/errors.go#L5-L10)
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L24-L41)
- [interrupt_test.go](https://github.com/smallnest/langgraphgo/blob/main/graph/interrupt_test.go#L30-L63)
节点与检查点的交互 #
检查点机制为节点提供了状态持久化的支持,确保在故障恢复时能够从正确的状态继续执行。
检查点生命周期 #
stateDiagram-v2
[*] --> Created : 节点开始执行
Created --> Executing : 开始执行
Executing --> CheckpointSaved : 自动保存检查点
Executing --> ManualCheckpoint : 手动保存检查点
CheckpointSaved --> Success : 执行成功
ManualCheckpoint --> Success
Executing --> Failed : 执行失败
Failed --> [*] : 错误处理
Success --> [*] : 正常完成
检查点配置 #
| 配置项 | 类型 | 说明 |
|---|---|---|
| Store | CheckpointStore | 存储后端(内存、PostgreSQL、SQLite、Redis) |
| AutoSave | bool | 是否自动保存检查点 |
| SaveInterval | time.Duration | 自动保存间隔 |
| MaxCheckpoints | int | 最大检查点数量 |
检查点与节点的交互 #
- 自动保存: 在节点执行完成后自动保存检查点
- 手动保存: 节点可以主动请求保存检查点
- 恢复执行: 从检查点恢复时重新执行节点
- 状态查询: 获取特定检查点的状态信息
节来源
- [checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L12-L19)
- [checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L188-L201)
实际应用示例 #
基础节点示例 #
在 basic_example 中展示了最基本的节点定义和使用:
节来源
- [main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/basic_example/main.go#L27-L30)
带重试的节点 #
使用 StateGraph 的重试机制:
sequenceDiagram
participant Client as "客户端"
participant SG as "StateGraph"
participant Node as "重试节点"
participant Policy as "RetryPolicy"
Client->>SG : 添加带重试的节点
SG->>Node : 包装重试逻辑
Client->>SG : 执行节点
SG->>Node : 第一次执行
Node->>Node : 执行业务逻辑
Node-->>SG : 执行失败
SG->>Policy : 检查重试条件
Policy-->>SG : 允许重试
SG->>Node : 第二次执行
Node-->>SG : 执行成功
SG-->>Client : 返回结果
带中断的节点 #
处理需要人工干预的场景:
节来源
- [interrupt_test.go](https://github.com/smallnest/langgraphgo/blob/main/graph/interrupt_test.go#L12-L20)
最佳实践与设计原则 #
节点设计原则 #
- 单一职责: 每个节点只负责一个明确的功能
- 幂等性: 节点应该能够安全地重复执行
- 无副作用: 节点不应该修改外部状态
- 快速执行: 节点应该尽快完成执行
错误处理最佳实践 #
- 明确错误分类: 区分可重试和不可重试的错误
- 有意义的错误信息: 提供有助于诊断问题的错误信息
- 适当的重试策略: 根据错误类型选择合适的重试策略
- 超时控制: 为长时间运行的节点设置合理的超时
性能优化建议 #
- 合理使用并行: 在不影响状态安全的前提下使用并行执行
- 避免阻塞操作: 节点中避免使用阻塞的 I/O 操作
- 状态优化: 设计轻量级的状态结构
- 资源管理: 及时释放不需要的资源
可维护性考虑 #
- 清晰的命名: 使用描述性的节点名称
- 文档化: 为复杂节点提供详细的文档
- 测试覆盖: 确保节点有足够的测试覆盖率
- 监控支持: 在节点中添加适当的日志和指标
通过遵循这些原则和最佳实践,可以构建出健壮、高效且易于维护的节点系统,充分发挥 langgraphgo 工作流框架的优势。