流式输出 #

目录 #

  1. 简介
  2. 核心概念
  3. StreamMode 枚举详解
  4. StreamConfig 配置结构
  5. StreamingListener 实现机制
  6. StreamResult 数据结构
  7. StreamingRunnable 执行流程
  8. 实际应用示例
  9. 背压处理策略
  10. 性能优化建议
  11. 总结

简介 #

LangGraph Go 的流式输出机制提供了一套完整的实时事件流处理系统,允许开发者在图执行过程中捕获和处理各种事件。该机制支持多种流式模式,能够满足从简单的进度跟踪到复杂的实时交互等各种应用场景。

流式输出的核心优势在于:

核心概念 #

事件类型层次结构 #

classDiagram
class NodeEvent {
<<enumeration>>
+NodeEventStart
+NodeEventProgress
+NodeEventComplete
+NodeEventError
+EventChainStart
+EventChainEnd
+EventToolStart
+EventToolEnd
+EventLLMStart
+EventLLMEnd
+EventToken
+EventCustom
}
class StreamEvent {
+time.Time Timestamp
+string NodeName
+NodeEvent Event
+interface State
+error Error
+map[string]interface Metadata
+time.Duration Duration
}
class CallbackHandler {
<<interface>>
+OnChainStart()
+OnChainEnd()
+OnChainError()
+OnLLMStart()
+OnLLMEnd()
+OnLLMError()
+OnToolStart()
+OnToolEnd()
+OnToolError()
+OnRetrieverStart()
+OnRetrieverEnd()
+OnRetrieverError()
}
class GraphCallbackHandler {
<<interface>>
+OnGraphStep()
}
class StreamingListener {
+chan~StreamEvent~ eventChan
+StreamConfig config
+sync.RWMutex mutex
+int droppedEvents
+bool closed
+emitEvent()
+shouldEmit()
+handleBackpressure()
}
NodeEvent --> StreamEvent : "包含"
CallbackHandler <|-- GraphCallbackHandler : "继承"
GraphCallbackHandler <|.. StreamingListener : "实现"
CallbackHandler <|.. StreamingListener : "实现"

图表来源

流程架构图 #

sequenceDiagram
participant Client as "客户端"
participant SR as "StreamingRunnable"
participant SL as "StreamingListener"
participant LN as "ListenableNode"
participant CB as "CallbackHandler"
Client->>SR : Stream(ctx, initialState)
SR->>SL : NewStreamingListener(eventChan, config)
SR->>LN : AddListener(streamingListener)
SR->>CB : OnChainStart()
loop 图执行循环
LN->>CB : OnNodeEvent()
CB->>SL : emitEvent()
SL->>SL : shouldEmit()
alt 事件过滤通过
SL->>Client : 发送事件到 eventChan
else 事件被过滤
SL->>SL : 忽略事件
end
CB->>SL : OnGraphStep()
SL->>SL : emitEvent()
SL->>Client : 发送步骤事件
end
CB->>SL : OnChainEnd()
SL->>Client : 发送最终结果
SR->>LN : RemoveListener(streamingListener)
SR->>Client : 关闭通道

图表来源

StreamMode 枚举详解 #

StreamMode 定义了四种不同的流式输出模式,每种模式都有其特定的应用场景和过滤逻辑。

模式对比表 #

模式 描述 适用场景 过滤条件
StreamModeValues 输出完整状态 调试、UI 渲染 仅接收 graph_step 事件
StreamModeUpdates 输出节点更新 进度跟踪、状态监控 接收 ToolEndChainEndNodeEventComplete 事件
StreamModeMessages 输出 LLM 消息 实时聊天、文本生成 接收 LLMStartLLMEnd 事件
StreamModeDebug 输出所有事件 深度调试、开发测试 接收所有事件

详细说明 #

StreamModeValues - 完整状态模式 #

StreamModeUpdates - 更新模式 #

StreamModeMessages - 消息模式 #

StreamModeDebug - 调试模式 #

章节来源

StreamConfig 配置结构 #

StreamConfig 提供了对流式行为的精细控制,包含以下关键参数:

参数详解 #

参数名 类型 默认值 描述 性能影响
BufferSize int 1000 事件通道缓冲区大小 影响内存使用和背压处理能力
EnableBackpressure bool true 是否启用背压处理 控制事件丢失率
MaxDroppedEvents int 100 最大丢弃事件数 超出后触发警告
Mode StreamMode StreamModeDebug 流式输出模式 决定事件过滤规则

配置策略 #

高吞吐量场景 #

config := StreamConfig{
    BufferSize:         5000,
    EnableBackpressure: false,
    MaxDroppedEvents:   500,
    Mode:               StreamModeUpdates,
}

低延迟场景 #

config := StreamConfig{
    BufferSize:         100,
    EnableBackpressure: true,
    MaxDroppedEvents:   10,
    Mode:               StreamModeDebug,
}

生产环境推荐 #

config := StreamConfig{
    BufferSize:         2000,
    EnableBackpressure: true,
    MaxDroppedEvents:   200,
    Mode:               StreamModeValues,
}

章节来源

StreamingListener 实现机制 #

StreamingListener 是流式输出的核心组件,它同时实现了 NodeListenerCallbackHandler 接口,负责捕获和过滤事件。

核心功能架构 #

flowchart TD
A[事件源] --> B{事件类型判断}
B --> |节点事件| C[OnNodeEvent]
B --> |链路事件| D[OnChainStart/End/Error]
B --> |LLM事件| E[OnLLMStart/End/Error]
B --> |工具事件| F[OnToolStart/End/Error]
B --> |图步骤| G[OnGraphStep]
C --> H[创建 StreamEvent]
D --> H
E --> H
F --> H
G --> H
H --> I{shouldEmit过滤}
I --> |通过| J[emitEvent]
I --> |拒绝| K[忽略事件]
J --> L{背压处理}
L --> |启用| M[handleBackpressure]
L --> |禁用| N[直接丢弃]
M --> O[记录丢弃计数]
N --> P[事件丢失]

图表来源

事件过滤机制 #

shouldEmit 方法根据当前配置的 StreamMode 来决定是否发送事件:

flowchart TD
A[shouldEmit调用] --> B{检查配置模式}
B --> |StreamModeDebug| C[返回true - 发送所有事件]
B --> |StreamModeValues| D{检查事件类型}
D --> |event.Event == "graph_step"| E[返回true]
D --> |其他事件| F[返回false]
B --> |StreamModeUpdates| G{检查事件类型}
G --> |EventToolEnd 或 EventChainEnd 或 NodeEventComplete| H[返回true]
G --> |其他事件| F
B --> |StreamModeMessages| I{检查事件类型}
I --> |EventLLMStart 或 EventLLMEnd| J[返回true]
I --> |其他事件| F

图表来源

并发安全设计 #

StreamingListener 使用读写锁确保线程安全:

章节来源

StreamResult 数据结构 #

StreamResult 包含四个重要的通道,提供了完整的流式执行结果访问:

通道功能说明 #

通道名 类型 用途 关闭时机
Events <-chan StreamEvent 实时事件流 图执行完成时
Result <-chan interface{} 最终执行结果 成功完成时
Errors <-chan error 错误信息 发生错误时
Done <-chan struct{} 完成信号 所有通道关闭时

使用模式 #

基本消费模式 #

streamResult := runnable.Stream(ctx, initialState)
for {
    select {
    case event, ok := <-streamResult.Events:
        if !ok { break }
        processEvent(event)
    case result, ok := <-streamResult.Result:
        if ok { handleResult(result) }
    case err, ok := <-streamResult.Errors:
        if ok { handleError(err) }
    case <-streamResult.Done:
        return
    }
}

简化回调模式 #

executor := NewStreamingExecutor(runnable)
err := executor.ExecuteWithCallback(
    ctx, 
    initialState,
    func(event StreamEvent) { /* 实时处理 */ },
    func(result interface{}, err error) { /* 最终处理 */ },
)

章节来源

StreamingRunnable 执行流程 #

StreamingRunnable 是流式执行的主要入口点,它封装了完整的执行生命周期管理。

执行生命周期 #

sequenceDiagram
participant Client as "客户端"
participant SR as "StreamingRunnable"
participant EC as "执行上下文"
participant SL as "StreamingListener"
participant Nodes as "节点集合"
Client->>SR : Stream(ctx, initialState)
SR->>EC : 创建通道和上下文
SR->>SL : NewStreamingListener(eventChan, config)
SR->>Nodes : AddListener(streamingListener)
par 异步执行
SR->>EC : 启动执行协程
EC->>Nodes : 执行图节点
Nodes->>SL : 触发事件监听
SL->>Client : 发送事件
end
EC->>SL : OnChainEnd()
SL->>Client : 发送最终结果
SR->>Nodes : RemoveListener(streamingListener)
SR->>EC : 关闭通道
SR->>Client : 返回 StreamResult

图表来源

资源管理策略 #

生命周期管理 #

  1. 初始化阶段:创建通道、监听器、上下文
  2. 执行阶段:添加监听器到所有节点,启动异步执行
  3. 清理阶段:移除监听器,关闭通道,释放资源

错误处理机制 #

章节来源

实际应用示例 #

基础流式模式示例 #

参考 examples/streaming_modes/main.go,展示了如何在不同模式下处理流式数据:

更新模式 (Updates Mode) #

// 配置为更新模式
g.SetStreamConfig(graph.StreamConfig{
    Mode: graph.StreamModeUpdates,
})

// 执行并处理事件
streamResult := runnable.Stream(context.Background(), nil)
for event := range streamResult.Events {
    fmt.Printf("[%s] Node: %s, Event: %s, State: %v\n",
        event.Timestamp.Format("15:04:05"),
        event.NodeName,
        event.Event,
        event.State)
}

消息模式 (Messages Mode) #

// 配置为消息模式
g.SetStreamConfig(graph.StreamConfig{
    Mode: graph.StreamModeMessages,
})

// 处理 LLM 令牌流
streamResult := runnable.Stream(context.Background(), userInput)
for event := range streamResult.Events {
    if event.Event == graph.EventLLMEnd {
        fmt.Print(event.State)
    }
}

高级管道示例 #

参考 examples/streaming_pipeline/main.go,展示了复杂场景下的流式处理:

多监听器组合 #

// 添加多个监听器
progressListener := graph.NewProgressListener()
chatListener := graph.NewChatListener()
metricsListener := graph.NewMetricsListener()

// 将监听器绑定到节点
analyze.AddListener(progressListener)
analyze.AddListener(chatListener)
analyze.AddListener(metricsListener)

// 统一处理所有事件
executor.ExecuteWithCallback(ctx, input, 
    func(event graph.StreamEvent) {
        fmt.Printf("[%s] Event: %s from %s\n",
            time.Now().Format("15:04:05.000"),
            event.Event, event.NodeName)
    },
    func(result interface{}, err error) {
        // 处理最终结果
    },
)

Web 应用集成 #

SSE (Server-Sent Events) 实现 #

func streamHandler(w http.ResponseWriter, r *http.Request) {
    // 设置 SSE 头部
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    
    // 创建流式执行器
    executor := graph.NewStreamingExecutor(runnable)
    
    // 执行并发送事件
    executor.ExecuteWithCallback(r.Context(), input,
        func(event graph.StreamEvent) {
            // 发送事件到客户端
            fmt.Fprintf(w, "data: %v\n\n", event)
            w.(http.Flusher).Flush()
        },
        func(result interface{}, err error) {
            // 发送完成信号
            fmt.Fprintf(w, "data: [DONE]\n\n")
            w.(http.Flusher).Flush()
        },
    )
}

章节来源

背压处理策略 #

背压处理是流式输出系统的关键特性,防止快速产生的事件淹没消费者。

背压检测机制 #

flowchart TD
A[事件发送请求] --> B{通道是否满}
B --> |否| C[直接发送成功]
B --> |是| D{启用背压处理?}
D --> |否| E[丢弃事件]
D --> |是| F[handleBackpressure]
F --> G[增加丢弃计数]
G --> H{超过最大丢弃阈值?}
H --> |是| I[记录警告日志]
H --> |否| J[继续处理]
C --> K[事件发送完成]
E --> L[事件丢失]
I --> M[事件丢失]
J --> N[等待下次重试]

图表来源

配置优化策略 #

高吞吐量场景 #

config := StreamConfig{
    BufferSize:         5000,      // 增大缓冲区
    EnableBackpressure: false,     // 禁用背压,允许丢弃
    MaxDroppedEvents:   1000,      // 允许更多丢弃
}

低延迟场景 #

config := StreamConfig{
    BufferSize:         100,       // 减小缓冲区
    EnableBackpressure: true,      // 启用背压
    MaxDroppedEvents:   10,        // 严格限制丢弃
}

生产环境平衡 #

config := StreamConfig{
    BufferSize:         2000,      // 中等缓冲区
    EnableBackpressure: true,      // 启用背压
    MaxDroppedEvents:   200,       // 适度限制丢弃
}

监控和诊断 #

丢弃事件统计 #

// 获取丢弃事件数量
droppedCount := streamingListener.GetDroppedEventsCount()
if droppedCount > 0 {
    log.Printf("警告: 已丢弃 %d 个事件", droppedCount)
}

性能指标收集 #

type MetricsCollector struct {
    totalEvents     int64
    droppedEvents   int64
    processingTime  time.Duration
}

func (mc *MetricsCollector) RecordEvent(duration time.Duration) {
    atomic.AddInt64(&mc.totalEvents, 1)
    mc.processingTime += duration
}

章节来源

性能优化建议 #

内存优化 #

缓冲区大小调优 #

事件序列化优化 #

// 避免深拷贝
type OptimizedStreamEvent struct {
    Timestamp time.Time
    NodeName  string
    EventType string
    StateRef  unsafe.Pointer // 使用指针避免复制
    Error     error
    Metadata  map[string]interface{}
}

并发优化 #

监听器并发处理 #

// 使用工作池模式
type EventWorkerPool struct {
    workers    int
    jobQueue   chan StreamEvent
    workerWg   sync.WaitGroup
}

func (p *EventWorkerPool) ProcessEvent(event StreamEvent) {
    p.jobQueue <- event
}

func (p *EventWorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.workerWg.Add(1)
        go func() {
            defer p.workerWg.Done()
            for event := range p.jobQueue {
                // 处理事件
            }
        }()
    }
}

网络优化 #

批量发送策略 #

type BatchSender struct {
    batchSize    int
    batchTimeout time.Duration
    events       []StreamEvent
    flushCh      chan struct{}
}

func (bs *BatchSender) AddEvent(event StreamEvent) {
    bs.events = append(bs.events, event)
    if len(bs.events) >= bs.batchSize {
        bs.flushCh <- struct{}{}
    }
}

func (bs *BatchSender) Flush() {
    if len(bs.events) > 0 {
        // 批量发送事件
        bs.sendBatch(bs.events)
        bs.events = nil
    }
}

监控和告警 #

关键指标监控 #

type StreamingMetrics struct {
    ActiveListeners    int
    EventRate          float64
    DropRate           float64
    AverageProcessingTime time.Duration
    ChannelUtilization float64
}

func (sm *StreamingMetrics) Collect() {
    // 收集各种指标
    sm.DropRate = float64(sm.DroppedEvents) / float64(sm.TotalEvents)
    sm.ChannelUtilization = float64(sm.ActiveChannels) / float64(sm.MaxChannels)
}

总结 #

LangGraph Go 的流式输出机制提供了一个强大而灵活的实时事件处理框架。通过合理配置 StreamModeStreamConfig 参数,结合 StreamingListener 的事件过滤和 StreamingRunnable 的执行管理,开发者可以构建出高性能、可扩展的流式应用。

核心优势 #

  1. 多模式支持:四种不同的流式模式满足各种应用场景需求
  2. 灵活配置:丰富的配置选项允许针对具体场景进行优化
  3. 可靠机制:完善的背压处理和错误恢复确保系统稳定性
  4. 易于集成:清晰的 API 设计便于与现有系统集成

最佳实践 #

  1. 选择合适的模式:根据具体需求选择 StreamModeUpdatesStreamModeValues
  2. 合理配置缓冲区:根据流量特征调整 BufferSizeMaxDroppedEvents
  3. 实施背压策略:在高负载场景下启用背压处理
  4. 监控关键指标:定期检查事件处理速率和丢弃率

通过深入理解和正确使用这些机制,开发者可以构建出响应迅速、用户体验优秀的流式应用系统。