检查点持久化示例 #
本文档中引用的文件
- examples/checkpointing/main.go
- examples/checkpointing/README.md
- checkpoint/postgres/postgres.go
- checkpoint/redis/redis.go
- checkpoint/sqlite/sqlite.go
- examples/checkpointing/postgres/main.go
- examples/checkpointing/redis/main.go
- examples/checkpointing/sqlite/main.go
- graph/checkpointing.go
- checkpoint/postgres/postgres_test.go
- checkpoint/redis/redis_test.go
- checkpoint/sqlite/sqlite_test.go
目录 #
简介 #
LangGraphGo 的检查点持久化功能为复杂、长时间运行或关键应用程序提供了强大的状态管理能力。该功能解决了以下核心问题:
- 容错性:当应用程序崩溃时,可以从最后保存的状态恢复执行
- 人机交互:支持暂停执行,等待人工输入(可能需要几天时间),然后继续
- 时间旅行:可以检查过去的状态(“第三步发生了什么?”)或从之前的点分叉执行
检查点持久化通过将图的状态保存到持久化存储中,确保了即使在系统故障或重启后也能保持业务连续性。
核心概念 #
检查点存储接口 #
所有检查点存储都实现了统一的 CheckpointStore 接口:
classDiagram
class CheckpointStore {
<<interface>>
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint, error
+List(ctx, executionID) []*Checkpoint, error
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class MemoryCheckpointStore {
-checkpoints map[string]*Checkpoint
-mutex sync.RWMutex
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint, error
+List(ctx, executionID) []*Checkpoint, error
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class PostgresCheckpointStore {
-pool DBPool
-tableName string
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint, error
+List(ctx, executionID) []*Checkpoint, error
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
+InitSchema(ctx) error
+Close()
}
class RedisCheckpointStore {
-client *redis.Client
-prefix string
-ttl time.Duration
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint, error
+List(ctx, executionID) []*Checkpoint, error
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class SqliteCheckpointStore {
-db *sql.DB
-tableName string
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint, error
+List(ctx, executionID) []*Checkpoint, error
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
+InitSchema(ctx) error
+Close() error
}
CheckpointStore <|.. MemoryCheckpointStore
CheckpointStore <|.. PostgresCheckpointStore
CheckpointStore <|.. RedisCheckpointStore
CheckpointStore <|.. SqliteCheckpointStore
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L22-L38)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L22-L26)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L13-L18)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L13-L17)
检查点数据结构 #
检查点包含以下核心信息:
classDiagram
class Checkpoint {
+string ID
+string NodeName
+interface State
+map[string]interface Metadata
+time.Time Timestamp
+int Version
}
class CheckpointConfig {
+CheckpointStore Store
+bool AutoSave
+time.Duration SaveInterval
+int MaxCheckpoints
}
Checkpoint --> CheckpointConfig : "配置"
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L12-L20)
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L188-L201)
章节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L12-L20)
- [examples/checkpointing/README.md](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/README.md#L12-L20)
架构概览 #
LangGraphGo 的检查点持久化架构采用分层设计,提供了灵活的存储后端选择:
graph TB
subgraph "应用层"
A[CheckpointableRunnable]
B[CheckpointableMessageGraph]
end
subgraph "配置层"
C[CheckpointConfig]
D[CheckpointListener]
end
subgraph "存储抽象层"
E[CheckpointStore接口]
end
subgraph "存储实现层"
F[MemoryCheckpointStore]
G[PostgresCheckpointStore]
H[RedisCheckpointStore]
I[SqliteCheckpointStore]
end
subgraph "外部存储"
J[(内存)]
K[(PostgreSQL)]
L[(Redis)]
M[(SQLite文件)]
end
A --> C
B --> A
C --> D
D --> E
E --> F
E --> G
E --> H
E --> I
F --> J
G --> K
H --> L
I --> M
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L213-L228)
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L336-L340)
内存存储实现 #
内存存储是最简单的实现,适用于测试和开发环境。
实现特点 #
- 瞬态存储:数据仅存在于内存中,进程重启后丢失
- 高性能:无磁盘 I/O 开销
- 简单配置:无需额外依赖
使用场景 #
- 单次执行的应用程序
- 测试和调试
- 快速原型开发
配置示例 #
内存存储作为默认选项,无需特殊配置:
config := graph.CheckpointConfig{
Store: graph.NewMemoryCheckpointStore(),
AutoSave: true,
SaveInterval: 2 * time.Second,
MaxCheckpoints: 5,
}
章节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L40-L111)
- [examples/checkpointing/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/main.go#L22-L27)
PostgreSQL 存储实现 #
PostgreSQL 存储提供生产级别的持久化解决方案,具有高可靠性和事务支持。
数据库架构 #
PostgreSQL 存储使用 JSONB 类型存储状态数据,支持复杂的查询和索引:
erDiagram
CHECKPOINTS {
string id PK
string execution_id FK
string node_name
jsonb state
jsonb metadata
timestamptz timestamp
integer version
}
EXECUTION_INDEX {
string execution_id
timestamp created_at
}
CHECKPOINTS ||--o{ EXECUTION_INDEX : "indexed by"
图表来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L66-L76)
连接配置 #
PostgreSQL 存储支持丰富的配置选项:
| 配置项 | 类型 | 默认值 | 描述 |
|---|---|---|---|
| ConnString | string | 必需 | PostgreSQL 连接字符串 |
| TableName | string | “checkpoints” | 表名 |
| PoolSize | int | 10 | 连接池大小 |
性能特性 #
- 并发安全:使用连接池处理并发访问
- 事务支持:确保检查点操作的原子性
- 索引优化:按执行 ID 建立索引,加速查询
- 自动重试:内置连接失败重试机制
初始化流程 #
sequenceDiagram
participant App as 应用程序
participant Store as PostgresStore
participant DB as PostgreSQL
App->>Store : NewPostgresCheckpointStore()
Store->>DB : 创建连接池
DB-->>Store : 连接池实例
Store->>Store : InitSchema()
Store->>DB : CREATE TABLE IF NOT EXISTS
DB-->>Store : 表创建成功
Store-->>App : 存储实例
图表来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L35-L50)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L65-L84)
错误处理策略 #
PostgreSQL 存储实现了完善的错误处理机制:
- 连接失败:自动重试连接
- 查询超时:设置合理的超时时间
- 数据损坏:提供数据验证和修复机制
章节来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L28-L50)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L65-L84)
- [examples/checkpointing/postgres/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/postgres/main.go#L21-L46)
Redis 存储实现 #
Redis 存储提供高性能的内存数据存储,适合对响应时间要求极高的场景。
数据结构设计 #
Redis 存储使用键值对和集合数据结构:
graph LR
subgraph "Redis 数据结构"
A["checkpoint:{id}"] --> B["JSON序列化的检查点"]
C["execution:{execId}:checkpoints"] --> D["Set 包含所有检查点ID"]
end
subgraph "TTL 支持"
E["可选过期时间"] --> F["自动清理旧检查点"]
end
图表来源
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L49-L55)
配置选项 #
| 配置项 | 类型 | 默认值 | 描述 |
|---|---|---|---|
| Addr | string | “localhost:6379” | Redis 服务器地址 |
| Password | string | “” | 认证密码 |
| DB | int | 0 | 数据库编号 |
| Prefix | string | “langgraph:” | 键前缀 |
| TTL | time.Duration | 0 | 过期时间(0表示永不过期) |
性能优势 #
- 极低延迟:纯内存操作,毫秒级响应
- 批量操作:使用管道(Pipeline)提高吞吐量
- 原子性:支持事务操作
- 高并发:Redis 天然支持高并发访问
数据持久化 #
虽然 Redis 是内存存储,但可以通过以下方式保证数据持久化:
flowchart TD
A[检查点写入] --> B{启用RDB?}
B --> |是| C[RDB快照]
B --> |否| D{启用AOF?}
D --> |是| E[AOF日志]
D --> |否| F[纯内存存储]
C --> G[定期快照]
E --> H[实时日志]
F --> I[内存存储]
章节来源
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L20-L47)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L58-L84)
- [examples/checkpointing/redis/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/redis/main.go#L31-L47)
SQLite 存储实现 #
SQLite 存储提供轻量级的本地文件数据库解决方案,适合桌面应用和简单部署。
文件数据库架构 #
SQLite 存储将所有检查点数据保存在单个文件中:
erDiagram
CHECKPOINTS {
string id PK
string execution_id
string node_name
text state
text metadata
datetime timestamp
integer version
}
INDEX_EXECUTION_ID {
string execution_id
integer __unused__
}
CHECKPOINTS ||--o{ INDEX_EXECUTION_ID : "索引"
图表来源
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L52-L62)
配置参数 #
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
| Path | string | 必需 | SQLite 数据库文件路径 |
| TableName | string | “checkpoints” | 表名 |
优势特点 #
- 零配置:无需安装数据库服务器
- 便携性:整个数据库是一个文件
- 跨平台:支持所有主流操作系统
- ACID 保证:完整的事务支持
文件管理 #
SQLite 存储提供了完整的文件生命周期管理:
stateDiagram-v2
[*] --> 初始化
初始化 --> 创建表 : InitSchema()
创建表 --> 就绪 : 表创建成功
就绪 --> 写入 : Save()
就绪 --> 读取 : Load/List()
就绪 --> 删除 : Delete/Clear()
写入 --> 就绪
读取 --> 就绪
删除 --> 就绪
就绪 --> 关闭 : Close()
关闭 --> [*]
章节来源
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L19-L48)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L50-L70)
- [examples/checkpointing/sqlite/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/sqlite/main.go#L31-L47)
配置与使用指南 #
基本配置模式 #
所有存储后端都遵循相同的配置模式:
flowchart TD
A[创建图形] --> B[选择存储后端]
B --> C[配置连接参数]
C --> D[初始化存储]
D --> E[设置检查点配置]
E --> F[编译可检查点运行器]
F --> G[开始执行]
B --> H[内存存储]
B --> I[PostgreSQL存储]
B --> J[Redis存储]
B --> K[SQLite存储]
自动保存配置 #
检查点配置支持多种保存策略:
| 配置项 | 类型 | 描述 |
|---|---|---|
| AutoSave | bool | 是否在每个节点后自动保存 |
| SaveInterval | time.Duration | 自动保存的时间间隔 |
| MaxCheckpoints | int | 最大保留的检查点数量 |
手动检查点控制 #
除了自动保存,还可以手动控制检查点:
// 手动保存检查点
err := runnable.SaveCheckpoint(ctx, "node-name", state)
// 列出所有检查点
checkpoints, err := runnable.ListCheckpoints(ctx)
// 从特定检查点恢复
resumedState, err := runnable.ResumeFromCheckpoint(ctx, checkpointID)
状态一致性验证 #
为了确保状态一致性,建议实施以下验证机制:
sequenceDiagram
participant App as 应用程序
participant Store as 存储
participant Validator as 验证器
App->>Store : 加载检查点
Store-->>App : 检查点数据
App->>Validator : 验证状态完整性
Validator->>Validator : 检查数据类型
Validator->>Validator : 验证业务规则
Validator-->>App : 验证结果
alt 验证成功
App->>App : 继续执行
else 验证失败
App->>App : 报告错误或回滚
end
章节来源
- [examples/checkpointing/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/main.go#L17-L27)
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L254-L295)
性能对比分析 #
性能指标对比 #
| 存储类型 | 启动时间 | 内存占用 | 并发性能 | 持久化可靠性 | 适用场景 |
|---|---|---|---|---|---|
| 内存存储 | 极快 | 低 | 高 | 无 | 测试、临时任务 |
| PostgreSQL | 中等 | 中等 | 极高 | 极高 | 生产环境、高并发 |
| Redis | 快 | 低 | 极高 | 高 | 缓存、快速访问 |
| SQLite | 快 | 低 | 中等 | 高 | 桌面应用、简单部署 |
延迟分析 #
graph LR
subgraph "延迟分布"
A[内存存储: <1ms] --> B[SQLite: 1-5ms]
B --> C[Redis: 1-10ms]
C --> D[PostgreSQL: 5-50ms]
end
subgraph "影响因素"
E[网络延迟] --> F[磁盘I/O]
F --> G[连接池开销]
G --> H[序列化开销]
end
可扩展性评估 #
不同存储的可扩展性特征:
- 内存存储:水平扩展困难,适合单实例
- PostgreSQL:支持主从复制,可水平扩展
- Redis:支持集群模式,天然分布式
- SQLite:单文件限制,难以水平扩展
成本效益分析 #
| 成本维度 | 内存存储 | PostgreSQL | Redis | SQLite |
|---|---|---|---|---|
| 硬件成本 | 低 | 中等 | 中等 | 低 |
| 维护成本 | 低 | 高 | 中等 | 低 |
| 学习成本 | 低 | 高 | 中等 | 低 |
| 性能收益 | 极高 | 极高 | 最高 | 高 |
故障排除指南 #
常见问题诊断 #
连接问题 #
flowchart TD
A[连接失败] --> B{检查网络}
B --> |网络正常| C{检查认证}
B --> |网络异常| D[修复网络连接]
C --> |认证正确| E{检查权限}
C --> |认证错误| F[更新认证信息]
E --> |权限不足| G[授予必要权限]
E --> |权限正常| H[检查配置]
D --> I[重新尝试连接]
F --> I
G --> I
H --> I
数据一致性问题 #
常见的数据不一致情况及解决方案:
| 问题类型 | 症状 | 解决方案 |
|---|---|---|
| 数据丢失 | 检查点无法加载 | 检查存储空间和权限 |
| 数据损坏 | JSON反序列化失败 | 验证数据完整性 |
| 版本冲突 | 并发写入冲突 | 实施乐观锁机制 |
| 索引失效 | 查询性能下降 | 重建相关索引 |
监控指标 #
建议监控以下关键指标:
graph TB
subgraph "性能指标"
A[响应时间]
B[吞吐量]
C[错误率]
end
subgraph "资源指标"
D[CPU使用率]
E[内存使用率]
F[磁盘I/O]
end
subgraph "业务指标"
G[检查点数量]
H[恢复成功率]
I[并发连接数]
end
A --> J[告警阈值]
B --> J
C --> J
D --> J
E --> J
F --> J
G --> J
H --> J
I --> J
日志记录策略 #
建议实施分级日志记录:
- DEBUG:详细的执行跟踪和状态变化
- INFO:重要的状态转换和检查点事件
- WARN:潜在的问题和异常情况
- ERROR:严重的错误和故障
章节来源
- [checkpoint/postgres/postgres_test.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres_test.go#L15-L53)
- [checkpoint/redis/redis_test.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis_test.go#L13-L89)
- [checkpoint/sqlite/sqlite_test.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite_test.go#L12-L83)
最佳实践建议 #
存储选择指南 #
根据具体需求选择合适的存储后端:
flowchart TD
A[选择存储后端] --> B{是否需要持久化?}
B --> |否| C[内存存储]
B --> |是| D{性能要求}
D --> |极高| E{是否有Redis?}
D --> |高| F{是否有PostgreSQL?}
D --> |一般| G{是否需要分布式?}
E --> |是| H[Redis存储]
E --> |否| I{是否需要事务?}
F --> |是| J[PostgreSQL存储]
F --> |否| K{是否需要便携性?}
G --> |是| J
G --> |否| L[SQLite存储]
I --> |是| J
I --> |否| M[内存存储]
K --> |是| L
K --> |否| N[其他考虑]
配置优化建议 #
PostgreSQL 配置优化 #
// 生产环境推荐配置
opts := postgres.PostgresOptions{
ConnString: "postgres://user:pass@host:5432/db?sslmode=disable",
TableName: "checkpoints",
}
store, err := postgres.NewPostgresCheckpointStore(ctx, opts)
if err != nil {
// 错误处理
}
// 设置连接池大小
poolConfig := pgxpool.Config{
MaxConns: 20,
MinConns: 5,
}
Redis 配置优化 #
// 高性能配置
opts := redis.RedisOptions{
Addr: "localhost:6379",
Prefix: "langgraph:",
TTL: 24 * time.Hour, // 自动清理
}
SQLite 配置优化 #
// 移动设备优化
opts := sqlite.SqliteOptions{
Path: "/data/checkpoints.db",
TableName: "checkpoints",
}
安全考虑 #
数据加密 #
对于敏感数据,建议实施以下加密措施:
- 传输加密:使用 SSL/TLS 连接
- 存储加密:在应用层面加密敏感字段
- 访问控制:严格的权限管理
备份策略 #
graph LR
A[定期备份] --> B[本地备份]
A --> C[远程备份]
A --> D[增量备份]
B --> E[监控备份完整性]
C --> E
D --> E
E --> F[恢复测试]
F --> G[备份策略调整]
监控和告警 #
实施全面的监控体系:
- 健康检查:定期检查存储可用性
- 性能监控:跟踪响应时间和吞吐量
- 容量监控:监控存储空间使用情况
- 错误监控:及时发现和处理错误
总结 #
LangGraphGo 的检查点持久化功能提供了灵活而强大的状态管理解决方案。通过支持多种存储后端,开发者可以根据具体需求选择最适合的实现:
主要优势 #
- 统一接口:所有存储后端都实现相同的接口,便于切换和测试
- 灵活配置:支持自动保存、手动保存等多种保存策略
- 状态恢复:完整的检查点加载和恢复机制
- 扩展性:支持从单机到分布式的各种部署场景
技术选型建议 #
- 开发测试:优先使用内存存储
- 生产环境:推荐 PostgreSQL 或 Redis
- 桌面应用:SQLite 是理想选择
- 微服务架构:考虑使用分布式缓存如 Redis
未来发展方向 #
随着容器化和云原生技术的发展,检查点持久化功能将继续演进:
- 云存储集成:支持 AWS S3、Azure Blob Storage 等
- 流式处理:支持大规模数据流的检查点
- 机器学习:为 AI 工作流提供专门的检查点优化
- 多模态存储:结合不同存储的优势,提供混合存储方案
通过合理选择和配置检查点持久化功能,开发者可以构建更加可靠、可维护和高性能的应用系统。