检查点存储 #
本文档中引用的文件
- checkpoint/postgres/postgres.go
- checkpoint/redis/redis.go
- checkpoint/sqlite/sqlite.go
- graph/checkpointing.go
- examples/checkpointing/main.go
- examples/checkpointing/postgres/main.go
- examples/checkpointing/redis/main.go
- examples/checkpointing/sqlite/main.go
- examples/checkpointing/README.md
- checkpoint/postgres/postgres_test.go
- checkpoint/redis/redis_test.go
- checkpoint/sqlite/sqlite_test.go
目录 #
简介 #
检查点存储是 LangGraphGo 中实现持久化、状态恢复和长时间运行工作流的关键机制。它允许应用程序在执行过程中保存状态快照,在系统崩溃或重启后能够从中断点恢复执行,同时支持人类介入、时间旅行调试等多种高级功能。
检查点存储解决了以下核心问题:
- 故障容错:当应用程序意外终止时,可以从最近的检查点恢复
- 人类介入:支持长时间等待人工输入后继续执行
- 状态管理:维护复杂工作流的状态历史
- 并发控制:确保多个执行实例之间的状态隔离
核心概念 #
检查点结构 #
检查点存储的核心数据结构包含以下关键字段:
classDiagram
class Checkpoint {
+string ID
+string NodeName
+interface State
+map[string]interface Metadata
+time.Time Timestamp
+int Version
}
class CheckpointStore {
<<interface>>
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class CheckpointConfig {
+CheckpointStore Store
+bool AutoSave
+time.Duration SaveInterval
+int MaxCheckpoints
}
CheckpointStore --> Checkpoint : "manages"
CheckpointConfig --> CheckpointStore : "configures"
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L12-L20)
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L22-L38)
执行标识符 #
每个检查点都与一个唯一的执行标识符(execution ID)关联,用于:
- 隔离不同执行实例的状态
- 支持多线程并发执行
- 实现状态查询和管理
自动保存机制 #
检查点存储支持两种保存模式:
- 自动保存:每次节点执行完成后自动保存
- 定时保存:按指定间隔定期保存
章节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L12-L20)
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L188-L201)
检查点存储架构 #
接口设计 #
检查点存储采用统一的接口设计,确保不同后端实现的一致性:
sequenceDiagram
participant App as 应用程序
participant Graph as 图执行器
participant Store as 检查点存储
participant DB as 后端数据库
App->>Graph : 设置检查点配置
Graph->>Store : 初始化存储
loop 执行过程
Graph->>Store : Save(checkpoint)
Store->>DB : 存储检查点数据
DB-->>Store : 确认保存
Store-->>Graph : 保存成功
alt 自动保存启用
Graph->>Store : 自动保存触发
end
end
App->>Store : Load(checkpointID)
Store->>DB : 查询检查点
DB-->>Store : 返回检查点数据
Store-->>App : 返回完整检查点
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L22-L38)
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L230-L295)
状态恢复流程 #
flowchart TD
Start([开始恢复]) --> LoadCP["加载指定检查点"]
LoadCP --> ValidateCP{"检查点存在?"}
ValidateCP --> |否| Error["返回错误"]
ValidateCP --> |是| ExtractState["提取状态数据"]
ExtractState --> RestoreState["恢复到图状态"]
RestoreState --> ResumeExecution["从检查点节点继续执行"]
ResumeExecution --> Complete([恢复完成])
Error --> End([结束])
Complete --> End
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L279-L295)
章节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L22-L38)
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L230-L295)
支持的后端存储 #
PostgreSQL 存储 #
PostgreSQL 是生产环境推荐的检查点存储解决方案,提供强一致性和企业级功能。
特性优势 #
- 强一致性:ACID事务保证数据完整性
- 高可用性:支持主从复制和集群部署
- 复杂查询:支持复杂的过滤和排序操作
- 索引优化:为执行ID和时间戳建立索引
数据库表结构 #
PostgreSQL存储使用以下表结构:
| 字段名 | 类型 | 约束 | 描述 |
|---|---|---|---|
| id | TEXT | PRIMARY KEY | 检查点唯一标识符 |
| execution_id | TEXT | NOT NULL | 执行实例标识符 |
| node_name | TEXT | NOT NULL | 触发检查点的节点名称 |
| state | JSONB | NOT NULL | 节点状态数据 |
| metadata | JSONB | 元数据信息(如执行ID等) | |
| timestamp | TIMESTAMPTZ | NOT NULL | 创建时间戳 |
| version | INTEGER | NOT NULL | 版本号 |
连接配置 #
// PostgreSQL连接选项配置
type PostgresOptions struct {
ConnString string // 连接字符串
TableName string // 表名,默认"checkpoints"
}
性能特点 #
- 写入性能:支持批量插入和更新
- 读取性能:通过索引快速定位特定执行的检查点
- 存储效率:JSONB格式压缩存储状态数据
章节来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L28-L32)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L64-L84)
Redis 存储 #
Redis 提供高性能的内存存储,适用于需要快速访问和临时数据存储的场景。
特性优势 #
- 超高性能:基于内存的存储,毫秒级响应时间
- 数据过期:内置TTL机制自动清理过期数据
- 原子操作:支持事务和管道操作
- 多种数据结构:支持集合、哈希等复杂数据类型
键命名规范 #
Redis存储使用以下键命名模式:
| 键类型 | 命名格式 | 示例 |
|---|---|---|
| 检查点数据 | {prefix}checkpoint:{id} |
langgraph:checkpoint:cp-1 |
| 执行索引 | {prefix}execution:{execID}:checkpoints |
langgraph:execution:exec-1:checkpoints |
配置选项 #
type RedisOptions struct {
Addr string // 服务器地址
Password string // 认证密码
DB int // 数据库编号
Prefix string // 键前缀,默认"langgraph:"
TTL time.Duration // 过期时间,默认0(无过期)
}
性能特点 #
- 读写速度:内存访问,延迟极低
- 并发处理:支持高并发读写操作
- 数据持久化:可选的RDB/AOF持久化
章节来源
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L20-L27)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L48-L56)
SQLite 存储 #
SQLite 是轻量级的文件数据库,适合单机部署和开发测试场景。
特性优势 #
- 零配置:无需独立数据库服务器
- 便携性:单个文件即可包含整个数据库
- 跨平台:支持所有主流操作系统
- 事务支持:提供基本的ACID特性
文件存储结构 #
SQLite存储将所有检查点数据保存在单个文件中:
graph LR
subgraph "SQLite数据库文件"
A[checkpoints表] --> B[id: TEXT]
A --> C[execution_id: TEXT]
A --> D[node_name: TEXT]
A --> E[state: TEXT]
A --> F[metadata: TEXT]
A --> G[timestamp: DATETIME]
A --> H[version: INTEGER]
end
subgraph "索引"
I[idx_checkpoints_execution_id]
end
A --> I
图表来源
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L52-L62)
配置选项 #
type SqliteOptions struct {
Path string // 数据库文件路径
TableName string // 表名,默认"checkpoints"
}
性能特点 #
- 启动速度快:无需数据库服务启动
- 资源占用少:内存和CPU开销低
- 简单可靠:适合小型应用和测试环境
章节来源
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L19-L24)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L50-L70)
配置和使用指南 #
基础配置 #
所有检查点存储都遵循相同的配置模式:
classDiagram
class CheckpointConfig {
+CheckpointStore Store
+bool AutoSave
+time.Duration SaveInterval
+int MaxCheckpoints
}
class MemoryCheckpointStore
class PostgresCheckpointStore
class RedisCheckpointStore
class SqliteCheckpointStore
CheckpointConfig --> CheckpointStore : "uses"
CheckpointStore <|-- MemoryCheckpointStore
CheckpointStore <|-- PostgresCheckpointStore
CheckpointStore <|-- RedisCheckpointStore
CheckpointStore <|-- SqliteCheckpointStore
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L188-L201)
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L40-L51)
内存存储(开发测试) #
内存存储是最简单的实现,适合开发和测试环境:
// 创建内存存储
store := graph.NewMemoryCheckpointStore()
// 配置检查点
config := graph.CheckpointConfig{
Store: store,
AutoSave: true,
SaveInterval: 30 * time.Second,
MaxCheckpoints: 10,
}
// 应用到图
g.SetCheckpointConfig(config)
PostgreSQL 存储配置 #
生产环境推荐的PostgreSQL配置:
// 创建PostgreSQL存储
store, err := postgres.NewPostgresCheckpointStore(context.Background(), postgres.PostgresOptions{
ConnString: "postgres://user:password@localhost:5432/dbname",
TableName: "checkpoints",
})
// 初始化表结构
err := store.InitSchema(context.Background())
// 配置检查点
config := graph.CheckpointConfig{
Store: store,
AutoSave: true,
SaveInterval: 5 * time.Second,
MaxCheckpoints: 20,
}
Redis 存储配置 #
高性能Redis配置:
// 创建Redis存储
store := redis.NewRedisCheckpointStore(redis.RedisOptions{
Addr: "localhost:6379",
Prefix: "langgraph:",
TTL: 24 * time.Hour, // 24小时过期
})
// 配置检查点
config := graph.CheckpointConfig{
Store: store,
AutoSave: true,
SaveInterval: 2 * time.Second,
MaxCheckpoints: 15,
}
SQLite 存储配置 #
轻量级SQLite配置:
// 创建SQLite存储
store, err := sqlite.NewSqliteCheckpointStore(sqlite.SqliteOptions{
Path: "./checkpoints.db",
TableName: "checkpoints",
})
// 配置检查点
config := graph.CheckpointConfig{
Store: store,
AutoSave: true,
SaveInterval: 10 * time.Second,
MaxCheckpoints: 10,
}
章节来源
- [examples/checkpointing/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/main.go#L21-L27)
- [examples/checkpointing/postgres/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/postgres/main.go#L33-L47)
- [examples/checkpointing/redis/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/redis/main.go#L31-L36)
- [examples/checkpointing/sqlite/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/checkpointing/sqlite/main.go#L31-L40)
性能对比和选择建议 #
性能特征对比 #
| 特性 | 内存存储 | PostgreSQL | Redis | SQLite |
|---|---|---|---|---|
| 读取延迟 | 极低 (< 1ms) | 低 (1-10ms) | 很低 (< 1ms) | 中等 (1-5ms) |
| 写入延迟 | 极低 (< 1ms) | 中等 (10-50ms) | 很低 (< 1ms) | 中等 (1-10ms) |
| 吞吐量 | 极高 | 高 | 极高 | 中等 |
| 持久性 | 无 | 强 | 可选 | 强 |
| 并发支持 | 高 | 极高 | 高 | 低 |
| 部署复杂度 | 无 | 中等 | 低 | 无 |
| 资源消耗 | 低 | 中等 | 中等 | 低 |
场景选择指南 #
生产环境推荐 #
flowchart TD
Start([生产环境选择]) --> Critical{"是否需要强一致性?"}
Critical --> |是| ProductionDB["PostgreSQL<br/>- ACID事务<br/>- 复杂查询<br/>- 高可用性"]
Critical --> |否| Performance{"对性能要求如何?"}
Performance --> |极高| Redis["Redis<br/>- 超低延迟<br/>- 高并发<br/>- 数据过期"]
Performance --> |较高| PostgreSQL
ProductionDB --> Scale{"扩展需求?"}
Scale --> |水平扩展| PostgreSQL
Scale --> |垂直扩展| Redis
Redis --> TTL{"需要数据过期?"}
TTL --> |是| Redis
TTL --> |否| Memory["内存存储<br/>- 开发测试<br/>- 快速原型"]
不同场景的最佳选择 #
| 场景 | 推荐存储 | 理由 |
|---|---|---|
| 微服务架构 | PostgreSQL | 强一致性,支持分布式事务 |
| 实时聊天应用 | Redis | 高并发,低延迟 |
| 批处理任务 | SQLite | 简单部署,资源节省 |
| 开发测试 | 内存存储 | 快速迭代,无外部依赖 |
| 长期归档 | PostgreSQL + 定期备份 | 持久存储,便于查询分析 |
性能调优建议 #
PostgreSQL 调优 #
- 连接池配置:设置合适的最大连接数
- 索引优化:为常用查询字段建立索引
- 分区策略:按时间或执行ID进行表分区
- WAL配置:根据数据重要性调整预写日志设置
Redis 调优 #
- 内存优化:合理设置内存限制和淘汰策略
- 持久化配置:平衡数据安全性和性能
- 网络优化:使用本地连接减少网络延迟
- 数据结构:选择合适的数据类型
SQLite 调优 #
- WAL模式:启用WAL模式提高并发性能
- 同步设置:根据数据重要性调整同步级别
- 缓存大小:适当增加页面缓存大小
- 事务管理:合理使用事务减少I/O操作
章节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L188-L201)
最佳实践 #
数据一致性保障 #
事务处理 #
// 使用事务确保数据一致性
func saveCheckpointWithTransaction(store CheckpointStore, checkpoint *Checkpoint) error {
ctx := context.Background()
// 开始事务(如果支持)
tx, err := store.BeginTx(ctx)
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback(ctx)
} else {
tx.Commit(ctx)
}
}()
// 执行检查点保存
err = store.Save(ctx, checkpoint)
if err != nil {
return err
}
// 更新相关索引
err = store.UpdateIndex(ctx, checkpoint.ExecutionID)
if err != nil {
return err
}
return nil
}
版本控制 #
// 实现版本控制防止数据冲突
func saveWithVersionControl(store CheckpointStore, checkpoint *Checkpoint) error {
// 获取当前版本
current, err := store.Load(ctx, checkpoint.ID)
if err == nil && current.Version >= checkpoint.Version {
return fmt.Errorf("conflict: newer version exists")
}
// 保存新版本
return store.Save(ctx, checkpoint)
}
监控和告警 #
关键指标监控 #
- 检查点数量:监控检查点增长趋势
- 存储空间:跟踪磁盘使用情况
- 读写延迟:监控存储性能
- 错误率:统计保存和加载失败次数
告警配置 #
// 检查点存储健康监控
func monitorCheckpointStore(store CheckpointStore) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
<-ticker.C
// 测试连接
err := store.TestConnection()
if err != nil {
log.Printf("Checkpoint store connection failed: %v", err)
// 发送告警
}
// 检查存储空间
stats, err := store.GetStats()
if err != nil {
continue
}
if stats.SpaceUsedPercent > 90 {
log.Printf("Storage space warning: %.1f%% used", stats.SpaceUsedPercent)
}
}
}
备份和恢复 #
自动备份策略 #
// 定期备份检查点数据
func backupCheckpoints(store CheckpointStore, backupDir string) error {
checkpoints, err := store.ListAll(context.Background())
if err != nil {
return err
}
// 创建备份文件
filename := fmt.Sprintf("backup_%s.json", time.Now().Format("20060102_150405"))
filepath := filepath.Join(backupDir, filename)
// 序列化并保存
data, err := json.Marshal(checkpoints)
if err != nil {
return err
}
return os.WriteFile(filepath, data, 0644)
}
恢复流程 #
// 从备份恢复检查点
func restoreFromBackup(store CheckpointStore, backupFile string) error {
data, err := os.ReadFile(backupFile)
if err != nil {
return err
}
var checkpoints []*Checkpoint
if err := json.Unmarshal(data, &checkpoints); err != nil {
return err
}
// 逐个恢复检查点
for _, cp := range checkpoints {
if err := store.Save(context.Background(), cp); err != nil {
log.Printf("Failed to restore checkpoint %s: %v", cp.ID, err)
// 继续处理其他检查点
}
}
return nil
}
安全考虑 #
数据加密 #
// 对敏感检查点数据进行加密
func encryptCheckpointData(data []byte) ([]byte, error) {
// 使用AES-GCM加密
block, err := aes.NewCipher(encryptionKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonce := make([]byte, gcm.NonceSize())
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return nil, err
}
return gcm.Seal(nonce, nonce, data, nil), nil
}
访问控制 #
// 实现访问控制
type SecureCheckpointStore struct {
store CheckpointStore
acl AccessControlList
}
func (s *SecureCheckpointStore) Save(ctx context.Context, checkpoint *Checkpoint) error {
user := getUserFromContext(ctx)
if !s.acl.CanWrite(user, checkpoint.ExecutionID) {
return fmt.Errorf("access denied")
}
return s.store.Save(ctx, checkpoint)
}
章节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L230-L295)
故障排除 #
常见问题和解决方案 #
连接问题 #
问题:无法连接到数据库
# PostgreSQL连接失败
failed to connect to `host=localhost user=user database=dbname`: dial error (timeout?)
# 解决方案
# 1. 检查数据库服务状态
systemctl status postgresql
# 2. 验证连接字符串
echo $POSTGRES_CONN_STRING
# 3. 测试网络连通性
telnet localhost 5432
问题:Redis连接超时
# Redis连接失败
dial tcp: connection refused
# 解决方案
# 1. 检查Redis服务
systemctl status redis
# 2. 验证配置
redis-cli ping
# 3. 检查防火墙设置
iptables -L | grep 6379
性能问题 #
问题:检查点保存缓慢
// 性能诊断
func diagnoseCheckpointPerformance(store CheckpointStore) {
start := time.Now()
// 测试保存性能
err := store.Save(context.Background(), testCheckpoint)
duration := time.Since(start)
if duration > 100 * time.Millisecond {
log.Printf("Slow checkpoint save: %v", duration)
// 考虑优化存储配置
}
}
问题:内存使用过高
# 监控内存使用
free -h
docker stats
# 解决方案
# 1. 限制MaxCheckpoints数量
config.MaxCheckpoints = 5
# 2. 启用TTL(Redis)
opts.TTL = 1 * time.Hour
# 3. 定期清理旧检查点
store.ClearOldCheckpoints(context.Background(), 7*24*time.Hour)
数据一致性问题 #
问题:检查点数据不一致
// 数据验证
func validateCheckpointIntegrity(store CheckpointStore, checkpointID string) error {
// 加载检查点
cp, err := store.Load(context.Background(), checkpointID)
if err != nil {
return err
}
// 验证必需字段
if cp.ID == "" || cp.NodeName == "" || cp.State == nil {
return fmt.Errorf("missing required fields")
}
// 验证元数据
if execID, ok := cp.Metadata["execution_id"].(string); !ok || execID == "" {
return fmt.Errorf("invalid execution_id")
}
return nil
}
调试工具 #
检查点查看器 #
// 检查点调试工具
func debugCheckpoints(store CheckpointStore, executionID string) {
checkpoints, err := store.List(context.Background(), executionID)
if err != nil {
log.Printf("Failed to list checkpoints: %v", err)
return
}
for i, cp := range checkpoints {
fmt.Printf("Checkpoint %d:\n", i+1)
fmt.Printf(" ID: %s\n", cp.ID)
fmt.Printf(" Node: %s\n", cp.NodeName)
fmt.Printf(" Time: %v\n", cp.Timestamp)
fmt.Printf(" Version: %d\n", cp.Version)
fmt.Printf(" State length: %d\n", len(cp.State))
}
}
性能分析器 #
// 性能分析工具
type PerformanceProfiler struct {
metrics map[string]time.Duration
}
func (p *PerformanceProfiler) Measure(operation string, fn func() error) error {
start := time.Now()
err := fn()
duration := time.Since(start)
p.metrics[operation] = duration
return err
}
func (p *PerformanceProfiler) Report() {
for op, duration := range p.metrics {
fmt.Printf("%s: %v\n", op, duration)
}
}
章节来源
- [checkpoint/postgres/postgres_test.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres_test.go#L15-L90)
- [checkpoint/redis/redis_test.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis_test.go#L13-L89)
- [checkpoint/sqlite/sqlite_test.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite_test.go#L12-L83)
总结 #
检查点存储机制是 LangGraphGo 实现可靠、可恢复工作流的核心组件。通过支持多种后端存储,开发者可以根据具体需求选择最适合的解决方案:
主要优势 #
- 持久化能力:确保状态不会因系统故障而丢失
- 恢复机制:支持从任意检查点重新开始执行
- 并发控制:通过执行ID隔离不同实例的状态
- 灵活配置:支持自动保存、定时保存等多种模式
选择建议 #
- 开发测试:优先使用内存存储
- 高性能需求:选择Redis存储
- 生产环境:推荐PostgreSQL存储
- 轻量部署:考虑SQLite存储
未来发展方向 #
随着应用场景的不断扩展,检查点存储机制将继续演进:
- 云原生支持:更好的容器化和微服务集成
- 智能调度:基于负载和性能的自动存储选择
- 数据压缩:更高效的存储格式和压缩算法
- 分布式协调:支持更大规模的并发执行
通过合理配置和使用检查点存储,可以构建出既可靠又高效的复杂工作流系统,满足现代应用的各种需求。