内置存储实现 #
本文档中引用的文件
- checkpoint/postgres/postgres.go
- checkpoint/redis/redis.go
- checkpoint/sqlite/sqlite.go
- graph/checkpointing.go
- examples/checkpointing/main.go
- examples/checkpointing/postgres/main.go
- examples/checkpointing/redis/main.go
- examples/checkpointing/sqlite/main.go
- checkpoint/postgres/postgres_test.go
- checkpoint/redis/redis_test.go
- checkpoint/sqlite/sqlite_test.go
- go.mod
目录 #
简介 #
LangGraphGo 提供了五种内置的检查点存储实现,为不同的应用场景提供了灵活的选择。这些存储方案涵盖了从开发测试到生产环境的各种需求,包括内存存储、文件存储、关系型数据库存储和键值存储等多种技术栈。
核心存储接口 #
所有存储实现都遵循统一的 CheckpointStore 接口,确保了不同存储之间的兼容性和可替换性:
classDiagram
class CheckpointStore {
<<interface>>
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class MemoryCheckpointStore {
-checkpoints map[string]*Checkpoint
-mutex sync.RWMutex
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class FileCheckpointStore {
-writer io.Writer
-reader io.Reader
-mutex sync.RWMutex
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class PostgresCheckpointStore {
-pool DBPool
-tableName string
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
+InitSchema(ctx) error
+Close()
}
class RedisCheckpointStore {
-client *redis.Client
-prefix string
-ttl time.Duration
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class SqliteCheckpointStore {
-db *sql.DB
-tableName string
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
+InitSchema(ctx) error
+Close() error
}
CheckpointStore <|.. MemoryCheckpointStore
CheckpointStore <|.. FileCheckpointStore
CheckpointStore <|.. PostgresCheckpointStore
CheckpointStore <|.. RedisCheckpointStore
CheckpointStore <|.. SqliteCheckpointStore
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L22-L38)
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L23-L26)
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L13-L18)
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L13-L17)
存储架构概览 #
LangGraphGo 的检查点存储系统采用分层架构设计,提供了统一的抽象接口和多样化的具体实现:
graph TB
subgraph "应用层"
A[CheckpointableRunnable]
B[CheckpointableMessageGraph]
end
subgraph "接口层"
C[CheckpointStore Interface]
end
subgraph "存储实现层"
D[MemoryCheckpointStore]
E[FileCheckpointStore]
F[PostgresCheckpointStore]
G[RedisCheckpointStore]
H[SqliteCheckpointStore]
end
subgraph "底层存储"
I[内存]
J[文件系统]
K[PostgreSQL数据库]
L[Redis服务器]
M[SQLite文件]
end
A --> C
B --> C
C --> D
C --> E
C --> F
C --> G
C --> H
D --> I
E --> J
F --> K
G --> L
H --> M
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L22-L38)
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L40-L111)
内存存储实现 #
概述 #
内存存储是最简单的存储实现,适用于开发测试环境和单次执行场景。它将所有检查点数据保存在程序的内存映射中,提供最快的访问速度但不具备持久化能力。
实现特点 #
- 性能: 最高性能,直接内存访问
- 持久化: 不支持持久化,进程重启后数据丢失
- 并发: 使用读写锁保证线程安全
- 适用场景: 单次执行、测试、原型开发
初始化方式 #
// 创建内存存储实例
store := graph.NewMemoryCheckpointStore()
// 配置检查点配置
config := graph.CheckpointConfig{
Store: store,
AutoSave: true,
MaxCheckpoints: 10,
}
关键操作实现 #
内存存储的核心实现非常简洁,主要通过同步互斥锁保护共享状态:
sequenceDiagram
participant Client as 客户端
participant Store as MemoryCheckpointStore
participant Mutex as RWMutex
Client->>Store : Save(checkpoint)
Store->>Mutex : Lock()
Mutex-->>Store : 获取写锁
Store->>Store : 存储到内存映射
Store->>Mutex : Unlock()
Store-->>Client : 返回成功
Client->>Store : Load(checkpointID)
Store->>Mutex : RLock()
Mutex-->>Store : 获取读锁
Store->>Store : 从内存映射查找
Store->>Mutex : RUnlock()
Store-->>Client : 返回检查点或错误
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L54-L72)
优缺点分析 #
优点:
- 极高的性能表现
- 实现简单,无外部依赖
- 内存中操作,响应时间最短
缺点:
- 数据不持久化
- 内存占用随检查点数量增长
- 进程重启后数据丢失
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L40-L111)
文件存储实现 #
概述 #
文件存储是另一种简单的存储实现,通过文件系统进行数据持久化。虽然实现了基本的文件操作,但在复杂场景下的功能支持有限。
实现特点 #
- 性能: 中等性能,受文件系统I/O影响
- 持久化: 支持持久化,文件系统重启后数据保留
- 并发: 使用读写锁保证线程安全
- 适用场景: 简单的文件存储需求
初始化方式 #
// 创建文件存储实例
store := graph.NewFileCheckpointStore(
os.Stdout, // 写入器
os.Stdin, // 读取器
)
功能限制 #
文件存储的实现相对简陋,在复杂操作上存在明显限制:
flowchart TD
A[FileCheckpointStore] --> B{操作类型}
B --> |Save| C[写入JSON数据到Writer]
B --> |Load| D[从Reader读取全部数据<br/>反序列化为Checkpoint]
B --> |List| E[返回错误:<br/>未实现]
B --> |Delete| F[返回错误:<br/>未实现]
B --> |Clear| G[返回错误:<br/>未实现]
C --> H[写入成功]
D --> I[验证ID匹配]
I --> J[返回Checkpoint]
E --> K[错误处理]
F --> K
G --> K
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L128-L186)
适用场景 #
文件存储主要用于:
- 简单的数据导出
- 原始数据备份
- 开发调试时的数据查看
节来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L113-L186)
PostgreSQL 存储实现 #
概述 #
PostgreSQL 存储是生产级别的关系型数据库存储实现,基于 pgx/v5 驱动提供完整的数据库功能支持。
实现特点 #
- 性能: 高性能,支持并发访问
- 持久化: 强一致性,数据持久化
- 事务: 支持ACID事务
- 索引: 支持复合索引优化查询
- 适用场景: 生产环境、高并发场景
初始化方式 #
// 创建PostgreSQL存储实例
store, err := postgres.NewPostgresCheckpointStore(context.Background(), postgres.PostgresOptions{
ConnString: "postgres://user:password@localhost:5432/dbname",
TableName: "checkpoints",
})
// 初始化数据库模式
err = store.InitSchema(context.Background())
数据库架构 #
PostgreSQL 存储使用专门的表结构来组织检查点数据:
erDiagram
CHECKPOINTS {
string id PK
string execution_id
string node_name
jsonb state
jsonb metadata
timestamptz timestamp
integer version
}
CHECKPOINTS ||--o{ EXECUTION_CHECKPOINTS : "belongs to"
EXECUTION_CHECKPOINTS {
string execution_id
timestamp created_at
int checkpoint_count
}
图表来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L66-L77)
表结构设计 #
PostgreSQL 存储的表结构包含以下字段:
| 字段名 | 类型 | 约束 | 描述 |
|---|---|---|---|
| id | TEXT | PRIMARY KEY | 检查点唯一标识符 |
| execution_id | TEXT | NOT NULL | 执行会话标识符 |
| node_name | TEXT | NOT NULL | 执行节点名称 |
| state | JSONB | NOT NULL | 节点状态数据 |
| metadata | JSONB | NULLABLE | 元数据信息 |
| timestamp | TIMESTAMPTZ | NOT NULL | 创建时间戳 |
| version | INTEGER | NOT NULL | 版本号 |
索引策略 #
为了优化查询性能,PostgreSQL 存储创建了以下索引:
- 主键索引: 对
id字段建立主键索引 - 执行索引: 对
execution_id字段建立普通索引,加速按执行会话查询
关键操作实现 #
Save 操作流程 #
sequenceDiagram
participant Client as 客户端
participant Store as PostgresCheckpointStore
participant DB as PostgreSQL数据库
participant Pool as 连接池
Client->>Store : Save(checkpoint)
Store->>Store : 序列化State和Metadata为JSON
Store->>Pool : 获取数据库连接
Pool-->>Store : 返回连接
Store->>DB : INSERT/UPDATE语句
Note over Store,DB : 使用ON CONFLICT处理重复ID
DB-->>Store : 执行结果
Store->>Pool : 归还连接
Store-->>Client : 返回结果
图表来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L91-L135)
Load 操作流程 #
sequenceDiagram
participant Client as 客户端
participant Store as PostgresCheckpointStore
participant DB as PostgreSQL数据库
participant Pool as 连接池
Client->>Store : Load(checkpointID)
Store->>Pool : 获取数据库连接
Pool-->>Store : 返回连接
Store->>DB : SELECT查询语句
DB-->>Store : 返回JSON数据
Store->>Store : 反序列化JSON为Checkpoint
alt 检查点存在
Store-->>Client : 返回Checkpoint
else 检查点不存在
Store-->>Client : 返回NotFoundError
end
Store->>Pool : 归还连接
图表来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L137-L176)
错误处理机制 #
PostgreSQL 存储实现了完善的错误处理机制:
flowchart TD
A[数据库操作] --> B{操作类型}
B --> |连接| C[连接失败处理]
B --> |查询| D[查询错误处理]
B --> |插入| E[插入冲突处理]
B --> |删除| F[删除错误处理]
C --> G[返回连接错误]
D --> H{错误类型}
H --> |NoRows| I[返回NotFound]
H --> |其他| J[返回查询错误]
E --> K[返回冲突错误]
F --> L[返回删除错误]
图表来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L158-L172)
性能优化特性 #
- 连接池管理: 使用
pgxpool提供连接池支持 - 批量操作: 支持事务批量操作
- 索引优化: 合理的索引设计提升查询性能
- JSONB存储: 利用PostgreSQL的JSONB类型优化存储
节来源
- [checkpoint/postgres/postgres.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/postgres/postgres.go#L1-L250)
Redis 存储实现 #
概述 #
Redis 存储是基于 go-redis/v9 驱动的高性能键值存储实现,专为高并发场景设计。
实现特点 #
- 性能: 极高性能,内存中的键值存储
- 持久化: 可配置持久化,支持RDB和AOF
- 数据结构: 支持丰富的数据结构
- 适用场景: 高并发、低延迟场景
初始化方式 #
// 创建Redis存储实例
store := redis.NewRedisCheckpointStore(redis.RedisOptions{
Addr: "localhost:6379",
Password: "",
DB: 0,
Prefix: "langgraph:",
TTL: 1 * time.Hour,
})
数据结构设计 #
Redis 存储使用两种主要的数据结构来组织检查点数据:
graph TB
subgraph "键值存储结构"
A[checkpoint:{id}] --> B[JSON序列化Checkpoint]
end
subgraph "集合存储结构"
C[execution:{execution_id}:checkpoints] --> D[Set of checkpoint IDs]
end
subgraph "TTL管理"
E[可选TTL设置] --> F[自动过期清理]
end
A -.-> C
B -.-> D
E -.-> F
图表来源
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L49-L55)
键命名规范 #
Redis 存储使用标准化的键命名规范:
| 键类型 | 格式 | 示例 |
|---|---|---|
| 检查点键 | checkpoint:{id} |
checkpoint:cp-123 |
| 执行索引键 | execution:{execution_id}:checkpoints |
execution:exec-456:checkpoints |
关键操作实现 #
Save 操作流程 #
sequenceDiagram
participant Client as 客户端
participant Store as RedisCheckpointStore
participant Redis as Redis服务器
participant Pipeline as 管道
Client->>Store : Save(checkpoint)
Store->>Store : 序列化Checkpoint为JSON
Store->>Pipeline : 创建管道
Store->>Pipeline : Set(key, value, ttl)
Store->>Store : 检查是否有execution_id
alt 有execution_id
Store->>Pipeline : SAdd(execKey, checkpointID)
alt 有TTL设置
Store->>Pipeline : Expire(execKey, ttl)
end
end
Store->>Pipeline : Exec()
Pipeline-->>Store : 执行结果
Store-->>Client : 返回结果
图表来源
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L57-L84)
List 操作流程 #
sequenceDiagram
participant Client as 客户端
participant Store as RedisCheckpointStore
participant Redis as Redis服务器
Client->>Store : List(executionID)
Store->>Store : 生成执行索引键
Store->>Redis : SMembers(execKey)
Redis-->>Store : 返回检查点ID列表
Store->>Store : 生成所有检查点键
Store->>Redis : MGet(keys...)
Redis-->>Store : 返回检查点数据
Store->>Store : 并行反序列化
Store->>Store : 过滤和验证
Store-->>Client : 返回检查点列表
图表来源
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L105-L155)
高级特性 #
管道操作 #
Redis 存储大量使用管道操作来提高性能:
flowchart LR
A[开始管道] --> B[Set操作]
B --> C[SAdd操作]
C --> D[Expire操作]
D --> E[Exec执行]
E --> F[原子提交]
TTL 支持 #
Redis 存储支持可选的TTL(生存时间)设置:
- 全局TTL: 设置整个存储的过期时间
- 执行TTL: 为特定执行会话设置过期时间
- 自动清理: Redis 自动清理过期数据
性能优势 #
- 内存存储: 数据完全存储在内存中
- 管道操作: 减少网络往返次数
- 原子操作: 支持事务和原子操作
- 数据结构: 支持多种高效数据结构
节来源
- [checkpoint/redis/redis.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/redis/redis.go#L1-L212)
SQLite 存储实现 #
概述 #
SQLite 存储是基于 go-sqlite3 驱动的轻量级文件型数据库存储实现,适合单机部署和小型应用。
实现特点 #
- 性能: 中等性能,文件I/O受限
- 持久化: 强一致性,文件持久化
- 独立性: 无需单独的数据库服务
- 适用场景: 单机应用、嵌入式系统
初始化方式 #
// 创建SQLite存储实例
store, err := sqlite.NewSqliteCheckpointStore(sqlite.SqliteOptions{
Path: "./checkpoints.db",
TableName: "checkpoints",
})
// 初始化数据库模式
err = store.InitSchema(context.Background())
数据库架构 #
SQLite 存储使用标准的关系型表结构:
erDiagram
CHECKPOINTS {
string id PK
string execution_id
string node_name
text state
text metadata
datetime timestamp
integer version
}
CHECKPOINTS ||--o{ EXECUTION_CHECKPOINTS : "belongs to"
EXECUTION_CHECKPOINTS {
string execution_id
timestamp created_at
int checkpoint_count
}
图表来源
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L52-L62)
表结构设计 #
SQLite 存储的表结构与PostgreSQL类似,但使用不同的数据类型:
| 字段名 | SQLite类型 | 描述 |
|---|---|---|
| id | TEXT | 主键,检查点唯一标识符 |
| execution_id | TEXT | 执行会话标识符 |
| node_name | TEXT | 节点名称 |
| state | TEXT | JSON序列化的状态数据 |
| metadata | TEXT | JSON序列化的元数据 |
| timestamp | DATETIME | 创建时间 |
| version | INTEGER | 版本号 |
索引策略 #
SQLite 存储同样创建执行ID索引来优化查询:
CREATE INDEX IF NOT EXISTS idx_checkpoints_execution_id
ON checkpoints (execution_id);
关键操作实现 #
Save 操作流程 #
sequenceDiagram
participant Client as 客户端
participant Store as SqliteCheckpointStore
participant DB as SQLite数据库
Client->>Store : Save(checkpoint)
Store->>Store : 序列化State和Metadata为JSON
Store->>DB : INSERT/UPDATE语句
Note over Store,DB : 使用ON CONFLICT处理重复ID
DB-->>Store : 执行结果
Store-->>Client : 返回结果
图表来源
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L77-L121)
Load 操作流程 #
sequenceDiagram
participant Client as 客户端
participant Store as SqliteCheckpointStore
participant DB as SQLite数据库
Client->>Store : Load(checkpointID)
Store->>DB : SELECT查询语句
DB-->>Store : 返回JSON字符串
Store->>Store : 反序列化JSON为Checkpoint
alt 检查点存在
Store-->>Client : 返回Checkpoint
else 检查点不存在
Store-->>Client : 返回NotFoundError
end
图表来源
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L123-L162)
优势特性 #
文件独立性 #
SQLite 存储的主要优势在于其文件独立性:
- 零配置: 无需安装和配置数据库服务
- 便携性: 整个数据库是一个文件
- 可靠性: WAL模式提供更好的并发支持
事务支持 #
SQLite 提供完整的ACID事务支持:
flowchart TD
A[开始事务] --> B[执行多个操作]
B --> C{操作成功?}
C --> |是| D[提交事务]
C --> |否| E[回滚事务]
D --> F[事务完成]
E --> F
性能考虑 #
- 文件I/O: 性能受限于磁盘I/O速度
- 并发: WAL模式支持更好的并发
- 缓存: SQLite利用操作系统缓存
节来源
- [checkpoint/sqlite/sqlite.go](https://github.com/smallnest/langgraphgo/blob/main/checkpoint/sqlite/sqlite.go#L1-L236)
存储性能对比 #
性能基准测试 #
以下是各种存储实现的性能特征对比:
| 存储类型 | 吞吐量 | 延迟 | 持久化 | 并发支持 | 外部依赖 |
|---|---|---|---|---|---|
| Memory | 极高 | 极低 | 否 | 高 | 无 |
| File | 中等 | 中等 | 是 | 低 | 无 |
| PostgreSQL | 高 | 低 | 是 | 极高 | 数据库 |
| Redis | 极高 | 极低 | 可选 | 极高 | Redis服务器 |
| SQLite | 中等 | 中等 | 是 | 中等 | SQLite驱动 |
场景推荐 #
graph TD
A[选择存储类型] --> B{性能要求}
B --> |极高| C{持久化要求}
B --> |高| D{并发要求}
B --> |中等| E{部署复杂度}
C --> |是| F[PostgreSQL]
C --> |否| G[Redis]
D --> |高| H[Redis]
D --> |中等| I[SQLite]
D --> |低| J[Memory]
E --> |低| K[Memory]
E --> |中等| L[SQLite]
E --> |高| M[PostgreSQL]
性能优化建议 #
高并发场景 #
- 首选: Redis 存储
- 次选: PostgreSQL 存储
- 避免: File 存储
大数据量场景 #
- 首选: PostgreSQL 存储
- 次选: SQLite 存储
- 考虑: 分片策略
开发测试场景 #
- 首选: Memory 存储
- 次选: SQLite 存储
- 快速原型: File 存储
最佳实践指南 #
连接配置最佳实践 #
PostgreSQL 配置 #
// 生产环境配置
store, err := postgres.NewPostgresCheckpointStore(ctx, postgres.PostgresOptions{
ConnString: "postgres://user:password@host:port/dbname?sslmode=disable",
TableName: "checkpoints",
})
// 配置连接池参数
poolConfig, err := pgxpool.ParseConfig(connString)
poolConfig.MaxConns = 20
poolConfig.MinConns = 5
poolConfig.MaxConnLifetime = 1 * time.Hour
poolConfig.MaxConnIdleTime = 30 * time.Minute
pool, err := pgxpool.NewWithConfig(ctx, poolConfig)
Redis 配置 #
// 高性能配置
store := redis.NewRedisCheckpointStore(redis.RedisOptions{
Addr: "localhost:6379",
Password: "", // 如果需要认证
DB: 0, // 使用默认数据库
Prefix: "langgraph:", // 添加命名空间前缀
TTL: 24 * time.Hour, // 设置合理的TTL
})
// 生产环境配置
store := redis.NewRedisCheckpointStore(redis.RedisOptions{
Addr: "redis-cluster:6379",
Password: os.Getenv("REDIS_PASSWORD"),
DB: 0,
Prefix: "langgraph:",
TTL: 7 * 24 * time.Hour, // 一周TTL
})
SQLite 配置 #
// 生产环境配置
store, err := sqlite.NewSqliteCheckpointStore(sqlite.SqliteOptions{
Path: "/var/lib/langgraph/checkpoints.db",
TableName: "checkpoints",
})
// 配置WAL模式以提高并发性能
db, err := sql.Open("sqlite3", path+"?_journal_mode=WAL")
错误处理最佳实践 #
统一错误处理 #
func safeCheckpointOperation(store graph.CheckpointStore, op func() error) error {
const maxRetries = 3
const baseDelay = 100 * time.Millisecond
for i := 0; i < maxRetries; i++ {
err := op()
if err == nil {
return nil
}
// 检查是否为临时错误
if isTemporaryError(err) {
delay := baseDelay * time.Duration(1<<i)
time.Sleep(delay)
continue
}
return fmt.Errorf("checkpoint operation failed: %w", err)
}
return fmt.Errorf("checkpoint operation failed after %d retries", maxRetries)
}
连接池管理 #
type CheckpointManager struct {
stores map[string]graph.CheckpointStore
mu sync.RWMutex
}
func (cm *CheckpointManager) GetStore(name string) (graph.CheckpointStore, error) {
cm.mu.RLock()
store, exists := cm.stores[name]
cm.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("store %s not found", name)
}
// 检查连接状态
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 尝试简单的查询来检查连接
err := store.Save(ctx, &graph.Checkpoint{
ID: "test",
NodeName: "health",
State: map[string]interface{}{"status": "ok"},
Timestamp: time.Now(),
Version: 1,
})
if err != nil {
return nil, fmt.Errorf("store health check failed: %w", err)
}
return store, nil
}
监控和运维 #
性能监控指标 #
type CheckpointMetrics struct {
SavesTotal prometheus.Counter
LoadsTotal prometheus.Counter
DeletesTotal prometheus.Counter
ErrorsTotal prometheus.Counter
SaveDuration prometheus.Histogram
LoadDuration prometheus.Histogram
StorageSize prometheus.Gauge
}
func (m *CheckpointMetrics) RecordSave(duration time.Duration, err error) {
m.SavesTotal.Inc()
m.SaveDuration.Observe(duration.Seconds())
if err != nil {
m.ErrorsTotal.Inc()
}
}
func (m *CheckpointMetrics) RecordLoad(duration time.Duration, err error) {
m.LoadsTotal.Inc()
m.LoadDuration.Observe(duration.Seconds())
if err != nil {
m.ErrorsTotal.Inc()
}
}
数据清理策略 #
func cleanupOldCheckpoints(store graph.CheckpointStore, executionID string, retentionDays int) error {
ctx := context.Background()
// 获取所有检查点
checkpoints, err := store.List(ctx, executionID)
if err != nil {
return fmt.Errorf("failed to list checkpoints: %w", err)
}
// 计算保留期限
cutoff := time.Now().AddDate(0, 0, -retentionDays)
// 删除过期的检查点
for _, cp := range checkpoints {
if cp.Timestamp.Before(cutoff) {
err := store.Delete(ctx, cp.ID)
if err != nil {
log.Printf("failed to delete old checkpoint %s: %v", cp.ID, err)
// 继续处理下一个
}
}
}
return nil
}
故障排除 #
常见问题诊断 #
连接问题 #
flowchart TD
A[连接失败] --> B{错误类型}
B --> |网络错误| C[检查网络连接]
B --> |认证错误| D[检查凭据]
B --> |超时错误| E[增加超时时间]
B --> |资源不足| F[检查资源限制]
C --> G[ping目标主机]
D --> H[验证用户名密码]
E --> I[调整连接超时]
F --> J[增加连接池大小]
性能问题 #
// 性能监控和诊断
func diagnoseStoragePerformance(store graph.CheckpointStore) {
startTime := time.Now()
// 测试写入性能
testCheckpoint := &graph.Checkpoint{
ID: "perf-test-" + uuid.New().String(),
NodeName: "diagnostic",
State: map[string]interface{}{"test": "data"},
Timestamp: time.Now(),
Version: 1,
}
err := store.Save(context.Background(), testCheckpoint)
if err != nil {
log.Printf("Performance test failed: %v", err)
return
}
duration := time.Since(startTime)
log.Printf("Checkpoint save performance: %v", duration)
// 根据性能调整配置
if duration > 100*time.Millisecond {
log.Printf("Warning: Slow checkpoint save operation")
// 考虑使用更快的存储或优化配置
}
}
数据一致性问题 #
// 数据完整性检查
func validateCheckpointIntegrity(store graph.CheckpointStore, checkpointID string) error {
// 1. 加载检查点
checkpoint, err := store.Load(context.Background(), checkpointID)
if err != nil {
return fmt.Errorf("failed to load checkpoint: %w", err)
}
// 2. 验证必需字段
if checkpoint.ID == "" {
return fmt.Errorf("missing required field: ID")
}
if checkpoint.NodeName == "" {
return fmt.Errorf("missing required field: NodeName")
}
if checkpoint.Timestamp.IsZero() {
return fmt.Errorf("missing required field: Timestamp")
}
// 3. 验证状态数据
if checkpoint.State == nil {
return fmt.Errorf("missing required field: State")
}
// 4. 验证元数据格式
if checkpoint.Metadata != nil {
// 验证元数据格式
metadataJSON, err := json.Marshal(checkpoint.Metadata)
if err != nil {
return fmt.Errorf("invalid metadata format: %w", err)
}
var temp map[string]interface{}
err = json.Unmarshal(metadataJSON, &temp)
if err != nil {
return fmt.Errorf("invalid metadata JSON: %w", err)
}
}
return nil
}
调试工具 #
存储状态检查器 #
type StorageDebugger struct {
stores map[string]graph.CheckpointStore
}
func (sd *StorageDebugger) DumpStorageStats() {
for name, store := range sd.stores {
log.Printf("Storage: %s", name)
// 尝试执行一个简单的操作来检查健康状态
ctx := context.Background()
testCP := &graph.Checkpoint{
ID: "debug-" + uuid.New().String(),
NodeName: "debug",
State: map[string]interface{}{"test": true},
Timestamp: time.Now(),
Version: 1,
}
startTime := time.Now()
err := store.Save(ctx, testCP)
duration := time.Since(startTime)
if err != nil {
log.Printf(" Status: ERROR - %v", err)
} else {
log.Printf(" Status: OK - %.2fms", duration.Seconds()*1000)
// 清理测试数据
store.Delete(ctx, testCP.ID)
}
}
}
性能分析器 #
func profileStorageOperations(store graph.CheckpointStore) {
type OperationResult struct {
OpType string
Duration time.Duration
Success bool
Error error
}
results := make(chan OperationResult, 1000)
// 启动多个并发操作进行压力测试
for i := 0; i < 10; i++ {
go func(workerID int) {
for j := 0; j < 100; j++ {
opType := rand.Intn(3)
startTime := time.Now()
var err error
switch opType {
case 0:
err = store.Save(context.Background(), &graph.Checkpoint{
ID: fmt.Sprintf("profile-%d-%d", workerID, j),
NodeName: "profile",
State: map[string]interface{}{"data": "test"},
Timestamp: time.Now(),
Version: 1,
})
case 1:
_, err = store.Load(context.Background(), fmt.Sprintf("profile-%d-%d", workerID, j))
case 2:
err = store.Delete(context.Background(), fmt.Sprintf("profile-%d-%d", workerID, j))
}
results <- OperationResult{
OpType: []string{"Save", "Load", "Delete"}[opType],
Duration: time.Since(startTime),
Success: err == nil,
Error: err,
}
}
}(i)
}
// 收集结果
var totalTime time.Duration
var totalOps int
var failures int
for i := 0; i < 3000; i++ {
result := <-results
totalTime += result.Duration
totalOps++
if !result.Success {
failures++
}
}
avgLatency := totalTime / time.Duration(totalOps)
failureRate := float64(failures) / float64(totalOps)
log.Printf("Average latency: %v", avgLatency)
log.Printf("Failure rate: %.2f%%", failureRate*100)
}
总结 #
LangGraphGo 的内置检查点存储实现提供了从开发测试到生产环境的完整解决方案。每种存储都有其独特的优势和适用场景:
存储选择决策树 #
flowchart TD
A[开始选择存储] --> B{开发阶段?}
B --> |是| C[Memory存储]
B --> |否| D{生产环境?}
D --> |否| E{并发要求?}
D --> |是| F{数据量大小?}
E --> |高| G[Redis存储]
E --> |中等| H[SQLite存储]
E --> |低| I[Memory存储]
F --> |小| J[SQLite存储]
F --> |大| K[PostgreSQL存储]
F --> |极大| L[分布式存储方案]
C --> M[开发测试完成]
G --> N[高并发场景完成]
H --> O[中小项目完成]
I --> P[快速原型完成]
J --> Q[单机应用完成]
K --> R[大型应用完成]
L --> S[企业级方案完成]
技术演进路径 #
- 开发阶段: Memory 存储 → 快速迭代
- 测试阶段: SQLite 存储 → 本地测试
- 预发布阶段: PostgreSQL 存储 → 生产环境模拟
- 生产阶段: Redis 存储 + PostgreSQL 存储 → 高可用方案
最佳实践总结 #
- 开发环境: 使用 Memory 存储进行快速开发和测试
- 测试环境: 使用 SQLite 存储进行集成测试
- 生产环境: 根据具体需求选择 PostgreSQL 或 Redis 存储
- 混合方案: 结合多种存储实现以满足不同场景需求
通过合理选择和配置这些内置存储实现,可以构建出既满足功能需求又具备良好性能和可靠性的检查点管理系统。