流式输出 #
本文档中引用的文件
- examples/streaming_modes/main.go
- examples/streaming_pipeline/main.go
- examples/streaming_modes/README.md
- examples/streaming_pipeline/README.md
- examples/listeners/main.go
- graph/streaming.go
- graph/listeners.go
- graph/retry.go
- showcases/deerflow/web/index.html
- showcases/deerflow/web/app.js
- showcases/deerflow/main.go
目录 #
简介 #
LangGraph Go 提供了强大的流式输出功能,支持实时数据处理和用户交互。该系统通过多种流模式(Streaming Modes)提供灵活的数据传输方式,从逐令牌输出到完整状态更新,满足不同应用场景的需求。
流式输出的核心优势包括:
- 实时反馈:用户可以立即看到处理进度
- 背压控制:防止系统过载
- 错误恢复:优雅处理各种异常情况
- 多模式支持:根据需求选择合适的流模式
项目结构 #
graph TB
subgraph "流式示例"
A[streaming_modes] --> A1[基础流模式演示]
B[streaming_pipeline] --> B1[管道流处理]
C[listeners] --> C1[事件监听器]
end
subgraph "核心流式模块"
D[graph/streaming.go] --> D1[流配置与执行]
E[graph/listeners.go] --> E1[事件监听机制]
end
subgraph "可视化展示"
F[showcases/deerflow] --> F1[Web界面]
G[web/index.html] --> G1[HTML模板]
H[web/app.js] --> H1[JavaScript逻辑]
end
A1 --> D1
B1 --> D1
C1 --> E1
F1 --> D1
H1 --> D1
图表来源
- [examples/streaming_modes/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/streaming_modes/main.go#L1-L55)
- [examples/streaming_pipeline/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/streaming_pipeline/main.go#L1-L80)
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L1-L476)
核心组件 #
流配置系统 #
流式输出的核心是 StreamConfig 结构体,它定义了流行为的各种参数:
classDiagram
class StreamConfig {
+int BufferSize
+bool EnableBackpressure
+int MaxDroppedEvents
+StreamMode Mode
+DefaultStreamConfig() StreamConfig
}
class StreamMode {
<<enumeration>>
StreamModeValues
StreamModeUpdates
StreamModeMessages
StreamModeDebug
}
class StreamResult {
+Events <-chan StreamEvent
+Result <-chan interface
+Errors <-chan error
+Done <-chan struct{}
+Cancel context.CancelFunc
}
StreamConfig --> StreamMode
StreamConfig --> StreamResult
图表来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L24-L46)
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L9-L21)
事件监听机制 #
系统通过监听器模式实现事件捕获和处理:
classDiagram
class NodeListener {
<<interface>>
+OnNodeEvent(ctx, event, nodeName, state, err)
}
class StreamingListener {
-eventChan chan~StreamEvent~
-config StreamConfig
-mutex sync.RWMutex
-droppedEvents int
-closed bool
+emitEvent(event)
+shouldEmit(event) bool
+handleBackpressure()
}
class StreamEvent {
+Timestamp time.Time
+NodeName string
+Event NodeEvent
+State interface
+Error error
+Metadata map[string]interface
+Duration time.Duration
}
NodeListener <|.. StreamingListener
StreamingListener --> StreamEvent
图表来源
- [graph/listeners.go](https://github.com/smallnest/langgraphgo/blob/main/graph/listeners.go#L51-L55)
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L66-L74)
章节来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L24-L64)
- [graph/listeners.go](https://github.com/smallnest/langgraphgo/blob/main/graph/listeners.go#L51-L87)
架构概览 #
LangGraph Go 的流式输出架构采用分层设计,确保高性能和可扩展性:
graph TD
subgraph "应用层"
A[用户请求] --> B[流式执行器]
end
subgraph "控制层"
B --> C[流配置管理]
C --> D[监听器注册]
end
subgraph "执行层"
D --> E[节点执行引擎]
E --> F[事件生成器]
end
subgraph "传输层"
F --> G[流监听器]
G --> H[事件过滤器]
H --> I[通道缓冲区]
end
subgraph "客户端层"
I --> J[事件消费者]
J --> K[实时处理]
end
图表来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L289-L358)
- [examples/streaming_pipeline/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/streaming_pipeline/main.go#L50-L71)
详细组件分析 #
流式执行器 #
StreamingExecutor 是流式输出的主要入口点,提供了高级的执行接口:
sequenceDiagram
participant Client as 客户端
participant Executor as StreamingExecutor
participant Runnable as StreamingRunnable
participant Listener as StreamingListener
participant Node as 节点
Client->>Executor : ExecuteWithCallback()
Executor->>Runnable : Stream()
Runnable->>Listener : 注册监听器
Runnable->>Node : 执行节点
Node->>Listener : 发送事件
Listener->>Executor : 过滤事件
Executor->>Client : 回调函数
Node->>Runnable : 返回结果
Runnable->>Executor : 最终结果
Executor->>Client : 结果回调
图表来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L414-L465)
- [examples/streaming_pipeline/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/streaming_pipeline/main.go#L52-L71)
事件类型系统 #
系统定义了丰富的事件类型来表示不同的执行阶段:
| 事件类型 | 描述 | 触发时机 |
|---|---|---|
NodeEventStart |
节点开始执行 | 节点函数被调用时 |
NodeEventComplete |
节点成功完成 | 节点返回成功结果时 |
NodeEventError |
节点执行出错 | 节点返回错误时 |
EventChainStart |
链开始执行 | 图执行开始时 |
EventChainEnd |
链执行完成 | 图执行结束时 |
EventToolStart |
工具开始执行 | 工具调用开始时 |
EventToolEnd |
工具执行完成 | 工具调用结束时 |
EventLLMStart |
LLM调用开始 | LLM请求发送时 |
EventLLMEnd |
LLM调用完成 | LLM响应接收时 |
章节来源
- [graph/listeners.go](https://github.com/smallnest/langgraphgo/blob/main/graph/listeners.go#L14-L48)
流模式详解 #
更新模式 (Updates Mode) #
更新模式是最常用的流模式,只发送节点输出:
flowchart TD
A[节点开始] --> B[执行节点]
B --> C{检查模式}
C --> |Updates| D[发送节点输出]
C --> |其他模式| E[跳过事件]
D --> F[继续下一个节点]
E --> F
F --> G{还有节点?}
G --> |是| B
G --> |否| H[执行完成]
图表来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L124-L126)
值模式 (Values Mode) #
值模式发送完整的图状态:
flowchart TD
A[图步骤] --> B{检查模式}
B --> |Values| C[发送完整状态]
B --> |其他模式| D[跳过事件]
C --> E[等待下一步]
D --> E
E --> F{继续执行?}
F --> |是| A
F --> |否| G[完成]
图表来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L117-L122)
消息模式 (Messages Mode) #
消息模式专门用于 LLM 输出流:
flowchart TD
A[LLM开始] --> B[接收令牌]
B --> C{检查模式}
C --> |Messages| D[发送令牌事件]
C --> |其他模式| E[跳过事件]
D --> F[累积令牌]
E --> F
F --> G{LLM完成?}
G --> |否| B
G --> |是| H[发送最终消息]
图表来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L127-L129)
调试模式 (Debug Mode) #
调试模式提供最详细的事件信息:
flowchart TD
A[所有事件] --> B{检查模式}
B --> |Debug| C[无条件发送]
B --> |其他模式| D[条件过滤]
C --> E[记录详细信息]
D --> F{匹配条件?}
F --> |是| G[发送事件]
F --> |否| H[丢弃事件]
E --> I[继续执行]
G --> I
H --> I
图表来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L114-L116)
章节来源
- [examples/streaming_modes/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/streaming_modes/main.go#L32-L35)
- [examples/streaming_pipeline/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/streaming_pipeline/main.go#L32-L42)
背压处理机制 #
背压检测与处理 #
系统实现了智能的背压处理机制来防止内存溢出:
flowchart TD
A[发送事件] --> B{通道是否满?}
B --> |否| C[直接发送]
B --> |是| D{启用背压处理?}
D --> |否| E[丢弃事件]
D --> |是| F[记录丢弃计数]
F --> G[等待空间可用]
G --> H{超时?}
H --> |是| I[强制丢弃]
H --> |否| J[重新尝试发送]
J --> A
C --> K[发送成功]
E --> L[事件丢失]
I --> L
图表来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L99-L109)
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L252-L261)
配置参数 #
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
BufferSize |
int | 1000 | 事件通道缓冲区大小 |
EnableBackpressure |
bool | true | 是否启用背压处理 |
MaxDroppedEvents |
int | 100 | 最大允许丢弃事件数 |
章节来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L25-L33)
错误处理与超时 #
上下文取消机制 #
系统完全支持 Go 的上下文取消机制:
sequenceDiagram
participant Client as 客户端
participant Context as 上下文
participant Executor as 执行器
participant Node as 节点
Client->>Context : 创建带超时的上下文
Client->>Executor : 开始执行
Executor->>Node : 启动节点
Node->>Context : 检查取消信号
Context-->>Node : 取消信号
Node-->>Executor : 返回取消错误
Executor-->>Client : 返回错误
Client->>Context : 清理资源
图表来源
- [graph/retry.go](https://github.com/smallnest/langgraphgo/blob/main/graph/retry.go#L126-L150)
超时处理 #
系统提供了多种超时处理机制:
flowchart TD
A[开始执行] --> B[创建超时上下文]
B --> C[启动节点]
C --> D{等待结果}
D --> |正常完成| E[返回结果]
D --> |超时| F[取消操作]
F --> G[清理资源]
G --> H[返回超时错误]
E --> I[执行完成]
H --> I
图表来源
- [graph/retry.go](https://github.com/smallnest/langgraphgo/blob/main/graph/retry.go#L128-L149)
错误恢复策略 #
| 策略 | 适用场景 | 实现方式 |
|---|---|---|
| 重试机制 | 临时性错误 | 指数退避算法 |
| 熔断器 | 服务不可用 | 状态机切换 |
| 限流 | 防止过载 | 时间窗口计数 |
| 超时 | 长时间无响应 | 上下文超时 |
章节来源
- [graph/retry.go](https://github.com/smallnest/langgraphgo/blob/main/graph/retry.go#L101-L363)
前端可视化示例 #
WebSocket 实现 #
前端通过 Server-Sent Events (SSE) 实现实时通信:
sequenceDiagram
participant Browser as 浏览器
participant Server as 服务器
participant Graph as 流式图
participant UI as 用户界面
Browser->>Server : 建立SSE连接
Server->>Graph : 启动流式执行
Graph->>Server : 发送事件
Server->>Browser : 推送事件
Browser->>UI : 更新界面
Graph->>Server : 发送完成事件
Server->>Browser : 关闭连接
Browser->>UI : 显示最终结果
图表来源
- [showcases/deerflow/web/app.js](https://github.com/smallnest/langgraphgo/blob/main/showcases/deerflow/web/app.js#L69-L109)
- [showcases/deerflow/main.go](https://github.com/smallnest/langgraphgo/blob/main/showcases/deerflow/main.go#L67-L163)
HTML 结构 #
前端界面采用响应式设计,包含聊天和报告面板:
graph TB
subgraph "聊天界面"
A[消息容器] --> B[用户消息]
A --> C[系统消息]
D[输入区域] --> E[文本框]
D --> F[发送按钮]
end
subgraph "报告面板"
G[标签页] --> H[报告标签]
G --> I[活动标签]
J[状态指示器] --> K[运行状态]
L[内容区域] --> M[报告内容]
L --> N[活动日志]
end
O[整体布局] --> A
O --> D
O --> G
O --> J
O --> L
图表来源
- [showcases/deerflow/web/index.html](https://github.com/smallnest/langgraphgo/blob/main/showcases/deerflow/web/index.html#L40-L96)
JavaScript 处理逻辑 #
前端通过事件源处理不同类型的消息:
| 事件类型 | 处理方式 | 显示内容 |
|---|---|---|
update |
更新状态指示器 | 步骤信息、进度 |
log |
添加到日志列表 | 详细执行日志 |
result |
显示最终报告 | Markdown格式报告 |
error |
显示错误消息 | 错误详情 |
章节来源
- [showcases/deerflow/web/app.js](https://github.com/smallnest/langgraphgo/blob/main/showcases/deerflow/web/app.js#L72-L102)
- [showcases/deerflow/web/index.html](https://github.com/smallnest/langgraphgo/blob/main/showcases/deerflow/web/index.html#L1-L103)
最佳实践 #
性能优化建议 #
-
合理设置缓冲区大小
- 小型应用:100-500
- 中型应用:1000-5000
- 大型应用:10000+
-
选择合适的流模式
- 实时反馈:使用 Updates 模式
- 调试开发:使用 Debug 模式
- 生产环境:根据需求选择
-
监控背压情况
- 定期检查丢弃事件数量
- 调整缓冲区大小
- 实施降级策略
错误处理最佳实践 #
flowchart TD
A[开始流式执行] --> B[设置超时上下文]
B --> C[启动执行goroutine]
C --> D[主循环监听]
D --> E{收到事件?}
E --> |是| F[处理事件]
E --> |否| G{收到结果?}
F --> H{发生错误?}
H --> |是| I[记录错误]
H --> |否| J[继续处理]
G --> |是| K[处理结果]
G --> |否| L{连接关闭?}
I --> M[优雅关闭]
J --> D
K --> M
L --> |是| N[检查错误]
L --> |否| D
N --> O{有错误?}
O --> |是| P[返回错误]
O --> |否| Q[返回结果]
M --> R[清理资源]
P --> R
Q --> R
并发安全考虑 #
- 使用互斥锁保护共享状态
- 避免在监听器中阻塞
- 正确处理通道关闭
- 实施优雅关闭机制
故障排除指南 #
常见问题及解决方案 #
| 问题 | 症状 | 解决方案 |
|---|---|---|
| 事件丢失 | 部分事件未到达客户端 | 增加缓冲区大小或启用背压 |
| 内存泄漏 | 内存使用持续增长 | 检查通道关闭逻辑 |
| 连接超时 | 客户端连接中断 | 调整超时设置 |
| 性能下降 | 处理速度变慢 | 优化事件过滤逻辑 |
调试技巧 #
- 启用调试模式:使用
StreamModeDebug获取详细信息 - 监控指标:跟踪事件发送和接收统计
- 日志记录:添加适当的日志语句
- 压力测试:模拟高并发场景
性能监控 #
graph LR
A[事件计数器] --> B[丢弃事件统计]
C[执行时间监控] --> D[平均响应时间]
E[内存使用监控] --> F[峰值内存占用]
G[错误率统计] --> H[失败事件比例]
B --> I[性能报告]
D --> I
F --> I
H --> I
章节来源
- [examples/streaming_pipeline/main.go](https://github.com/smallnest/langgraphgo/blob/main/examples/streaming_pipeline/main.go#L76-L79)
总结 #
LangGraph Go 的流式输出系统提供了强大而灵活的实时数据处理能力。通过多种流模式、智能背压处理和完善的错误恢复机制,开发者可以构建高性能的实时应用程序。
主要特性回顾 #
- 多样化的流模式:从简单的节点输出到完整的状态更新
- 智能背压处理:防止系统过载和内存溢出
- 完善的错误处理:支持超时、重试和熔断机制
- 实时前端集成:通过 SSE 实现真正的实时通信
- 高度可配置:灵活的参数调整和监控选项
应用场景 #
- AI对话系统:实时显示生成过程
- 数据分析平台:流式处理大量数据
- 监控仪表板:实时显示系统状态
- 协作工具:实时同步编辑状态
通过合理使用这些功能,开发者可以创建响应迅速、用户体验优秀的现代应用程序。