核心概念 #
本文档中引用的文件
目录 #
简介 #
LangGraphGo 是一个基于图结构的工作流引擎,它将应用程序逻辑建模为有向无环图(DAG),其中节点表示处理步骤,边表示数据流向。这种设计提供了高度的灵活性、可组合性和可观测性,特别适用于复杂的 AI 应用程序和数据处理管道。
图结构工作流概述 #
基本架构 #
LangGraphGo 的核心是基于图的执行模型,它将应用程序分解为独立的处理单元(节点),并通过连接这些单元来定义执行流程。
graph TD
A[输入状态] --> B[节点1<br/>数据处理]
B --> C[节点2<br/>业务逻辑]
C --> D[节点3<br/>结果生成]
D --> E[END<br/>结束]
F[状态Schema] --> B
F --> C
F --> D
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L52-L60)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L11-L32)
执行模型 #
图结构工作流采用并行执行模型,在每个步骤中可以同时执行多个节点,然后通过状态合并机制统一处理结果。
sequenceDiagram
participant G as 图执行器
participant N1 as 节点1
participant N2 as 节点2
participant S as 状态管理器
G->>N1 : 并行执行
G->>N2 : 并行执行
N1-->>G : 返回结果1
N2-->>G : 返回结果2
G->>S : 合并状态
S-->>G : 更新后状态
G->>G : 决定下一跳
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L143-L166)
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L249-L316)
章节来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L1-L492)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L1-L458)
节点与边 #
节点(Node) #
节点是图中的基本执行单元,每个节点都有唯一的名称和关联的处理函数。
classDiagram
class Node {
+string Name
+Function func
+Execute(ctx, state) result
}
class StateRunnable {
+Invoke(ctx, state) result
+executeNodeWithRetry(ctx, node, state) result
}
Node --> StateRunnable : "被调用"
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L52-L60)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L99-L102)
节点特性 #
- 唯一标识: 每个节点必须有唯一的名称
- 函数绑定: 关联具体的处理逻辑
- 状态感知: 接收当前状态作为输入
- 并行执行: 支持多节点并发执行
边(Edge) #
边定义了节点之间的连接关系,控制数据流向和执行顺序。
graph LR
A[开始节点] --> B[处理节点]
B --> C[结束节点]
subgraph "静态边"
A -.->|直接连接| B
B -.->|直接连接| C
end
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L62-L70)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L67-L71)
边类型 #
- 静态边: 预定义的固定连接关系
- 条件边: 基于状态动态决定的连接
- 扇出边: 从单个节点到多个目标节点的连接
条件边(Conditional Edge) #
条件边允许根据运行时状态动态选择下一个执行路径。
flowchart TD
A[开始] --> B{条件判断}
B --> |条件1| C[路径1]
B --> |条件2| D[路径2]
B --> |默认| E[路径3]
C --> F[结束]
D --> F
E --> F
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L74-L77)
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L119-L122)
入口点(Entry Point) #
入口点定义了图执行的起始位置,是整个工作流的入口。
graph TD
A[ENTRY_POINT] --> B[节点1]
B --> C[节点2]
C --> D[END]
style A fill:#ff9999
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L79-L82)
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L125-L128)
章节来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L52-L139)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L58-L89)
StateGraph 设计 #
核心结构 #
StateGraph 是 LangGraphGo 的核心组件,它封装了整个图的状态管理和执行逻辑。
classDiagram
class StateGraph {
+map~string,Node~ nodes
+[]Edge edges
+map~string,func~ conditionalEdges
+string entryPoint
+RetryPolicy retryPolicy
+StateMerger stateMerger
+StateSchema Schema
+AddNode(name, fn)
+AddEdge(from, to)
+AddConditionalEdge(from, condition)
+SetEntryPoint(name)
+Compile() StateRunnable
}
class StateRunnable {
+StateGraph graph
+Invoke(ctx, state) result
+InvokeWithConfig(ctx, state, config) result
}
StateGraph --> StateRunnable : "编译为"
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L11-L32)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L99-L102)
状态管理 #
StateGraph 通过状态模式来管理应用程序状态,确保状态的一致性和可预测性。
stateDiagram-v2
[*] --> 初始化
初始化 --> 执行中 : SetSchema()
执行中 --> 执行中 : 节点执行
执行中 --> 完成 : 所有节点完成
执行中 --> 错误 : 节点失败
错误 --> 重试 : 支持重试
重试 --> 执行中 : 重试成功
重试 --> 失败 : 重试耗尽
完成 --> [*]
失败 --> [*]
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L115-L296)
- [schema.go](https://github.com/smallnest/langgraphgo/blob/main/graph/schema.go#L12-L19)
并发执行机制 #
StateGraph 支持并行节点执行,通过 WaitGroup 确保所有节点完成后再继续下一步。
sequenceDiagram
participant M as 主执行器
participant W1 as 工作线程1
participant W2 as 工作线程2
participant W3 as 工作线程3
M->>W1 : 启动节点1
M->>W2 : 启动节点2
M->>W3 : 启动节点3
par 并行执行
W1-->>M : 完成结果1
and
W2-->>M : 完成结果2
and
W3-->>M : 完成结果3
end
M->>M : 合并结果
M->>M : 更新状态
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L143-L166)
章节来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L1-L458)
状态模式与更新机制 #
StateSchema 接口 #
StateSchema 定义了状态的结构和更新逻辑,提供了类型安全的状态管理。
classDiagram
class StateSchema {
<<interface>>
+Init() interface
+Update(current, new) interface
}
class CleaningStateSchema {
<<interface>>
+Cleanup(state) interface
}
class MapSchema {
+map~string,Reducer~ Reducers
+map~string,bool~ EphemeralKeys
+RegisterReducer(key, reducer)
+RegisterChannel(key, reducer, isEphemeral)
+Init() interface
+Update(current, new) interface
+Cleanup(state) interface
}
StateSchema <|-- CleaningStateSchema
StateSchema <|-- MapSchema
图表来源
- [schema.go](https://github.com/smallnest/langgraphgo/blob/main/graph/schema.go#L12-L27)
- [schema.go](https://github.com/smallnest/langgraphgo/blob/main/graph/schema.go#L29-L42)
Reducer 函数 #
Reducer 定义了如何将新状态值合并到现有状态中。
flowchart TD
A[当前状态] --> B{Reducer类型}
B --> |Overwrite| C[直接替换]
B --> |Append| D[追加到列表]
B --> |自定义| E[执行自定义逻辑]
C --> F[更新后状态]
D --> F
E --> F
图表来源
- [schema.go](https://github.com/smallnest/langgraphgo/blob/main/graph/schema.go#L8-L11)
- [schema.go](https://github.com/smallnest/langgraphgo/blob/main/graph/schema.go#L141-L186)
状态更新流程 #
状态更新遵循严格的合并规则,确保数据一致性。
sequenceDiagram
participant N as 节点
participant S as StateSchema
participant R as Reducer
participant U as 状态更新器
N->>S : 请求更新状态
S->>R : 获取对应Reducer
R->>U : 执行合并逻辑
U->>S : 返回合并结果
S->>N : 返回更新后状态
图表来源
- [schema.go](https://github.com/smallnest/langgraphgo/blob/main/graph/schema.go#L62-L99)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L200-L216)
通道与临时状态 #
MapSchema 支持通道概念,可以定义哪些键是临时的,在每步执行后自动清理。
章节来源
- [schema.go](https://github.com/smallnest/langgraphgo/blob/main/graph/schema.go#L1-L186)
持久化检查点 #
检查点存储接口 #
检查点系统提供了可靠的状态持久化机制,支持容错和恢复。
classDiagram
class CheckpointStore {
<<interface>>
+Save(ctx, checkpoint) error
+Load(ctx, id) Checkpoint
+List(ctx, executionID) []Checkpoint
+Delete(ctx, id) error
+Clear(ctx, executionID) error
}
class Checkpoint {
+string ID
+string NodeName
+interface State
+map~string,interface~ Metadata
+time.Time Timestamp
+int Version
}
class MemoryCheckpointStore {
+map~string,Checkpoint~ checkpoints
+sync.RWMutex mutex
+Save(ctx, checkpoint) error
+Load(ctx, id) Checkpoint
+List(ctx, executionID) []Checkpoint
+Delete(ctx, id) error
+Clear(ctx, executionID) error
}
class CheckpointableRunnable {
+ListenableRunnable runnable
+CheckpointConfig config
+string executionID
+SaveCheckpoint(ctx, nodeName, state) error
+LoadCheckpoint(ctx, id) Checkpoint
+ListCheckpoints(ctx) []Checkpoint
+ResumeFromCheckpoint(ctx, id) interface
+GetState(ctx, config) StateSnapshot
+UpdateState(ctx, config, values, asNode) Config
}
CheckpointStore <|-- MemoryCheckpointStore
CheckpointStore <-- Checkpoint
CheckpointableRunnable --> CheckpointStore
图表来源
- [checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L23-L38)
- [checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L12-L20)
- [checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L40-L51)
自动检查点机制 #
检查点系统支持自动保存和手动保存两种模式。
flowchart TD
A[节点执行开始] --> B{启用自动保存?}
B --> |是| C[创建检查点]
B --> |否| D[等待手动触发]
C --> E[异步保存]
E --> F[继续执行]
D --> G[手动SaveCheckpoint]
G --> H[同步保存]
H --> F
F --> I[节点执行完成]
I --> J[更新元数据]
J --> K[检查点完成]
图表来源
- [checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L297-L330)
- [checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L230-L251)
恢复机制 #
检查点系统支持从中断点恢复执行,提供强大的容错能力。
sequenceDiagram
participant E as 执行器
participant S as 存储
participant R as 恢复器
E->>S : 加载检查点
S-->>E : 返回检查点数据
E->>R : 解析检查点
R->>R : 提取状态和元数据
R->>E : 返回恢复状态
E->>E : 从检查点继续执行
图表来源
- [checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L279-L290)
- [checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L397-L463)
状态快照 #
StateSnapshot 提供了完整的状态视图,包括时间戳、元数据和配置信息。
章节来源
- [checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L1-L560)
流式输出与监听器 #
监听器架构 #
监听器系统提供了事件驱动的可观测性机制,支持多种事件类型的捕获和处理。
classDiagram
class NodeListener {
<<interface>>
+OnNodeEvent(ctx, event, nodeName, state, err)
}
class ListenableNode {
+Node Node
+[]NodeListener listeners
+sync.RWMutex mutex
+AddListener(listener)
+RemoveListener(listener)
+NotifyListeners(ctx, event, state, err)
+Execute(ctx, state) result
}
class StreamingListener {
+chan~StreamEvent~ eventChan
+StreamConfig config
+int droppedEvents
+bool closed
+emitEvent(event)
+shouldEmit(event) bool
+handleBackpressure()
}
class StreamEvent {
+time.Time Timestamp
+string NodeName
+NodeEvent Event
+interface State
+error Error
+map~string,interface~ Metadata
+time.Duration Duration
}
NodeListener <|-- StreamingListener
ListenableNode --> NodeListener : "通知"
StreamingListener --> StreamEvent : "生成"
图表来源
- [listeners.go](https://github.com/smallnest/langgraphgo/blob/main/graph/listeners.go#L51-L55)
- [listeners.go](https://github.com/smallnest/langgraphgo/blob/main/graph/listeners.go#L89-L102)
- [streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L66-L70)
- [streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L66-L87)
事件类型 #
系统定义了丰富的事件类型来覆盖各种执行场景。
graph TD
A[NodeEvent] --> B[NodeEventStart]
A --> C[NodeEventProgress]
A --> D[NodeEventComplete]
A --> E[NodeEventError]
F[链路事件] --> G[EventChainStart]
F --> H[EventChainEnd]
I[工具事件] --> J[EventToolStart]
I --> K[EventToolEnd]
L[LLM事件] --> M[EventLLMStart]
L --> N[EventLLMEnd]
O[流式事件] --> P[EventToken]
O --> Q[EventCustom]
图表来源
- [listeners.go](https://github.com/smallnest/langgraphgo/blob/main/graph/listeners.go#L10-L49)
流式模式 #
流式输出支持多种模式,满足不同的监控和调试需求。
flowchart TD
A[流式配置] --> B{模式类型}
B --> |values| C[完整状态流]
B --> |updates| D[增量更新流]
B --> |messages| E[消息流]
B --> |debug| F[调试事件流]
C --> G[GraphStep事件]
D --> H[ToolEnd/ChainEnd事件]
E --> I[LLMStart/LLMEnd事件]
F --> J[所有事件类型]
图表来源
- [streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L9-L21)
- [streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L112-L133)
背压处理 #
流式系统实现了智能的背压处理机制,防止快速生产者淹没消费者。
sequenceDiagram
participant P as 生产者
participant L as 监听器
participant C as 消费者
P->>L : 发送事件
L->>L : 检查缓冲区
alt 缓冲区充足
L->>L : 直接发送
else 缓冲区满
L->>L : 启用背压处理
L->>L : 记录丢弃事件
L->>P : 丢弃事件
end
C->>L : 消费事件
L->>C : 提供事件
图表来源
- [streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L84-L109)
- [streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L252-L268)
章节来源
- [listeners.go](https://github.com/smallnest/langgraphgo/blob/main/graph/listeners.go#L1-L335)
- [streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L1-L476)
错误处理与重试策略 #
重试策略配置 #
LangGraphGo 提供了灵活的错误处理和重试机制。
classDiagram
class RetryPolicy {
+int MaxRetries
+BackoffStrategy BackoffStrategy
+[]string RetryableErrors
}
class BackoffStrategy {
<<enumeration>>
FixedBackoff
ExponentialBackoff
LinearBackoff
}
class StateRunnable {
+executeNodeWithRetry(ctx, node, state) result
+isRetryableError(err) bool
+calculateBackoffDelay(attempt) time.Duration
}
RetryPolicy --> BackoffStrategy
StateRunnable --> RetryPolicy
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L34-L48)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L397-L458)
重试算法 #
系统支持多种退避策略来优化重试行为。
flowchart TD
A[节点执行失败] --> B{是否可重试?}
B --> |否| C[返回最终错误]
B --> |是| D{达到最大重试次数?}
D --> |是| C
D --> |否| E{退避策略}
E --> |Fixed| F[固定延迟1秒]
E --> |Exponential| G[指数退避: 1s, 2s, 4s...]
E --> |Linear| H[线性退避: 1s, 2s, 3s...]
F --> I[等待延迟]
G --> I
H --> I
I --> J[重新执行节点]
J --> K{执行成功?}
K --> |是| L[返回结果]
K --> |否| D
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L299-L339)
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L375-L395)
错误分类 #
系统能够智能地识别和处理不同类型的错误。
graph TD
A[错误发生] --> B{错误类型}
B --> |网络错误| C[可重试]
B --> |认证错误| D[不可重试]
B --> |超时错误| C
B --> |业务逻辑错误| D
C --> E[应用重试策略]
D --> F[立即失败]
图表来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L341-L355)
章节来源
- [state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L34-L395)
总结 #
LangGraphGo 的核心概念构成了一个强大而灵活的工作流引擎,其主要特点包括:
架构优势 #
- 模块化设计: 清晰的职责分离,便于扩展和维护
- 类型安全: 强类型的状态管理和事件处理
- 并发友好: 内置并行执行和状态同步机制
- 可观测性: 完整的监听器和流式输出系统
核心价值 #
- 容错能力: 检查点系统提供可靠的错误恢复
- 可调试性: 丰富的事件系统支持深度调试
- 可扩展性: 插件式的监听器和状态管理
- 性能优化: 智能的重试和背压处理
应用场景 #
LangGraphGo 特别适合以下应用场景:
- 复杂的 AI 工作流处理
- 数据管道和 ETL 系统
- 微服务编排和协调
- 实时数据处理和分析
通过深入理解这些核心概念,开发者可以更好地利用 LangGraphGo 构建健壮、可维护的应用程序。