流式传输 (Streaming)
在现代 AI 应用中,用户体验至关重要。用户不希望盯着一个旋转的加载图标等待几秒钟甚至更长时间,直到整个回复生成完毕。流式传输允许应用实时展示中间结果,极大地提升了响应速度和交互感。
1. 流式模式
背景与功能
LangGraphGo 的图执行本质上是分步的。每个节点执行完毕后,状态会更新。流式模式允许订阅者实时获取这些状态变化,而不是等待整个图执行结束。
这对于调试、进度条显示以及构建复杂的交互式 UI 非常有用。
实现原理
Stream 方法返回一个 Go Channel。在图执行的每个步骤(Superstep)结束时,运行时会将当前的状态快照或状态增量发送到这个 Channel 中。
代码展示
// 启动流式执行
// input: 初始输入
// config: 运行配置
stream, err := runnable.Stream(ctx, input)
if err != nil {
panic(err)
}
// 消费流
for chunk := range stream {
// chunk 是一个 map,键是节点名称,值是该节点的输出
for nodeName, output := range chunk {
fmt.Printf("Node [%s] 完成,输出: %v\n", nodeName, output)
}
}
2. 流式传输 LLM Token
背景与功能
这是最常见的流式需求:像打字机一样逐字显示 LLM 生成的内容。这不同于上面的节点级流式,这是节点内部的细粒度流式。
实现原理
这依赖于底层 LLM 驱动(如 langchaingo)的回调机制。我们需要在调用 LLM 时注册一个回调函数,每当 LLM 生成一个新的 Token,就调用该函数。在
LangGraphGo 中,我们通常将这个回调函数通过 Context 或 Config 传递给节点。
代码展示
// 1. 定义节点,支持流式回调
func chatNode(ctx context.Context, state interface{}) (interface{}, error) {
messages := state.([]llms.MessageContent)
model, _ := openai.New()
// 定义流式回调函数
streamingFunc := func(ctx context.Context, chunk []byte) error {
// 这里可以将 chunk 发送到前端,或者打印到控制台
fmt.Print(string(chunk))
return nil
}
// 调用 LLM 时传入 WithStreamingFunc
response, err := model.GenerateContent(ctx, messages, llms.WithStreamingFunc(streamingFunc))
if err != nil {
return nil, err
}
return append(messages, llms.TextParts("ai", response.Choices[0].Content)), nil
}
// ... 在主程序中 ...
fmt.Println("AI 回复:")
runnable.Invoke(ctx, input) // 此时控制台会实时打印 Token
fmt.Println() // 换行
在 Web 服务中的应用
在构建 HTTP API 时,你可以结合 Go 的 http.Flusher 或 Server-Sent Events (SSE) 来将这些 Token 实时推送到浏览器。