状态恢复 #
本文档中引用的文件
目录 #
简介 #
langgraphgo 的状态恢复机制是一个强大的功能,允许在图执行过程中中断并恢复执行。这种机制通过 InvokeWithConfig 方法实现,支持两种主要的恢复方式:基于节点列表的恢复(ResumeFrom)和基于上下文注入的恢复(ResumeValue)。该机制为构建可中断、可恢复的复杂工作流提供了基础。
核心概念 #
GraphInterrupt 错误类型 #
状态恢复的核心是 GraphInterrupt 错误类型,它包含了执行中断时的所有必要信息:
classDiagram
class GraphInterrupt {
+string Node
+interface State
+[]string NextNodes
+interface InterruptValue
+Error() string
}
class NodeInterrupt {
+string Node
+interface Value
+Error() string
}
GraphInterrupt --> NodeInterrupt : "继承"
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L24-L41)
- [errors.go](https://github.com/smallnest/langgraphgo/blob/main/graph/errors.go#L5-L15)
配置结构 #
恢复机制依赖于 Config 结构体中的两个关键字段:
| 字段名 | 类型 | 描述 | 用途 |
|---|---|---|---|
ResumeFrom |
[]string |
指定恢复执行的起始节点列表 | 控制从哪些节点开始重新执行 |
ResumeValue |
interface{} |
提供给中断函数的恢复值 | 实现动态中断的续传 |
节来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L40-L70)
InvokeWithConfig 方法详解 #
InvokeWithConfig 是状态恢复机制的核心方法,它扩展了基本的 Invoke 方法,增加了对恢复功能的支持。
方法签名 #
func (r *Runnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)
恢复流程的关键步骤 #
flowchart TD
Start([开始执行]) --> CheckResume["检查 ResumeFrom 配置"]
CheckResume --> SetNodes["设置当前节点列表"]
SetNodes --> InjectContext["注入配置到上下文"]
InjectContext --> CheckResumeValue["检查 ResumeValue"]
CheckResumeValue --> ExecuteLoop["执行节点循环"]
ExecuteLoop --> CheckInterrupt["检查中断条件"]
CheckInterrupt --> InterruptFound{"发现中断?"}
InterruptFound --> |是| ReturnInterrupt["返回 GraphInterrupt"]
InterruptFound --> |否| ContinueExecution["继续执行"]
ContinueExecution --> CheckMoreNodes{"还有节点?"}
CheckMoreNodes --> |是| ExecuteLoop
CheckMoreNodes --> |否| ReturnResult["返回最终结果"]
ReturnInterrupt --> End([结束])
ReturnResult --> End
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L180-L491)
恢复上下文的建立 #
在方法开始时,系统会根据配置建立恢复上下文:
sequenceDiagram
participant Client as 客户端
participant Runnable as Runnable
participant Context as 上下文管理器
participant Node as 节点执行器
Client->>Runnable : InvokeWithConfig(initialState, config)
Runnable->>Context : 检查 ResumeFrom
Context-->>Runnable : 更新当前节点列表
Runnable->>Context : 注入配置到上下文
Runnable->>Context : 检查 ResumeValue
Context-->>Runnable : 设置恢复值
Runnable->>Node : 开始执行节点
Node-->>Runnable : 返回中断或结果
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L182-L202)
节来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L180-L202)
ResumeFrom 字段机制 #
ResumeFrom 字段指定了恢复执行的起始节点列表,这是状态恢复的核心机制之一。
基本工作原理 #
当 ResumeFrom 字段不为空时,系统会使用该字段指定的节点列表替换默认的入口节点:
flowchart LR
DefaultEntry["默认入口节点<br/>A"] --> ResumeFrom["ResumeFrom 配置<br/>['B', 'C']"]
ResumeFrom --> NewStart["新的执行起点<br/>同时执行 B 和 C"]
NewStart --> ParallelExec["并行执行节点"]
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L186-L189)
使用场景 #
- 部分失败恢复:从失败的节点重新开始执行
- 条件分支恢复:跳过某些不需要的分支
- 性能优化:避免重复执行已完成的节点
实际应用示例 #
参考测试代码中的使用模式:
sequenceDiagram
participant Test as 测试
participant Graph as 图实例
participant Config as 配置对象
Test->>Graph : 创建图结构
Test->>Config : 设置 ResumeFrom = ['C']
Test->>Graph : InvokeWithConfig(state, config)
Graph->>Graph : 从节点 C 开始执行
Graph-->>Test : 返回执行结果
图表来源
- [resume_test.go](https://github.com/smallnest/langgraphgo/blob/main/graph/resume_test.go#L49-L51)
节来源
- [resume_test.go](https://github.com/smallnest/langgraphgo/blob/main/graph/resume_test.go#L49-L51)
ResumeValue 字段机制 #
ResumeValue 字段通过上下文注入的方式,为动态中断提供恢复数据。
上下文管理机制 #
classDiagram
class resumeValueKey {
<<struct>>
}
class WithResumeValue {
+func(ctx Context, value interface) Context
}
class GetResumeValue {
+func(ctx Context) interface
}
resumeValueKey --> WithResumeValue : "作为键值"
resumeValueKey --> GetResumeValue : "作为键值"
图表来源
- [context.go](https://github.com/smallnest/langgraphgo/blob/main/graph/context.go#L5-L16)
中断函数的工作流程 #
flowchart TD
InterruptCall["Interrupt(ctx, value)"] --> CheckResumeValue["检查上下文中是否有 ResumeValue"]
CheckResumeValue --> HasResumeValue{"有 ResumeValue?"}
HasResumeValue --> |是| ReturnResumeValue["返回 ResumeValue"]
HasResumeValue --> |否| ReturnInterrupt["返回 NodeInterrupt 错误"]
ReturnResumeValue --> End([结束])
ReturnInterrupt --> End
图表来源
- [context.go](https://github.com/smallnest/langgraphgo/blob/main/graph/context.go#L43-L50)
动态中断的实现 #
动态中断允许节点在运行时决定是否中断执行:
sequenceDiagram
participant Node as 节点函数
participant Interrupt as Interrupt函数
participant Context as 上下文
participant Config as 配置对象
Node->>Interrupt : Interrupt(ctx, "用户输入?")
Interrupt->>Context : GetResumeValue(ctx)
Context-->>Interrupt : 返回 ResumeValue 或 nil
alt 有 ResumeValue
Interrupt-->>Node : 返回 ResumeValue
else 无 ResumeValue
Interrupt-->>Node : 返回 NodeInterrupt 错误
end
Node->>Node : 处理返回值
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L43-L50)
节来源
- [context.go](https://github.com/smallnest/langgraphgo/blob/main/graph/context.go#L43-L50)
动态中断与恢复 #
动态中断是 langgraphgo 状态恢复机制中最灵活的功能,它允许节点在运行时决定是否中断执行。
中断函数的使用 #
在节点内部,可以通过 graph.Interrupt 函数实现动态中断:
flowchart TD
NodeStart["节点开始执行"] --> CheckCondition["检查中断条件"]
CheckCondition --> NeedInterrupt{"需要中断?"}
NeedInterrupt --> |是| CallInterrupt["调用 Interrupt(ctx, value)"]
NeedInterrupt --> |否| ContinueExecution["继续执行"]
CallInterrupt --> CheckResumeValue["检查 ResumeValue"]
CheckResumeValue --> HasValue{"有恢复值?"}
HasValue --> |是| ReturnValue["返回恢复值"]
HasValue --> |否| ThrowInterrupt["抛出 GraphInterrupt"]
ReturnValue --> ContinueExecution
ContinueExecution --> NodeEnd["节点结束"]
ThrowInterrupt --> ErrorHandling["错误处理"]
图表来源
- [main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/dynamic_interrupt/main.go#L17-L30)
完整的恢复流程 #
sequenceDiagram
participant Client as 客户端
participant Graph as 图实例
participant Node as 节点函数
participant User as 用户交互
Client->>Graph : 第一次调用 Invoke
Graph->>Node : 执行节点
Node->>Node : 调用 Interrupt("What is your name?")
Node-->>Graph : 返回 GraphInterrupt
Graph-->>Client : 抛出中断错误
Client->>User : 获取用户输入
User-->>Client : 输入 Alice
Client->>Graph : 第二次调用 InvokeWithConfig(ResumeValue : "Alice")
Graph->>Node : 再次执行节点
Node->>Node : 调用 Interrupt("What is your name?")
Node-->>Node : 返回 "Alice"
Node-->>Graph : 返回最终结果
Graph-->>Client : 返回 Hello, Alice!
图表来源
- [main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/dynamic_interrupt/main.go#L40-L77)
节来源
- [main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/dynamic_interrupt/main.go#L17-L30)
完整工作流程 #
从捕获 GraphInterrupt 到成功恢复的完整流程 #
flowchart TD
StartExecution["开始执行图"] --> ExecuteNodes["执行节点"]
ExecuteNodes --> CheckInterrupt["检查中断条件"]
CheckInterrupt --> IsInterrupted{"是否中断?"}
IsInterrupted --> |否| ContinueExecution["继续执行"]
IsInterrupted --> |是| CaptureInterrupt["捕获 GraphInterrupt"]
CaptureInterrupt --> ExtractInfo["提取中断信息"]
ExtractInfo --> WaitForInput["等待用户输入"]
WaitForInput --> PrepareResume["准备恢复配置"]
PrepareResume --> ResumeExecution["恢复执行"]
ResumeExecution --> ReExecuteNodes["重新执行节点"]
ReExecuteNodes --> CheckResumeValue["检查 ResumeValue"]
CheckResumeValue --> HasResumeValue{"有恢复值?"}
HasResumeValue --> |是| SkipInterrupt["跳过中断"]
HasResumeValue --> |否| ThrowInterrupt["抛出中断错误"]
SkipInterrupt --> ContinueExecution
ContinueExecution --> MoreNodes{"还有节点?"}
MoreNodes --> |是| ExecuteNodes
MoreNodes --> |否| CompleteExecution["执行完成"]
ThrowInterrupt --> ErrorHandling["错误处理"]
CompleteExecution --> End([结束])
ErrorHandling --> End
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L180-L491)
- [main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/dynamic_interrupt/main.go#L40-L77)
错误处理机制 #
系统提供了完善的错误处理机制来处理各种中断情况:
classDiagram
class GraphInterrupt {
+string Node
+interface State
+[]string NextNodes
+interface InterruptValue
+Error() string
}
class NodeInterrupt {
+string Node
+interface Value
+Error() string
}
class ErrorHandling {
+handleGraphInterrupt(err)
+handleNodeInterrupt(err)
+propagateError(err)
}
GraphInterrupt --> ErrorHandling : "处理"
NodeInterrupt --> ErrorHandling : "处理"
图表来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L24-L41)
- [errors.go](https://github.com/smallnest/langgraphgo/blob/main/graph/errors.go#L5-L15)
节来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L24-L41)
- [errors.go](https://github.com/smallnest/langgraphgo/blob/main/graph/errors.go#L5-L15)
错误处理与恢复 #
中断类型的分类处理 #
系统对不同类型的中断采用不同的处理策略:
| 中断类型 | 处理方式 | 恢复机制 |
|---|---|---|
GraphInterrupt |
捕获并分析中断信息 | 使用 ResumeFrom 和 ResumeValue |
NodeInterrupt |
转换为 GraphInterrupt | 继续使用相同的恢复机制 |
| 其他错误 | 直接传播 | 不支持恢复 |
恢复验证机制 #
在恢复过程中,系统会验证各种状态的一致性:
flowchart TD
StartResume["开始恢复"] --> ValidateState["验证状态完整性"]
ValidateState --> StateValid{"状态有效?"}
StateValid --> |否| ErrorState["状态错误"]
StateValid --> |是| ValidateNodes["验证节点列表"]
ValidateNodes --> NodesValid{"节点有效?"}
NodesValid --> |否| ErrorNodes["节点错误"]
NodesValid --> |是| ValidateConfig["验证配置"]
ValidateConfig --> ConfigValid{"配置有效?"}
ConfigValid --> |否| ErrorConfig["配置错误"]
ConfigValid --> |是| StartExecution["开始执行"]
ErrorState --> End([结束])
ErrorNodes --> End
ErrorConfig --> End
StartExecution --> End
节来源
- [graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L24-L41)
最佳实践 #
配置设计原则 #
- 明确的恢复点:确保每个可能的中断点都有明确的恢复路径
- 状态一致性:恢复后的状态应该与中断前保持一致
- 渐进式恢复:优先恢复关键节点,再逐步恢复其他节点
错误处理建议 #
- 优雅降级:在无法恢复时提供合理的默认行为
- 日志记录:详细记录中断和恢复过程以便调试
- 超时控制:为长时间运行的恢复操作设置超时限制
性能优化 #
- 选择性恢复:只恢复必要的节点,避免不必要的计算
- 状态缓存:合理使用检查点机制减少重复计算
- 并发控制:在恢复过程中合理控制并发度
总结 #
langgraphgo 的状态恢复机制通过 InvokeWithConfig 方法实现了灵活而强大的执行恢复功能。该机制的核心包括:
- ResumeFrom 字段:精确控制恢复执行的起始节点
- ResumeValue 字段:通过上下文注入实现动态中断的续传
- GraphInterrupt 错误类型:提供完整的中断信息和恢复指导
- Interrupt 函数:支持节点级别的动态中断决策
这种设计使得开发者能够构建健壮、可恢复的复杂工作流,为生产环境中的可靠性需求提供了坚实的基础。通过合理使用这些机制,可以实现从简单的节点中断到复杂的分布式系统恢复的各种应用场景。