内置存储实现 #

目录 #

  1. 简介
  2. 存储架构概览
  3. 内存存储实现
  4. 文件存储实现
  5. PostgreSQL 存储实现
  6. Redis 存储实现
  7. SQLite 存储实现
  8. 存储性能对比
  9. 最佳实践指南
  10. 故障排除
  11. 总结

简介 #

LangGraphGo 提供了五种内置的检查点存储实现,为不同的应用场景提供了灵活的选择。这些存储方案涵盖了从开发测试到生产环境的各种需求,包括内存存储、文件存储、关系型数据库存储和键值存储等多种技术栈。

核心存储接口 #

所有存储实现都遵循统一的 CheckpointStore 接口,确保了不同存储之间的兼容性和可替换性:

classDiagram
class CheckpointStore {
<<interface>>
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class MemoryCheckpointStore {
-checkpoints map[string]*Checkpoint
-mutex sync.RWMutex
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class FileCheckpointStore {
-writer io.Writer
-reader io.Reader
-mutex sync.RWMutex
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class PostgresCheckpointStore {
-pool DBPool
-tableName string
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
+InitSchema(ctx) error
+Close()
}
class RedisCheckpointStore {
-client *redis.Client
-prefix string
-ttl time.Duration
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
}
class SqliteCheckpointStore {
-db *sql.DB
-tableName string
+Save(ctx, checkpoint) error
+Load(ctx, checkpointID) *Checkpoint
+List(ctx, executionID) []*Checkpoint
+Delete(ctx, checkpointID) error
+Clear(ctx, executionID) error
+InitSchema(ctx) error
+Close() error
}
CheckpointStore <|.. MemoryCheckpointStore
CheckpointStore <|.. FileCheckpointStore
CheckpointStore <|.. PostgresCheckpointStore
CheckpointStore <|.. RedisCheckpointStore
CheckpointStore <|.. SqliteCheckpointStore

图表来源

存储架构概览 #

LangGraphGo 的检查点存储系统采用分层架构设计,提供了统一的抽象接口和多样化的具体实现:

graph TB
subgraph "应用层"
A[CheckpointableRunnable]
B[CheckpointableMessageGraph]
end
subgraph "接口层"
C[CheckpointStore Interface]
end
subgraph "存储实现层"
D[MemoryCheckpointStore]
E[FileCheckpointStore]
F[PostgresCheckpointStore]
G[RedisCheckpointStore]
H[SqliteCheckpointStore]
end
subgraph "底层存储"
I[内存]
J[文件系统]
K[PostgreSQL数据库]
L[Redis服务器]
M[SQLite文件]
end
A --> C
B --> C
C --> D
C --> E
C --> F
C --> G
C --> H
D --> I
E --> J
F --> K
G --> L
H --> M

图表来源

内存存储实现 #

概述 #

内存存储是最简单的存储实现,适用于开发测试环境和单次执行场景。它将所有检查点数据保存在程序的内存映射中,提供最快的访问速度但不具备持久化能力。

实现特点 #

初始化方式 #

// 创建内存存储实例
store := graph.NewMemoryCheckpointStore()

// 配置检查点配置
config := graph.CheckpointConfig{
    Store: store,
    AutoSave: true,
    MaxCheckpoints: 10,
}

关键操作实现 #

内存存储的核心实现非常简洁,主要通过同步互斥锁保护共享状态:

sequenceDiagram
participant Client as 客户端
participant Store as MemoryCheckpointStore
participant Mutex as RWMutex
Client->>Store : Save(checkpoint)
Store->>Mutex : Lock()
Mutex-->>Store : 获取写锁
Store->>Store : 存储到内存映射
Store->>Mutex : Unlock()
Store-->>Client : 返回成功
Client->>Store : Load(checkpointID)
Store->>Mutex : RLock()
Mutex-->>Store : 获取读锁
Store->>Store : 从内存映射查找
Store->>Mutex : RUnlock()
Store-->>Client : 返回检查点或错误

图表来源

优缺点分析 #

优点:

缺点:

节来源

文件存储实现 #

概述 #

文件存储是另一种简单的存储实现,通过文件系统进行数据持久化。虽然实现了基本的文件操作,但在复杂场景下的功能支持有限。

实现特点 #

初始化方式 #

// 创建文件存储实例
store := graph.NewFileCheckpointStore(
    os.Stdout,  // 写入器
    os.Stdin,   // 读取器
)

功能限制 #

文件存储的实现相对简陋,在复杂操作上存在明显限制:

flowchart TD
A[FileCheckpointStore] --> B{操作类型}
B --> |Save| C[写入JSON数据到Writer]
B --> |Load| D[从Reader读取全部数据<br/>反序列化为Checkpoint]
B --> |List| E[返回错误:<br/>未实现]
B --> |Delete| F[返回错误:<br/>未实现]
B --> |Clear| G[返回错误:<br/>未实现]
C --> H[写入成功]
D --> I[验证ID匹配]
I --> J[返回Checkpoint]
E --> K[错误处理]
F --> K
G --> K

图表来源

适用场景 #

文件存储主要用于:

节来源

PostgreSQL 存储实现 #

概述 #

PostgreSQL 存储是生产级别的关系型数据库存储实现,基于 pgx/v5 驱动提供完整的数据库功能支持。

实现特点 #

初始化方式 #

// 创建PostgreSQL存储实例
store, err := postgres.NewPostgresCheckpointStore(context.Background(), postgres.PostgresOptions{
    ConnString: "postgres://user:password@localhost:5432/dbname",
    TableName:  "checkpoints",
})

// 初始化数据库模式
err = store.InitSchema(context.Background())

数据库架构 #

PostgreSQL 存储使用专门的表结构来组织检查点数据:

erDiagram
CHECKPOINTS {
string id PK
string execution_id
string node_name
jsonb state
jsonb metadata
timestamptz timestamp
integer version
}
CHECKPOINTS ||--o{ EXECUTION_CHECKPOINTS : "belongs to"
EXECUTION_CHECKPOINTS {
string execution_id
timestamp created_at
int checkpoint_count
}

图表来源

表结构设计 #

PostgreSQL 存储的表结构包含以下字段:

字段名 类型 约束 描述
id TEXT PRIMARY KEY 检查点唯一标识符
execution_id TEXT NOT NULL 执行会话标识符
node_name TEXT NOT NULL 执行节点名称
state JSONB NOT NULL 节点状态数据
metadata JSONB NULLABLE 元数据信息
timestamp TIMESTAMPTZ NOT NULL 创建时间戳
version INTEGER NOT NULL 版本号

索引策略 #

为了优化查询性能,PostgreSQL 存储创建了以下索引:

关键操作实现 #

Save 操作流程 #

sequenceDiagram
participant Client as 客户端
participant Store as PostgresCheckpointStore
participant DB as PostgreSQL数据库
participant Pool as 连接池
Client->>Store : Save(checkpoint)
Store->>Store : 序列化State和Metadata为JSON
Store->>Pool : 获取数据库连接
Pool-->>Store : 返回连接
Store->>DB : INSERT/UPDATE语句
Note over Store,DB : 使用ON CONFLICT处理重复ID
DB-->>Store : 执行结果
Store->>Pool : 归还连接
Store-->>Client : 返回结果

图表来源

Load 操作流程 #

sequenceDiagram
participant Client as 客户端
participant Store as PostgresCheckpointStore
participant DB as PostgreSQL数据库
participant Pool as 连接池
Client->>Store : Load(checkpointID)
Store->>Pool : 获取数据库连接
Pool-->>Store : 返回连接
Store->>DB : SELECT查询语句
DB-->>Store : 返回JSON数据
Store->>Store : 反序列化JSON为Checkpoint
alt 检查点存在
Store-->>Client : 返回Checkpoint
else 检查点不存在
Store-->>Client : 返回NotFoundError
end
Store->>Pool : 归还连接

图表来源

错误处理机制 #

PostgreSQL 存储实现了完善的错误处理机制:

flowchart TD
A[数据库操作] --> B{操作类型}
B --> |连接| C[连接失败处理]
B --> |查询| D[查询错误处理]
B --> |插入| E[插入冲突处理]
B --> |删除| F[删除错误处理]
C --> G[返回连接错误]
D --> H{错误类型}
H --> |NoRows| I[返回NotFound]
H --> |其他| J[返回查询错误]
E --> K[返回冲突错误]
F --> L[返回删除错误]

图表来源

性能优化特性 #

节来源

Redis 存储实现 #

概述 #

Redis 存储是基于 go-redis/v9 驱动的高性能键值存储实现,专为高并发场景设计。

实现特点 #

初始化方式 #

// 创建Redis存储实例
store := redis.NewRedisCheckpointStore(redis.RedisOptions{
    Addr:     "localhost:6379",
    Password: "",
    DB:       0,
    Prefix:   "langgraph:",
    TTL:      1 * time.Hour,
})

数据结构设计 #

Redis 存储使用两种主要的数据结构来组织检查点数据:

graph TB
subgraph "键值存储结构"
A[checkpoint:{id}] --> B[JSON序列化Checkpoint]
end
subgraph "集合存储结构"
C[execution:{execution_id}:checkpoints] --> D[Set of checkpoint IDs]
end
subgraph "TTL管理"
E[可选TTL设置] --> F[自动过期清理]
end
A -.-> C
B -.-> D
E -.-> F

图表来源

键命名规范 #

Redis 存储使用标准化的键命名规范:

键类型 格式 示例
检查点键 checkpoint:{id} checkpoint:cp-123
执行索引键 execution:{execution_id}:checkpoints execution:exec-456:checkpoints

关键操作实现 #

Save 操作流程 #

sequenceDiagram
participant Client as 客户端
participant Store as RedisCheckpointStore
participant Redis as Redis服务器
participant Pipeline as 管道
Client->>Store : Save(checkpoint)
Store->>Store : 序列化Checkpoint为JSON
Store->>Pipeline : 创建管道
Store->>Pipeline : Set(key, value, ttl)
Store->>Store : 检查是否有execution_id
alt 有execution_id
Store->>Pipeline : SAdd(execKey, checkpointID)
alt 有TTL设置
Store->>Pipeline : Expire(execKey, ttl)
end
end
Store->>Pipeline : Exec()
Pipeline-->>Store : 执行结果
Store-->>Client : 返回结果

图表来源

List 操作流程 #

sequenceDiagram
participant Client as 客户端
participant Store as RedisCheckpointStore
participant Redis as Redis服务器
Client->>Store : List(executionID)
Store->>Store : 生成执行索引键
Store->>Redis : SMembers(execKey)
Redis-->>Store : 返回检查点ID列表
Store->>Store : 生成所有检查点键
Store->>Redis : MGet(keys...)
Redis-->>Store : 返回检查点数据
Store->>Store : 并行反序列化
Store->>Store : 过滤和验证
Store-->>Client : 返回检查点列表

图表来源

高级特性 #

管道操作 #

Redis 存储大量使用管道操作来提高性能:

flowchart LR
A[开始管道] --> B[Set操作]
B --> C[SAdd操作]
C --> D[Expire操作]
D --> E[Exec执行]
E --> F[原子提交]

TTL 支持 #

Redis 存储支持可选的TTL(生存时间)设置:

性能优势 #

节来源

SQLite 存储实现 #

概述 #

SQLite 存储是基于 go-sqlite3 驱动的轻量级文件型数据库存储实现,适合单机部署和小型应用。

实现特点 #

初始化方式 #

// 创建SQLite存储实例
store, err := sqlite.NewSqliteCheckpointStore(sqlite.SqliteOptions{
    Path:      "./checkpoints.db",
    TableName: "checkpoints",
})

// 初始化数据库模式
err = store.InitSchema(context.Background())

数据库架构 #

SQLite 存储使用标准的关系型表结构:

erDiagram
CHECKPOINTS {
string id PK
string execution_id
string node_name
text state
text metadata
datetime timestamp
integer version
}
CHECKPOINTS ||--o{ EXECUTION_CHECKPOINTS : "belongs to"
EXECUTION_CHECKPOINTS {
string execution_id
timestamp created_at
int checkpoint_count
}

图表来源

表结构设计 #

SQLite 存储的表结构与PostgreSQL类似,但使用不同的数据类型:

字段名 SQLite类型 描述
id TEXT 主键,检查点唯一标识符
execution_id TEXT 执行会话标识符
node_name TEXT 节点名称
state TEXT JSON序列化的状态数据
metadata TEXT JSON序列化的元数据
timestamp DATETIME 创建时间
version INTEGER 版本号

索引策略 #

SQLite 存储同样创建执行ID索引来优化查询:

CREATE INDEX IF NOT EXISTS idx_checkpoints_execution_id 
ON checkpoints (execution_id);

关键操作实现 #

Save 操作流程 #

sequenceDiagram
participant Client as 客户端
participant Store as SqliteCheckpointStore
participant DB as SQLite数据库
Client->>Store : Save(checkpoint)
Store->>Store : 序列化State和Metadata为JSON
Store->>DB : INSERT/UPDATE语句
Note over Store,DB : 使用ON CONFLICT处理重复ID
DB-->>Store : 执行结果
Store-->>Client : 返回结果

图表来源

Load 操作流程 #

sequenceDiagram
participant Client as 客户端
participant Store as SqliteCheckpointStore
participant DB as SQLite数据库
Client->>Store : Load(checkpointID)
Store->>DB : SELECT查询语句
DB-->>Store : 返回JSON字符串
Store->>Store : 反序列化JSON为Checkpoint
alt 检查点存在
Store-->>Client : 返回Checkpoint
else 检查点不存在
Store-->>Client : 返回NotFoundError
end

图表来源

优势特性 #

文件独立性 #

SQLite 存储的主要优势在于其文件独立性:

事务支持 #

SQLite 提供完整的ACID事务支持:

flowchart TD
A[开始事务] --> B[执行多个操作]
B --> C{操作成功?}
C --> |是| D[提交事务]
C --> |否| E[回滚事务]
D --> F[事务完成]
E --> F

性能考虑 #

节来源

存储性能对比 #

性能基准测试 #

以下是各种存储实现的性能特征对比:

存储类型 吞吐量 延迟 持久化 并发支持 外部依赖
Memory 极高 极低
File 中等 中等
PostgreSQL 极高 数据库
Redis 极高 极低 可选 极高 Redis服务器
SQLite 中等 中等 中等 SQLite驱动

场景推荐 #

graph TD
A[选择存储类型] --> B{性能要求}
B --> |极高| C{持久化要求}
B --> |高| D{并发要求}
B --> |中等| E{部署复杂度}
C --> |是| F[PostgreSQL]
C --> |否| G[Redis]
D --> |高| H[Redis]
D --> |中等| I[SQLite]
D --> |低| J[Memory]
E --> |低| K[Memory]
E --> |中等| L[SQLite]
E --> |高| M[PostgreSQL]

性能优化建议 #

高并发场景 #

大数据量场景 #

开发测试场景 #

最佳实践指南 #

连接配置最佳实践 #

PostgreSQL 配置 #

// 生产环境配置
store, err := postgres.NewPostgresCheckpointStore(ctx, postgres.PostgresOptions{
    ConnString: "postgres://user:password@host:port/dbname?sslmode=disable",
    TableName:  "checkpoints",
})

// 配置连接池参数
poolConfig, err := pgxpool.ParseConfig(connString)
poolConfig.MaxConns = 20
poolConfig.MinConns = 5
poolConfig.MaxConnLifetime = 1 * time.Hour
poolConfig.MaxConnIdleTime = 30 * time.Minute

pool, err := pgxpool.NewWithConfig(ctx, poolConfig)

Redis 配置 #

// 高性能配置
store := redis.NewRedisCheckpointStore(redis.RedisOptions{
    Addr:     "localhost:6379",
    Password: "",           // 如果需要认证
    DB:       0,            // 使用默认数据库
    Prefix:   "langgraph:", // 添加命名空间前缀
    TTL:      24 * time.Hour, // 设置合理的TTL
})

// 生产环境配置
store := redis.NewRedisCheckpointStore(redis.RedisOptions{
    Addr:     "redis-cluster:6379",
    Password: os.Getenv("REDIS_PASSWORD"),
    DB:       0,
    Prefix:   "langgraph:",
    TTL:      7 * 24 * time.Hour, // 一周TTL
})

SQLite 配置 #

// 生产环境配置
store, err := sqlite.NewSqliteCheckpointStore(sqlite.SqliteOptions{
    Path:      "/var/lib/langgraph/checkpoints.db",
    TableName: "checkpoints",
})

// 配置WAL模式以提高并发性能
db, err := sql.Open("sqlite3", path+"?_journal_mode=WAL")

错误处理最佳实践 #

统一错误处理 #

func safeCheckpointOperation(store graph.CheckpointStore, op func() error) error {
    const maxRetries = 3
    const baseDelay = 100 * time.Millisecond
    
    for i := 0; i < maxRetries; i++ {
        err := op()
        if err == nil {
            return nil
        }
        
        // 检查是否为临时错误
        if isTemporaryError(err) {
            delay := baseDelay * time.Duration(1<<i)
            time.Sleep(delay)
            continue
        }
        
        return fmt.Errorf("checkpoint operation failed: %w", err)
    }
    
    return fmt.Errorf("checkpoint operation failed after %d retries", maxRetries)
}

连接池管理 #

type CheckpointManager struct {
    stores map[string]graph.CheckpointStore
    mu     sync.RWMutex
}

func (cm *CheckpointManager) GetStore(name string) (graph.CheckpointStore, error) {
    cm.mu.RLock()
    store, exists := cm.stores[name]
    cm.mu.RUnlock()
    
    if !exists {
        return nil, fmt.Errorf("store %s not found", name)
    }
    
    // 检查连接状态
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    // 尝试简单的查询来检查连接
    err := store.Save(ctx, &graph.Checkpoint{
        ID:        "test",
        NodeName:  "health",
        State:     map[string]interface{}{"status": "ok"},
        Timestamp: time.Now(),
        Version:   1,
    })
    
    if err != nil {
        return nil, fmt.Errorf("store health check failed: %w", err)
    }
    
    return store, nil
}

监控和运维 #

性能监控指标 #

type CheckpointMetrics struct {
    SavesTotal     prometheus.Counter
    LoadsTotal     prometheus.Counter
    DeletesTotal   prometheus.Counter
    ErrorsTotal    prometheus.Counter
    SaveDuration   prometheus.Histogram
    LoadDuration   prometheus.Histogram
    StorageSize    prometheus.Gauge
}

func (m *CheckpointMetrics) RecordSave(duration time.Duration, err error) {
    m.SavesTotal.Inc()
    m.SaveDuration.Observe(duration.Seconds())
    if err != nil {
        m.ErrorsTotal.Inc()
    }
}

func (m *CheckpointMetrics) RecordLoad(duration time.Duration, err error) {
    m.LoadsTotal.Inc()
    m.LoadDuration.Observe(duration.Seconds())
    if err != nil {
        m.ErrorsTotal.Inc()
    }
}

数据清理策略 #

func cleanupOldCheckpoints(store graph.CheckpointStore, executionID string, retentionDays int) error {
    ctx := context.Background()
    
    // 获取所有检查点
    checkpoints, err := store.List(ctx, executionID)
    if err != nil {
        return fmt.Errorf("failed to list checkpoints: %w", err)
    }
    
    // 计算保留期限
    cutoff := time.Now().AddDate(0, 0, -retentionDays)
    
    // 删除过期的检查点
    for _, cp := range checkpoints {
        if cp.Timestamp.Before(cutoff) {
            err := store.Delete(ctx, cp.ID)
            if err != nil {
                log.Printf("failed to delete old checkpoint %s: %v", cp.ID, err)
                // 继续处理下一个
            }
        }
    }
    
    return nil
}

故障排除 #

常见问题诊断 #

连接问题 #

flowchart TD
A[连接失败] --> B{错误类型}
B --> |网络错误| C[检查网络连接]
B --> |认证错误| D[检查凭据]
B --> |超时错误| E[增加超时时间]
B --> |资源不足| F[检查资源限制]
C --> G[ping目标主机]
D --> H[验证用户名密码]
E --> I[调整连接超时]
F --> J[增加连接池大小]

性能问题 #

// 性能监控和诊断
func diagnoseStoragePerformance(store graph.CheckpointStore) {
    startTime := time.Now()
    
    // 测试写入性能
    testCheckpoint := &graph.Checkpoint{
        ID:        "perf-test-" + uuid.New().String(),
        NodeName:  "diagnostic",
        State:     map[string]interface{}{"test": "data"},
        Timestamp: time.Now(),
        Version:   1,
    }
    
    err := store.Save(context.Background(), testCheckpoint)
    if err != nil {
        log.Printf("Performance test failed: %v", err)
        return
    }
    
    duration := time.Since(startTime)
    log.Printf("Checkpoint save performance: %v", duration)
    
    // 根据性能调整配置
    if duration > 100*time.Millisecond {
        log.Printf("Warning: Slow checkpoint save operation")
        // 考虑使用更快的存储或优化配置
    }
}

数据一致性问题 #

// 数据完整性检查
func validateCheckpointIntegrity(store graph.CheckpointStore, checkpointID string) error {
    // 1. 加载检查点
    checkpoint, err := store.Load(context.Background(), checkpointID)
    if err != nil {
        return fmt.Errorf("failed to load checkpoint: %w", err)
    }
    
    // 2. 验证必需字段
    if checkpoint.ID == "" {
        return fmt.Errorf("missing required field: ID")
    }
    
    if checkpoint.NodeName == "" {
        return fmt.Errorf("missing required field: NodeName")
    }
    
    if checkpoint.Timestamp.IsZero() {
        return fmt.Errorf("missing required field: Timestamp")
    }
    
    // 3. 验证状态数据
    if checkpoint.State == nil {
        return fmt.Errorf("missing required field: State")
    }
    
    // 4. 验证元数据格式
    if checkpoint.Metadata != nil {
        // 验证元数据格式
        metadataJSON, err := json.Marshal(checkpoint.Metadata)
        if err != nil {
            return fmt.Errorf("invalid metadata format: %w", err)
        }
        
        var temp map[string]interface{}
        err = json.Unmarshal(metadataJSON, &temp)
        if err != nil {
            return fmt.Errorf("invalid metadata JSON: %w", err)
        }
    }
    
    return nil
}

调试工具 #

存储状态检查器 #

type StorageDebugger struct {
    stores map[string]graph.CheckpointStore
}

func (sd *StorageDebugger) DumpStorageStats() {
    for name, store := range sd.stores {
        log.Printf("Storage: %s", name)
        
        // 尝试执行一个简单的操作来检查健康状态
        ctx := context.Background()
        testCP := &graph.Checkpoint{
            ID:        "debug-" + uuid.New().String(),
            NodeName:  "debug",
            State:     map[string]interface{}{"test": true},
            Timestamp: time.Now(),
            Version:   1,
        }
        
        startTime := time.Now()
        err := store.Save(ctx, testCP)
        duration := time.Since(startTime)
        
        if err != nil {
            log.Printf("  Status: ERROR - %v", err)
        } else {
            log.Printf("  Status: OK - %.2fms", duration.Seconds()*1000)
            
            // 清理测试数据
            store.Delete(ctx, testCP.ID)
        }
    }
}

性能分析器 #

func profileStorageOperations(store graph.CheckpointStore) {
    type OperationResult struct {
        OpType     string
        Duration   time.Duration
        Success    bool
        Error      error
    }
    
    results := make(chan OperationResult, 1000)
    
    // 启动多个并发操作进行压力测试
    for i := 0; i < 10; i++ {
        go func(workerID int) {
            for j := 0; j < 100; j++ {
                opType := rand.Intn(3)
                startTime := time.Now()
                
                var err error
                switch opType {
                case 0:
                    err = store.Save(context.Background(), &graph.Checkpoint{
                        ID:        fmt.Sprintf("profile-%d-%d", workerID, j),
                        NodeName:  "profile",
                        State:     map[string]interface{}{"data": "test"},
                        Timestamp: time.Now(),
                        Version:   1,
                    })
                case 1:
                    _, err = store.Load(context.Background(), fmt.Sprintf("profile-%d-%d", workerID, j))
                case 2:
                    err = store.Delete(context.Background(), fmt.Sprintf("profile-%d-%d", workerID, j))
                }
                
                results <- OperationResult{
                    OpType:  []string{"Save", "Load", "Delete"}[opType],
                    Duration: time.Since(startTime),
                    Success: err == nil,
                    Error:   err,
                }
            }
        }(i)
    }
    
    // 收集结果
    var totalTime time.Duration
    var totalOps int
    var failures int
    
    for i := 0; i < 3000; i++ {
        result := <-results
        totalTime += result.Duration
        totalOps++
        if !result.Success {
            failures++
        }
    }
    
    avgLatency := totalTime / time.Duration(totalOps)
    failureRate := float64(failures) / float64(totalOps)
    
    log.Printf("Average latency: %v", avgLatency)
    log.Printf("Failure rate: %.2f%%", failureRate*100)
}

总结 #

LangGraphGo 的内置检查点存储实现提供了从开发测试到生产环境的完整解决方案。每种存储都有其独特的优势和适用场景:

存储选择决策树 #

flowchart TD
A[开始选择存储] --> B{开发阶段?}
B --> |是| C[Memory存储]
B --> |否| D{生产环境?}
D --> |否| E{并发要求?}
D --> |是| F{数据量大小?}
E --> |高| G[Redis存储]
E --> |中等| H[SQLite存储]
E --> |低| I[Memory存储]
F --> |小| J[SQLite存储]
F --> |大| K[PostgreSQL存储]
F --> |极大| L[分布式存储方案]
C --> M[开发测试完成]
G --> N[高并发场景完成]
H --> O[中小项目完成]
I --> P[快速原型完成]
J --> Q[单机应用完成]
K --> R[大型应用完成]
L --> S[企业级方案完成]

技术演进路径 #

  1. 开发阶段: Memory 存储 → 快速迭代
  2. 测试阶段: SQLite 存储 → 本地测试
  3. 预发布阶段: PostgreSQL 存储 → 生产环境模拟
  4. 生产阶段: Redis 存储 + PostgreSQL 存储 → 高可用方案

最佳实践总结 #

通过合理选择和配置这些内置存储实现,可以构建出既满足功能需求又具备良好性能和可靠性的检查点管理系统。