核心概念 #

目录 #

  1. 简介
  2. 图结构工作流概述
  3. 节点与边
  4. StateGraph 设计
  5. 状态模式与更新机制
  6. 持久化检查点
  7. 流式输出与监听器
  8. 错误处理与重试策略
  9. 总结

简介 #

LangGraphGo 是一个基于图结构的工作流引擎,它将应用程序逻辑建模为有向无环图(DAG),其中节点表示处理步骤,边表示数据流向。这种设计提供了高度的灵活性、可组合性和可观测性,特别适用于复杂的 AI 应用程序和数据处理管道。

图结构工作流概述 #

基本架构 #

LangGraphGo 的核心是基于图的执行模型,它将应用程序分解为独立的处理单元(节点),并通过连接这些单元来定义执行流程。

graph TD
A[输入状态] --> B[节点1<br/>数据处理]
B --> C[节点2<br/>业务逻辑]
C --> D[节点3<br/>结果生成]
D --> E[END<br/>结束]
F[状态Schema] --> B
F --> C
F --> D

图表来源

执行模型 #

图结构工作流采用并行执行模型,在每个步骤中可以同时执行多个节点,然后通过状态合并机制统一处理结果。

sequenceDiagram
participant G as 图执行器
participant N1 as 节点1
participant N2 as 节点2
participant S as 状态管理器
G->>N1 : 并行执行
G->>N2 : 并行执行
N1-->>G : 返回结果1
N2-->>G : 返回结果2
G->>S : 合并状态
S-->>G : 更新后状态
G->>G : 决定下一跳

图表来源

章节来源

节点与边 #

节点(Node) #

节点是图中的基本执行单元,每个节点都有唯一的名称和关联的处理函数。

classDiagram
class Node {
+string Name
+Function func
+Execute(ctx, state) result
}
class StateRunnable {
+Invoke(ctx, state) result
+executeNodeWithRetry(ctx, node, state) result
}
Node --> StateRunnable : "被调用"

图表来源

节点特性 #

  1. 唯一标识: 每个节点必须有唯一的名称
  2. 函数绑定: 关联具体的处理逻辑
  3. 状态感知: 接收当前状态作为输入
  4. 并行执行: 支持多节点并发执行

边(Edge) #

边定义了节点之间的连接关系,控制数据流向和执行顺序。

graph LR
A[开始节点] --> B[处理节点]
B --> C[结束节点]
subgraph "静态边"
A -.->|直接连接| B
B -.->|直接连接| C
end

图表来源

边类型 #

  1. 静态边: 预定义的固定连接关系
  2. 条件边: 基于状态动态决定的连接
  3. 扇出边: 从单个节点到多个目标节点的连接

条件边(Conditional Edge) #

条件边允许根据运行时状态动态选择下一个执行路径。

flowchart TD
A[开始] --> B{条件判断}
B --> |条件1| C[路径1]
B --> |条件2| D[路径2]
B --> |默认| E[路径3]
C --> F[结束]
D --> F
E --> F

图表来源

入口点(Entry Point) #

入口点定义了图执行的起始位置,是整个工作流的入口。

graph TD
A[ENTRY_POINT] --> B[节点1]
B --> C[节点2]
C --> D[END]
style A fill:#ff9999

图表来源

章节来源

StateGraph 设计 #

核心结构 #

StateGraph 是 LangGraphGo 的核心组件,它封装了整个图的状态管理和执行逻辑。

classDiagram
class StateGraph {
+map~string,Node~ nodes
+[]Edge edges
+map~string,func~ conditionalEdges
+string entryPoint
+RetryPolicy retryPolicy
+StateMerger stateMerger
+StateSchema Schema
+AddNode(name, fn)
+AddEdge(from, to)
+AddConditionalEdge(from, condition)
+SetEntryPoint(name)
+Compile() StateRunnable
}
class StateRunnable {
+StateGraph graph
+Invoke(ctx, state) result
+InvokeWithConfig(ctx, state, config) result
}
StateGraph --> StateRunnable : "编译为"

图表来源

状态管理 #

StateGraph 通过状态模式来管理应用程序状态,确保状态的一致性和可预测性。

stateDiagram-v2
[*] --> 初始化
初始化 --> 执行中 : SetSchema()
执行中 --> 执行中 : 节点执行
执行中 --> 完成 : 所有节点完成
执行中 --> 错误 : 节点失败
错误 --> 重试 : 支持重试
重试 --> 执行中 : 重试成功
重试 --> 失败 : 重试耗尽
完成 --> [*]
失败 --> [*]

图表来源

并发执行机制 #

StateGraph 支持并行节点执行,通过 WaitGroup 确保所有节点完成后再继续下一步。

sequenceDiagram
participant M as 主执行器
participant W1 as 工作线程1
participant W2 as 工作线程2
participant W3 as 工作线程3
M->>W1 : 启动节点1
M->>W2 : 启动节点2
M->>W3 : 启动节点3
par 并行执行
W1-->>M : 完成结果1
and
W2-->>M : 完成结果2
and
W3-->>M : 完成结果3
end
M->>M : 合并结果
M->>M : 更新状态

图表来源

章节来源

状态模式与更新机制 #

StateSchema 接口 #

StateSchema 定义了状态的结构和更新逻辑,提供了类型安全的状态管理。

classDiagram
class StateSchema {
<<interface>>
+Init() interface
+Update(current, new) interface
}
class CleaningStateSchema {
<<interface>>
+Cleanup(state) interface
}
class MapSchema {
+map~string,Reducer~ Reducers
+map~string,bool~ EphemeralKeys
+RegisterReducer(key, reducer)
+RegisterChannel(key, reducer, isEphemeral)
+Init() interface
+Update(current, new) interface
+Cleanup(state) interface
}
StateSchema <|-- CleaningStateSchema
StateSchema <|-- MapSchema

图表来源

Reducer 函数 #

Reducer 定义了如何将新状态值合并到现有状态中。

flowchart TD
A[当前状态] --> B{Reducer类型}
B --> |Overwrite| C[直接替换]
B --> |Append| D[追加到列表]
B --> |自定义| E[执行自定义逻辑]
C --> F[更新后状态]
D --> F
E --> F

图表来源

状态更新流程 #

状态更新遵循严格的合并规则,确保数据一致性。

sequenceDiagram
participant N as 节点
participant S as StateSchema
participant R as Reducer
participant U as 状态更新器
N->>S : 请求更新状态
S->>R : 获取对应Reducer
R->>U : 执行合并逻辑
U->>S : 返回合并结果
S->>N : 返回更新后状态

图表来源

通道与临时状态 #

MapSchema 支持通道概念,可以定义哪些键是临时的,在每步执行后自动清理。

章节来源

持久化检查点 #

检查点存储接口 #

检查点系统提供了可靠的状态持久化机制,支持容错和恢复。

classDiagram
class CheckpointStore {
<<interface>>
+Save(ctx, checkpoint) error
+Load(ctx, id) Checkpoint
+List(ctx, executionID) []Checkpoint
+Delete(ctx, id) error
+Clear(ctx, executionID) error
}
class Checkpoint {
+string ID
+string NodeName
+interface State
+map~string,interface~ Metadata
+time.Time Timestamp
+int Version
}
class MemoryCheckpointStore {
+map~string,Checkpoint~ checkpoints
+sync.RWMutex mutex
+Save(ctx, checkpoint) error
+Load(ctx, id) Checkpoint
+List(ctx, executionID) []Checkpoint
+Delete(ctx, id) error
+Clear(ctx, executionID) error
}
class CheckpointableRunnable {
+ListenableRunnable runnable
+CheckpointConfig config
+string executionID
+SaveCheckpoint(ctx, nodeName, state) error
+LoadCheckpoint(ctx, id) Checkpoint
+ListCheckpoints(ctx) []Checkpoint
+ResumeFromCheckpoint(ctx, id) interface
+GetState(ctx, config) StateSnapshot
+UpdateState(ctx, config, values, asNode) Config
}
CheckpointStore <|-- MemoryCheckpointStore
CheckpointStore <-- Checkpoint
CheckpointableRunnable --> CheckpointStore

图表来源

自动检查点机制 #

检查点系统支持自动保存和手动保存两种模式。

flowchart TD
A[节点执行开始] --> B{启用自动保存?}
B --> |是| C[创建检查点]
B --> |否| D[等待手动触发]
C --> E[异步保存]
E --> F[继续执行]
D --> G[手动SaveCheckpoint]
G --> H[同步保存]
H --> F
F --> I[节点执行完成]
I --> J[更新元数据]
J --> K[检查点完成]

图表来源

恢复机制 #

检查点系统支持从中断点恢复执行,提供强大的容错能力。

sequenceDiagram
participant E as 执行器
participant S as 存储
participant R as 恢复器
E->>S : 加载检查点
S-->>E : 返回检查点数据
E->>R : 解析检查点
R->>R : 提取状态和元数据
R->>E : 返回恢复状态
E->>E : 从检查点继续执行

图表来源

状态快照 #

StateSnapshot 提供了完整的状态视图,包括时间戳、元数据和配置信息。

章节来源

流式输出与监听器 #

监听器架构 #

监听器系统提供了事件驱动的可观测性机制,支持多种事件类型的捕获和处理。

classDiagram
class NodeListener {
<<interface>>
+OnNodeEvent(ctx, event, nodeName, state, err)
}
class ListenableNode {
+Node Node
+[]NodeListener listeners
+sync.RWMutex mutex
+AddListener(listener)
+RemoveListener(listener)
+NotifyListeners(ctx, event, state, err)
+Execute(ctx, state) result
}
class StreamingListener {
+chan~StreamEvent~ eventChan
+StreamConfig config
+int droppedEvents
+bool closed
+emitEvent(event)
+shouldEmit(event) bool
+handleBackpressure()
}
class StreamEvent {
+time.Time Timestamp
+string NodeName
+NodeEvent Event
+interface State
+error Error
+map~string,interface~ Metadata
+time.Duration Duration
}
NodeListener <|-- StreamingListener
ListenableNode --> NodeListener : "通知"
StreamingListener --> StreamEvent : "生成"

图表来源

事件类型 #

系统定义了丰富的事件类型来覆盖各种执行场景。

graph TD
A[NodeEvent] --> B[NodeEventStart]
A --> C[NodeEventProgress]
A --> D[NodeEventComplete]
A --> E[NodeEventError]
F[链路事件] --> G[EventChainStart]
F --> H[EventChainEnd]
I[工具事件] --> J[EventToolStart]
I --> K[EventToolEnd]
L[LLM事件] --> M[EventLLMStart]
L --> N[EventLLMEnd]
O[流式事件] --> P[EventToken]
O --> Q[EventCustom]

图表来源

流式模式 #

流式输出支持多种模式,满足不同的监控和调试需求。

flowchart TD
A[流式配置] --> B{模式类型}
B --> |values| C[完整状态流]
B --> |updates| D[增量更新流]
B --> |messages| E[消息流]
B --> |debug| F[调试事件流]
C --> G[GraphStep事件]
D --> H[ToolEnd/ChainEnd事件]
E --> I[LLMStart/LLMEnd事件]
F --> J[所有事件类型]

图表来源

背压处理 #

流式系统实现了智能的背压处理机制,防止快速生产者淹没消费者。

sequenceDiagram
participant P as 生产者
participant L as 监听器
participant C as 消费者
P->>L : 发送事件
L->>L : 检查缓冲区
alt 缓冲区充足
L->>L : 直接发送
else 缓冲区满
L->>L : 启用背压处理
L->>L : 记录丢弃事件
L->>P : 丢弃事件
end
C->>L : 消费事件
L->>C : 提供事件

图表来源

章节来源

错误处理与重试策略 #

重试策略配置 #

LangGraphGo 提供了灵活的错误处理和重试机制。

classDiagram
class RetryPolicy {
+int MaxRetries
+BackoffStrategy BackoffStrategy
+[]string RetryableErrors
}
class BackoffStrategy {
<<enumeration>>
FixedBackoff
ExponentialBackoff
LinearBackoff
}
class StateRunnable {
+executeNodeWithRetry(ctx, node, state) result
+isRetryableError(err) bool
+calculateBackoffDelay(attempt) time.Duration
}
RetryPolicy --> BackoffStrategy
StateRunnable --> RetryPolicy

图表来源

重试算法 #

系统支持多种退避策略来优化重试行为。

flowchart TD
A[节点执行失败] --> B{是否可重试?}
B --> |否| C[返回最终错误]
B --> |是| D{达到最大重试次数?}
D --> |是| C
D --> |否| E{退避策略}
E --> |Fixed| F[固定延迟1秒]
E --> |Exponential| G[指数退避: 1s, 2s, 4s...]
E --> |Linear| H[线性退避: 1s, 2s, 3s...]
F --> I[等待延迟]
G --> I
H --> I
I --> J[重新执行节点]
J --> K{执行成功?}
K --> |是| L[返回结果]
K --> |否| D

图表来源

错误分类 #

系统能够智能地识别和处理不同类型的错误。

graph TD
A[错误发生] --> B{错误类型}
B --> |网络错误| C[可重试]
B --> |认证错误| D[不可重试]
B --> |超时错误| C
B --> |业务逻辑错误| D
C --> E[应用重试策略]
D --> F[立即失败]

图表来源

章节来源

总结 #

LangGraphGo 的核心概念构成了一个强大而灵活的工作流引擎,其主要特点包括:

架构优势 #

  1. 模块化设计: 清晰的职责分离,便于扩展和维护
  2. 类型安全: 强类型的状态管理和事件处理
  3. 并发友好: 内置并行执行和状态同步机制
  4. 可观测性: 完整的监听器和流式输出系统

核心价值 #

  1. 容错能力: 检查点系统提供可靠的错误恢复
  2. 可调试性: 丰富的事件系统支持深度调试
  3. 可扩展性: 插件式的监听器和状态管理
  4. 性能优化: 智能的重试和背压处理

应用场景 #

LangGraphGo 特别适合以下应用场景:

通过深入理解这些核心概念,开发者可以更好地利用 LangGraphGo 构建健壮、可维护的应用程序。