流式处理模块 #

目录 #

  1. 简介
  2. 核心概念
  3. StreamMode 枚举类型
  4. StreamConfig 配置结构体
  5. StreamResult 结果结构体
  6. StreamingRunnable 核心组件
  7. StreamingListener 事件监听器
  8. StreamingExecutor 执行器
  9. 流式模式详解
  10. 使用示例
  11. 最佳实践
  12. 故障排除

简介 #

LangGraph Go 的流式处理模块提供了一个强大而灵活的架构,用于在长运行的 LLM 链或复杂代理工作流中实现实时事件流式传输。该模块允许开发者订阅执行过程中的各种事件,为用户提供实时反馈,同时支持多种粒度的事件过滤和处理策略。

主要特性 #

核心概念 #

事件驱动架构 #

流式处理采用事件驱动架构,主要包含以下核心组件:

graph TB
subgraph "执行引擎"
G[Graph 执行器]
N[节点执行器]
end
subgraph "监听系统"
SL[StreamingListener]
NL[NodeListener]
GLH[GraphCallbackHandler]
end
subgraph "事件通道"
EC[Event Channel]
RC[Result Channel]
ER[Error Channel]
DC[Done Channel]
end
subgraph "用户接口"
SR[StreamingRunnable]
SE[StreamingExecutor]
end
G --> N
N --> SL
SL --> EC
SL --> RC
SL --> ER
SL --> DC
SL --> NL
SL --> GLH
SR --> SL
SE --> SR

图表来源

事件类型系统 #

系统定义了丰富的事件类型来表示不同的执行阶段:

classDiagram
class NodeEvent {
<<enumeration>>
+NodeEventStart
+NodeEventProgress
+NodeEventComplete
+NodeEventError
+EventChainStart
+EventChainEnd
+EventToolStart
+EventToolEnd
+EventLLMStart
+EventLLMEnd
+EventToken
+EventCustom
}
class StreamEvent {
+time.Time Timestamp
+string NodeName
+NodeEvent Event
+interface State
+error Error
+map[string]interface Metadata
+time.Duration Duration
}
class StreamingListener {
+chan~StreamEvent~ eventChan
+StreamConfig config
+sync.RWMutex mutex
+int droppedEvents
+bool closed
+emitEvent(event StreamEvent)
+shouldEmit(event StreamEvent) bool
+OnNodeEvent(ctx, event, nodeName, state, err)
+OnChainStart(ctx, serialized, inputs, runID, parentRunID, tags, metadata)
+OnChainEnd(ctx, outputs, runID)
+OnLLMStart(ctx, serialized, prompts, runID, parentRunID, tags, metadata)
+OnLLMEnd(ctx, response, runID)
+OnToolStart(ctx, serialized, inputStr, runID, parentRunID, tags, metadata)
+OnToolEnd(ctx, output, runID)
+OnGraphStep(ctx, stepNode, state)
}
StreamEvent --> NodeEvent
StreamingListener --> StreamEvent

图表来源

章节来源

StreamMode 枚举类型 #

StreamMode 定义了流式处理的不同模式,每种模式决定了哪些事件会被过滤和传输。

常量定义 #

模式 类型 描述 使用场景
StreamModeValues StreamMode 发射每步后的完整状态 调试或需要渲染整个上下文的 UI
StreamModeUpdates StreamMode 发射每个节点的更新(增量) 显示进度(如"步骤 1 完成",“工具已执行”)
StreamModeMessages StreamMode 发射 LLM 消息/令牌(计划中) 实现打字机效果
StreamModeDebug StreamMode 发射所有内部事件 深度检查和调试

模式行为详解 #

flowchart TD
Start([开始流式处理]) --> CheckMode{检查 StreamMode }
CheckMode --> |StreamModeDebug| EmitAll[发射所有事件]
CheckMode --> |StreamModeValues| FilterValues[过滤 graph_step 事件]
CheckMode --> |StreamModeUpdates| FilterUpdates[过滤 ToolEnd/ChainEnd/NodeEventComplete 事件]
CheckMode --> |StreamModeMessages| FilterMessages[过滤 LLM 事件]
EmitAll --> SendToChannel[发送到事件通道]
FilterValues --> CheckEventType{检查事件类型}
FilterUpdates --> CheckEventType
FilterMessages --> CheckEventType
CheckEventType --> |graph_step| SendToChannel
CheckEventType --> |ToolEnd/ChainEnd/NodeEventComplete| SendToChannel
CheckEventType --> |EventLLMStart/EventLLMEnd| SendToChannel
CheckEventType --> |其他事件| Skip[跳过事件]
SendToChannel --> Backpressure{启用背压?}
Backpressure --> |是| HandleBackpressure[处理背压]
Backpressure --> |否| DropEvent[丢弃事件]
HandleBackpressure --> End([结束])
DropEvent --> End
Skip --> End

图表来源

章节来源

StreamConfig 配置结构体 #

StreamConfig 提供了流式处理的全面配置选项,控制事件通道的行为和性能特征。

字段说明 #

字段名 类型 默认值 描述
BufferSize int 1000 事件通道的缓冲区大小
EnableBackpressure bool true 是否启用背压处理机制
MaxDroppedEvents int 100 最大丢弃事件数量(超过时记录日志)
Mode StreamMode StreamModeDebug 流式模式配置

配置策略 #

graph LR
subgraph "配置决策树"
A[高吞吐量需求] --> B[增大 BufferSize]
C[低延迟要求] --> D[启用背压处理]
E[调试模式] --> F[使用 StreamModeDebug]
G[生产环境] --> H[优化性能参数]
end
subgraph "性能影响"
B --> I[减少丢弃事件]
D --> J[防止内存溢出]
F --> K[完整事件追踪]
H --> L[平衡资源使用]
end

默认配置 #

func DefaultStreamConfig() StreamConfig {
    return StreamConfig{
        BufferSize:         1000,
        EnableBackpressure: true,
        MaxDroppedEvents:   100,
        Mode:               StreamModeDebug,
    }
}

章节来源

StreamResult 结果结构体 #

StreamResult 包含流式执行的所有输出通道,提供了完整的异步执行结果访问接口。

通道类型 #

通道 类型 用途 关闭时机
Events <-chan StreamEvent 接收实时事件 执行完成时关闭
Result <-chan interface{} 接收最终结果 执行完成时关闭
Errors <-chan error 接收执行错误 执行完成时关闭
Done <-chan struct{} 通知执行完成 执行完成时关闭

生命周期管理 #

sequenceDiagram
participant User as 用户代码
participant SR as StreamResult
participant GR as GraphRunner
participant SC as StreamingChannel
User->>GR : Stream(ctx, initialState)
GR->>SC : 创建事件通道
GR->>SC : 创建结果通道
GR->>SC : 创建错误通道
GR->>SC : 创建完成通道
GR-->>SR : 返回 StreamResult
SR-->>User : 提供事件监听接口
loop 流式执行
GR->>SC : 发送事件
SC-->>User : 接收事件
end
GR->>SC : 发送最终结果
GR->>SC : 关闭所有通道
SC-->>User : 通知执行完成
User->>SR : 调用 Cancel()
SR->>GR : 清理资源

图表来源

章节来源

StreamingRunnable 核心组件 #

StreamingRunnable 是流式处理的核心执行器,封装了 ListenableRunnable 并添加了流式功能。

构造函数 #

func NewStreamingRunnable(runnable *ListenableRunnable, config StreamConfig) *StreamingRunnable
func NewStreamingRunnableWithDefaults(runnable *ListenableRunnable) *StreamingRunnable

Stream 方法 #

Stream 方法是流式处理的主要入口点,负责启动异步执行并返回 StreamResult

执行流程 #

flowchart TD
Start([开始 Stream]) --> CreateChannels[创建通信通道]
CreateChannels --> CreateContext[创建可取消上下文]
CreateContext --> CreateListener[创建 StreamingListener]
CreateListener --> AddListeners[为所有节点添加监听器]
AddListeners --> StartGoroutine[启动执行协程]
StartGoroutine --> ConfigListener[配置监听器回调]
ConfigListener --> ExecuteRunnable[执行 ListenableRunnable]
ExecuteRunnable --> SendResult[发送结果到通道]
SendResult --> Cleanup[清理资源]
Cleanup --> CloseChannels[关闭所有通道]
CloseChannels --> Return[返回 StreamResult]
Return --> UserCode[用户代码处理事件]

图表来源

资源管理 #

系统实现了完善的资源清理机制:

章节来源

StreamingListener 事件监听器 #

StreamingListener 实现了 NodeListenerGraphCallbackHandler 接口,负责捕获和过滤事件。

核心方法 #

emitEvent 方法 #

flowchart TD
Start([emitEvent 调用]) --> CheckClosed{检查是否关闭}
CheckClosed --> |是| Return[直接返回]
CheckClosed --> |否| FilterEvents[根据模式过滤事件]
FilterEvents --> ShouldEmit{shouldEmit 判断}
ShouldEmit --> |否| Return
ShouldEmit --> |是| TrySend[尝试发送事件]
TrySend --> SendSuccess{发送成功?}
SendSuccess --> |是| Return
SendSuccess --> |否| CheckBackpressure{启用背压?}
CheckBackpressure --> |是| HandleBackpressure[处理背压]
CheckBackpressure --> |否| DropEvent[丢弃事件]
HandleBackpressure --> Return
DropEvent --> Return

图表来源

shouldEmit 方法 #

该方法根据配置的 StreamMode 决定是否应该发出特定事件:

func (sl *StreamingListener) shouldEmit(event StreamEvent) bool {
    switch sl.config.Mode {
    case StreamModeDebug:
        return true
    case StreamModeValues:
        return event.Event == "graph_step"
    case StreamModeUpdates:
        return event.Event == EventToolEnd || 
               event.Event == EventChainEnd || 
               event.Event == NodeEventComplete
    case StreamModeMessages:
        return event.Event == EventLLMEnd || event.Event == EventLLMStart
    default:
        return true
    }
}

事件回调处理 #

StreamingListener 实现了多个回调方法来处理不同类型的事件:

方法 触发条件 事件类型
OnNodeEvent 节点事件发生 NodeEventStart, NodeEventComplete, NodeEventError
OnChainStart 链开始执行 EventChainStart
OnChainEnd 链执行完成 EventChainEnd
OnLLMStart LLM 调用开始 EventLLMStart
OnLLMEnd LLM 调用完成 EventLLMEnd
OnToolStart 工具调用开始 EventToolStart
OnToolEnd 工具调用完成 EventToolEnd
OnGraphStep 图执行步骤 自定义事件 "graph_step"

章节来源

StreamingExecutor 执行器 #

StreamingExecutor 提供了高级的流式执行接口,简化了事件处理和结果收集。

ExecuteWithCallback 方法 #

这是最常用的执行方式,提供回调函数处理实时事件:

sequenceDiagram
participant Client as 客户端
participant Executor as StreamingExecutor
participant Runnable as StreamingRunnable
participant Channels as 通信通道
Client->>Executor : ExecuteWithCallback(ctx, state, eventCallback, resultCallback)
Executor->>Runnable : Stream(ctx, state)
Runnable-->>Executor : StreamResult
loop 监听通道
Executor->>Channels : 监听 Events 通道
Channels-->>Executor : StreamEvent
Executor->>Client : eventCallback(event)
Executor->>Channels : 监听 Result 通道
Channels-->>Executor : 最终结果
Executor->>Client : resultCallback(result, err)
Executor->>Channels : 监听 Done 通道
Channels-->>Executor : 执行完成信号
end
Executor->>Runnable : Cancel()
Executor-->>Client : 返回错误如有

图表来源

ExecuteAsync 方法 #

提供异步执行能力,立即返回 StreamResult

func (se *StreamingExecutor) ExecuteAsync(ctx context.Context, initialState interface{}) *StreamResult {
    return se.runnable.Stream(ctx, initialState)
}

执行模式对比 #

执行方式 特点 适用场景
ExecuteWithCallback 同步阻塞,提供回调处理 简单的事件处理需求
ExecuteAsync 异步非阻塞,立即返回 需要并发处理多个流式任务

章节来源

流式模式详解 #

StreamModeValues - 完整状态模式 #

特点

使用场景

事件过滤规则

return event.Event == "graph_step"

StreamModeUpdates - 更新模式 #

特点

使用场景

事件过滤规则

return event.Event == EventToolEnd || 
       event.Event == EventChainEnd || 
       event.Event == NodeEventComplete

StreamModeMessages - 消息模式 #

特点

当前状态

StreamModeDebug - 调试模式 #

特点

使用场景

事件过滤规则

return true  // 发射所有事件

模式选择指南 #

flowchart TD
Start([选择流式模式]) --> NeedDebug{需要调试?}
NeedDebug --> |是| DebugMode[StreamModeDebug]
NeedDebug --> |否| NeedRealTime{需要实时反馈?}
NeedRealTime --> |是| ProgressMode[StreamModeUpdates]
NeedRealTime --> |否| StateMode{需要完整状态?}
StateMode --> |是| ValuesMode[StreamModeValues]
StateMode --> |否| MessagesMode[StreamModeMessages]
DebugMode --> Production[生产环境谨慎使用]
ProgressMode --> Recommended[推荐用于大多数场景]
ValuesMode --> HeavyLoad[注意性能开销]
MessagesMode --> Future[未来功能]

章节来源

使用示例 #

基础流式处理示例 #

以下展示了如何使用不同的流式模式:

// 创建流式消息图
g := graph.NewStreamingMessageGraph()

// 添加节点
g.AddNode("step_1", func(ctx context.Context, state interface{}) (interface{}, error) {
    time.Sleep(500 * time.Millisecond)
    return "Result from Step 1", nil
})

g.AddNode("step_2", func(ctx context.Context, state interface{}) (interface{}, error) {
    time.Sleep(500 * time.Millisecond)
    return "Result from Step 2", nil
})

// 设置入口点和边
g.SetEntryPoint("step_1")
g.AddEdge("step_1", "step_2")
g.AddEdge("step_2", graph.END)

// 配置流式模式
g.SetStreamConfig(graph.StreamConfig{
    Mode: graph.StreamModeUpdates,
})

// 编译和执行
runnable, _ := g.CompileStreaming()
streamResult := runnable.Stream(context.Background(), nil)

// 处理事件
for event := range streamResult.Events {
    fmt.Printf("[%s] Node: %s, Event: %s, State: %v\n",
        event.Timestamp.Format("15:04:05"),
        event.NodeName,
        event.Event,
        event.State)
}

高级执行器使用 #

// 创建执行器
executor := graph.NewStreamingExecutor(runnable)

// 使用回调方式执行
err := executor.ExecuteWithCallback(
    context.Background(),
    "initial_state",
    func(event graph.StreamEvent) {
        fmt.Printf("[%.3fs] %s: %v\n", 
            time.Since(startTime).Seconds(), 
            event.Event, 
            event.State)
    },
    func(result interface{}, err error) {
        if err != nil {
            fmt.Printf("Error: %v\n", err)
        } else {
            fmt.Printf("Final result: %v\n", result)
        }
    },
)

异步执行示例 #

// 异步执行
streamResult := executor.ExecuteAsync(context.Background(), "async_state")

// 立即返回,可以在后台处理事件
go func() {
    for event := range streamResult.Events {
        processEvent(event)
    }
}()

// 继续执行其他逻辑
performOtherWork()

// 等待完成
finalResult := <-streamResult.Result

章节来源

最佳实践 #

性能优化建议 #

  1. 合理配置缓冲区大小

    config := graph.StreamConfig{
        BufferSize:         1000,        // 根据预期事件量调整
        EnableBackpressure: true,        // 生产环境建议启用
        MaxDroppedEvents:   100,         // 设置合理的丢弃阈值
    }
    
  2. 选择合适的流式模式

    • 开发调试:使用 StreamModeDebug
    • 生产环境:优先考虑 StreamModeUpdates
    • 高性能场景:避免不必要的状态广播
  3. 事件处理优化

    // 使用缓冲的通道处理大量事件
    eventChan := make(chan graph.StreamEvent, 1000)
    
    // 在单独的 goroutine 中处理事件
    go func() {
        for event := range eventChan {
            // 快速处理逻辑
            processEvent(event)
        }
    }()
    

错误处理策略 #

func robustStreamExecution() error {
    executor := graph.NewStreamingExecutor(runnable)
    
    return executor.ExecuteWithCallback(
        context.Background(),
        initialState,
        func(event graph.StreamEvent) {
            defer func() {
                if r := recover(); r != nil {
                    log.Printf("Event handler panic: %v", r)
                }
            }()
            
            // 安全的事件处理
            handleEventSafely(event)
        },
        func(result interface{}, err error) {
            if err != nil {
                log.Printf("Execution failed: %v", err)
                // 实现重试或降级策略
            }
        },
    )
}

资源管理 #

func managedStreamExecution() {
    executor := graph.NewStreamingExecutor(runnable)
    streamResult := executor.ExecuteAsync(context.Background(), state)
    
    // 确保资源清理
    defer func() {
        streamResult.Cancel()
        // 等待清理完成
        time.Sleep(100 * time.Millisecond)
    }()
    
    // 处理事件
    for event := range streamResult.Events {
        // 处理逻辑
    }
}

监控和指标 #

type MonitoringListener struct {
    droppedEvents int
    totalEvents   int
}

func (ml *MonitoringListener) OnNodeEvent(ctx context.Context, event graph.NodeEvent, 
                                        nodeName string, state interface{}, err error) {
    ml.totalEvents++
    
    // 监控丢弃事件
    if droppedCount := ml.GetDroppedEventsCount(); droppedCount > 0 {
        log.Printf("Dropped %d events", droppedCount)
    }
}

故障排除 #

常见问题及解决方案 #

1. 事件丢失问题 #

症状:某些事件没有被接收到

可能原因

解决方案

config := graph.StreamConfig{
    BufferSize:         5000,        // 增大缓冲区
    EnableBackpressure: true,        // 启用背压处理
    MaxDroppedEvents:   500,         // 增加丢弃阈值
}

2. 内存泄漏 #

症状:长时间运行后内存持续增长

可能原因

解决方案

// 确保正确清理
defer streamResult.Cancel()

// 使用超时控制
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// 实现优雅关闭
done := make(chan struct{})
go func() {
    defer close(done)
    // 处理事件
}()

select {
case <-done:
    // 正常完成
case <-ctx.Done():
    // 超时处理
}

3. 性能问题 #

症状:事件处理速度慢,影响整体性能

诊断步骤

  1. 检查事件过滤规则
  2. 监控通道使用情况
  3. 分析事件处理逻辑

优化方案

// 使用高效的事件处理
func efficientEventHandler(event graph.StreamEvent) {
    // 避免阻塞操作
    go func() {
        // 异步处理耗时操作
        heavyProcessing(event)
    }()
}

// 减少不必要的状态序列化
func optimizedStateHandler(state interface{}) interface{} {
    // 只处理必要的字段
    if msg, ok := state.(map[string]interface{}); ok {
        return msg["important_field"]
    }
    return state
}

4. 调试困难 #

症状:难以理解事件流式传输的行为

解决方案

// 使用调试模式
g.SetStreamConfig(graph.StreamConfig{
    Mode: graph.StreamModeDebug,
})

// 添加自定义事件监听器
type DebugListener struct{}

func (dl *DebugListener) OnNodeEvent(ctx context.Context, event graph.NodeEvent, 
                                   nodeName string, state interface{}, err error) {
    log.Printf("DEBUG: [%s] %s - %v", nodeName, event, state)
}

// 注册监听器
node.AddListener(&DebugListener{})

调试工具和技巧 #

1. 事件计数器 #

type EventCounter struct {
    counts map[string]int
    mu     sync.Mutex
}

func (ec *EventCounter) OnNodeEvent(ctx context.Context, event graph.NodeEvent, 
                                  nodeName string, state interface{}, err error) {
    ec.mu.Lock()
    defer ec.mu.Unlock()
    
    key := fmt.Sprintf("%s:%s", nodeName, event)
    ec.counts[key]++
}

func (ec *EventCounter) Report() {
    ec.mu.Lock()
    defer ec.mu.Unlock()
    
    for key, count := range ec.counts {
        fmt.Printf("Event %s: %d times\n", key, count)
    }
}

2. 性能分析 #

type PerformanceAnalyzer struct {
    startTime time.Time
    events    []graph.StreamEvent
}

func (pa *PerformanceAnalyzer) OnNodeEvent(ctx context.Context, event graph.NodeEvent, 
                                         nodeName string, state interface{}, err error) {
    pa.events = append(pa.events, graph.StreamEvent{
        Timestamp: time.Now(),
        NodeName:  nodeName,
        Event:     event,
        State:     state,
    })
}

func (pa *PerformanceAnalyzer) Analyze() {
    if len(pa.events) < 2 {
        return
    }
    
    // 计算事件间隔
    for i := 1; i < len(pa.events); i++ {
        interval := pa.events[i].Timestamp.Sub(pa.events[i-1].Timestamp)
        fmt.Printf("Interval %d-%d: %v\n", i-1, i, interval)
    }
}

章节来源

总结 #

LangGraph Go 的流式处理模块提供了一个功能强大且灵活的事件驱动架构,支持多种粒度的事件流式传输。通过合理配置 StreamModeStreamConfig 和使用适当的执行模式,开发者可以构建高性能、响应式的应用程序。

关键要点:

通过遵循最佳实践和故障排除指南,可以充分发挥流式处理模块的潜力,构建高质量的实时应用程序。