重试执行流程 #

本文档中引用的文件

目录 #

  1. 简介
  2. 核心组件架构
  3. RetryNode 结构体详解
  4. StateRunnable 执行机制
  5. 重试策略配置
  6. 执行流程时序图
  7. 错误处理与恢复
  8. 性能考虑
  9. 最佳实践
  10. 总结

简介 #

langgraphgo 提供了强大的重试机制,确保图执行过程中的节点操作能够应对临时性故障。该系统通过两个主要组件实现:RetryNode 结构体用于包装单个节点的重试逻辑,以及 StateRunnable 中的 executeNodeWithRetry 方法用于图级别的重试控制。

重试机制的核心特点包括:

核心组件架构 #

classDiagram
class RetryNode {
+Node node
+RetryConfig config
+Execute(ctx, state) (interface, error)
}
class RetryConfig {
+int MaxAttempts
+time.Duration InitialDelay
+time.Duration MaxDelay
+float64 BackoffFactor
+func(error) bool RetryableErrors
}
class StateRunnable {
+StateGraph graph
+executeNodeWithRetry(ctx, node, state) (interface, error)
+isRetryableError(err) bool
+calculateBackoffDelay(attempt) time.Duration
}
class RetryPolicy {
+int MaxRetries
+BackoffStrategy BackoffStrategy
+[]string RetryableErrors
}
class Node {
+string Name
+func(ctx, state) (interface, error) Function
}
RetryNode --> RetryConfig : "使用"
RetryNode --> Node : "包装"
StateRunnable --> RetryPolicy : "应用"
StateRunnable --> Node : "执行"
StateGraph --> RetryPolicy : "配置"

图表来源

RetryNode 结构体详解 #

结构定义 #

RetryNode 是一个轻量级的包装器,负责为单个节点添加重试功能:

classDiagram
class RetryNode {
-Node node
-RetryConfig config
+Execute(ctx, state) (interface, error)
}
class RetryConfig {
+int MaxAttempts
+time.Duration InitialDelay
+time.Duration MaxDelay
+float64 BackoffFactor
+func(error) bool RetryableErrors
}
RetryNode --> RetryConfig : "包含"

图表来源

Execute 方法实现 #

Execute 方法是重试逻辑的核心,实现了完整的重试循环:

flowchart TD
Start([开始执行]) --> InitVars["初始化变量<br/>lastErr = nil<br/>delay = InitialDelay"]
InitVars --> LoopStart["循环开始<br/>attempt = 1 到 MaxAttempts"]
LoopStart --> CheckCancel["检查上下文取消<br/>select {<-ctx.Done()}"]
CheckCancel --> Cancelled{"上下文已取消?"}
Cancelled --> |是| ReturnCancelled["返回取消错误"]
Cancelled --> |否| ExecuteNode["执行节点函数<br/>result, err = node.Function()"]
ExecuteNode --> Success{"执行成功?"}
Success --> |是| ReturnResult["返回结果"]
Success --> |否| UpdateError["更新 lastErr = err"]
UpdateError --> CheckRetryable["检查是否可重试<br/>RetryableErrors(err)"]
CheckRetryable --> IsRetryable{"可重试?"}
IsRetryable --> |否| ReturnNonRetryable["返回非重试错误"]
IsRetryable --> |是| CheckLastAttempt["检查是否最后尝试<br/>attempt < MaxAttempts"]
CheckLastAttempt --> |否| ReturnMaxRetries["返回最大重试错误"]
CheckLastAttempt --> |是| Delay["等待退避延迟<br/>time.After(delay)"]
Delay --> CalcNextDelay["计算下一次延迟<br/>delay = delay * BackoffFactor"]
CalcNextDelay --> CheckMaxDelay["检查最大延迟限制<br/>delay > MaxDelay"]
CheckMaxDelay --> ClampDelay["限制延迟<br/>delay = MaxDelay"]
ClampDelay --> LoopStart
ReturnResult --> End([结束])
ReturnCancelled --> End
ReturnNonRetryable --> End
ReturnMaxRetries --> End

图表来源

节来源

关键特性 #

  1. 上下文感知:在每次重试前检查 ctx.Done(),确保及时响应取消信号
  2. 指数退避:支持可配置的退避因子,实现指数增长的延迟
  3. 延迟限制:防止延迟无限增长,设置最大延迟上限
  4. 可插拔错误判断:允许用户自定义哪些错误应该触发重试
  5. 优雅降级:当达到最大重试次数时提供详细的错误信息

StateRunnable 执行机制 #

图级别重试控制 #

StateRunnable 实现了图级别的重试策略,与 RetryNode 形成互补:

sequenceDiagram
participant Graph as StateGraph
participant Runnable as StateRunnable
participant Node as Node
participant Policy as RetryPolicy
Graph->>Runnable : Invoke(ctx, state)
Runnable->>Runnable : 获取入口节点
loop 并行执行所有活跃节点
Runnable->>Node : executeNodeWithRetry(ctx, node, state)
Node->>Node : 执行节点函数
alt 执行成功
Node-->>Runnable : 返回结果
else 执行失败
Node->>Policy : 检查错误类型
Policy-->>Node : 返回可重试性
alt 可重试且未达最大重试
Node->>Node : 计算退避延迟
Node->>Node : 等待延迟
Node->>Node : 继续重试
else 不可重试或已达最大重试
Node-->>Runnable : 返回错误
end
end
end
Runnable-->>Graph : 返回最终状态或错误

图表来源

executeNodeWithRetry 方法 #

该方法实现了图级别的重试逻辑:

flowchart TD
Start([开始执行]) --> GetMaxRetries["获取最大重试次数<br/>maxRetries = 1 (默认)<br/>maxRetries = policy.MaxRetries + 1"]
GetMaxRetries --> LoopStart["循环: attempt = 0 到 maxRetries-1"]
LoopStart --> Execute["执行节点函数<br/>result, err = node.Function()"]
Execute --> Success{"执行成功?"}
Success --> |是| ReturnSuccess["返回结果"]
Success --> |否| UpdateError["更新 lastErr = err"]
UpdateError --> CheckPolicy{"有重试策略?"}
CheckPolicy --> |否| BreakLoop["跳出循环"]
CheckPolicy --> |是| CheckAttempt{"未达最大重试?"}
CheckAttempt --> |否| BreakLoop
CheckAttempt --> |是| CheckRetryable["检查是否可重试<br/>isRetryableError(err)"]
CheckRetryable --> IsRetryable{"可重试?"}
IsRetryable --> |否| BreakLoop
IsRetryable --> |是| CalcDelay["计算退避延迟<br/>calculateBackoffDelay(attempt)"]
CalcDelay --> HasDelay{"有延迟?"}
HasDelay --> |否| Continue["继续重试"]
HasDelay --> |是| WaitDelay["等待延迟<br/>time.After(delay)"]
WaitDelay --> CheckContext["检查上下文<br/><-ctx.Done()"]
CheckContext --> ContextDone{"上下文已取消?"}
ContextDone --> |是| ReturnContextError["返回上下文错误"]
ContextDone --> |否| Continue
Continue --> LoopStart
BreakLoop --> ReturnError["返回 lastErr"]
ReturnSuccess --> End([结束])
ReturnContextError --> End
ReturnError --> End

图表来源

节来源

Backoff 策略实现 #

StateRunnable 支持三种退避策略:

策略类型 计算公式 特点
FixedBackoff baseDelay 固定间隔,简单可靠
ExponentialBackoff baseDelay * 2^attempt 指数增长,适合网络故障
LinearBackoff baseDelay * (attempt + 1) 线性增长,平衡性能

节来源

重试策略配置 #

RetryConfig 配置选项 #

RetryConfig 提供了细粒度的重试行为控制:

字段 类型 默认值 描述
MaxAttempts int 3 最大重试次数
InitialDelay time.Duration 100ms 初始延迟时间
MaxDelay time.Duration 5s 最大延迟时间
BackoffFactor float64 2.0 退避因子
RetryableErrors func(error) bool 全部重试 错误过滤函数

RetryPolicy 图级别配置 #

RetryPolicy 提供图级别的重试策略:

字段 类型 描述
MaxRetries int 最大重试次数
BackoffStrategy BackoffStrategy 退避策略类型
RetryableErrors []string 可重试错误模式列表

节来源

执行流程时序图 #

节点级重试流程 #

sequenceDiagram
participant Client as 客户端
participant RetryNode as RetryNode
participant Node as 原始节点
participant Timer as 时间控制器
Client->>RetryNode : Execute(ctx, state)
RetryNode->>RetryNode : 初始化变量
loop 重试循环 (1 到 MaxAttempts)
RetryNode->>RetryNode : 检查上下文取消
alt 上下文已取消
RetryNode-->>Client : 返回取消错误
else 正常执行
RetryNode->>Node : 执行节点函数
Node-->>RetryNode : 返回结果或错误
alt 执行成功
RetryNode-->>Client : 返回结果
else 执行失败
RetryNode->>RetryNode : 更新 lastErr
alt 错误不可重试
RetryNode-->>Client : 返回非重试错误
else 错误可重试
alt 非最后尝试
RetryNode->>Timer : 等待退避延迟
Timer-->>RetryNode : 延迟完成
RetryNode->>RetryNode : 计算下一次延迟
end
end
end
end
end
RetryNode-->>Client : 返回最大重试错误

图表来源

图级别重试流程 #

sequenceDiagram
participant Client as 客户端
participant StateGraph as StateGraph
participant StateRunnable as StateRunnable
participant Node as 节点
participant Policy as 重试策略
Client->>StateGraph : SetRetryPolicy(policy)
Client->>StateGraph : Compile()
StateGraph-->>StateRunnable : 创建可运行实例
Client->>StateRunnable : Invoke(ctx, initialState)
StateRunnable->>StateRunnable : 获取入口节点
loop 并行执行阶段
par 节点1
StateRunnable->>Node : executeNodeWithRetry(ctx, node1, state)
Node->>Policy : 检查错误类型
Policy-->>Node : 返回可重试性
Node-->>StateRunnable : 返回结果或错误
and 节点2
StateRunnable->>Node : executeNodeWithRetry(ctx, node2, state)
Node-->>StateRunnable : 返回结果或错误
end
end
alt 所有节点成功
StateRunnable-->>Client : 返回最终状态
else 存在失败节点
StateRunnable-->>Client : 返回错误
end

图表来源

错误处理与恢复 #

错误分类机制 #

系统实现了多层次的错误处理:

flowchart TD
Error[节点执行错误] --> CheckRetryable["检查 RetryableErrors 函数"]
CheckRetryable --> IsRetryable{"可重试?"}
IsRetryable --> |否| NonRetryableError["非重试错误"]
IsRetryable --> |是| CheckMaxRetries["检查最大重试次数"]
CheckMaxRetries --> ReachedMax{"达到最大重试?"}
ReachedMax --> |是| MaxRetriesError["最大重试错误"]
ReachedMax --> |否| RetryAttempt["执行重试尝试"]
NonRetryableError --> LogError["记录错误日志"]
MaxRetriesError --> LogError
RetryAttempt --> WaitBackoff["等待退避延迟"]
WaitBackoff --> ExecuteAgain["重新执行节点"]
LogError --> ReturnError["返回错误"]
ExecuteAgain --> CheckContext["检查上下文取消"]
CheckContext --> ContextCancelled{"上下文已取消?"}
ContextCancelled --> |是| ReturnCancelled["返回取消错误"]
ContextCancelled --> |否| CheckRetryable

图表来源

上下文取消处理 #

系统提供了完整的上下文取消支持:

  1. 实时检查:在每个重试循环中检查 ctx.Done()
  2. 优雅退出:检测到取消后立即停止重试
  3. 错误传播:正确传播取消原因和上下文信息
  4. 资源清理:确保取消时进行必要的资源清理

节来源

性能考虑 #

退避策略优化 #

不同的退避策略对性能的影响:

策略 延迟增长 CPU 使用 网络负载 适用场景
Fixed 线性 稳定 硬件故障
Exponential 指数 中等 波动 网络拥塞
Linear 线性 中等 渐进 平衡需求

内存使用优化 #

  1. 延迟队列管理:及时清理过期的调用记录
  2. 错误信息缓存:避免重复创建相同的错误消息
  3. 上下文传递:最小化上下文数据的复制

并发控制 #

系统通过以下方式优化并发性能:

  1. 并行节点执行:支持多个节点同时重试
  2. 选择器模式:使用 select 实现非阻塞上下文检查
  3. goroutine 管理:合理控制并发 goroutine 数量

最佳实践 #

配置建议 #

  1. 初始延迟:根据服务响应时间设置,通常为几百毫秒
  2. 最大重试次数:根据业务容忍度设置,一般不超过 5 次
  3. 退避因子:指数退避因子通常设置为 2.0
  4. 最大延迟:设置合理的上限,避免无限等待

错误处理策略 #

  1. 区分错误类型:只对临时性错误启用重试
  2. 自定义重试条件:根据具体业务场景定义重试规则
  3. 监控和告警:跟踪重试频率和成功率
  4. 降级策略:重试失败时提供备选方案

测试建议 #

  1. 单元测试:验证各种错误场景下的重试行为
  2. 集成测试:测试整个图的重试流程
  3. 压力测试:评估高并发情况下的重试性能
  4. 故障注入:模拟各种故障场景

节来源

总结 #

langgraphgo 的重试机制通过 RetryNodeStateRunnable 的协同工作,提供了强大而灵活的错误恢复能力。该系统的主要优势包括:

  1. 双层重试控制:节点级和图级的重试策略相互补充
  2. 灵活的配置选项:支持多种退避策略和错误过滤机制
  3. 完善的上下文支持:确保重试过程中的取消和超时处理
  4. 高性能设计:通过并行执行和优化算法保证系统性能

通过合理配置和使用这些重试机制,开发者可以构建更加健壮和可靠的图执行系统,有效应对各种临时性故障和异常情况。