监听器 #

目录 #

  1. 简介
  2. 核心概念
  3. NodeEvent 枚举
  4. NodeListener 接口
  5. 内置监听器
  6. 可监听节点设计
  7. 事件驱动架构
  8. 流式监听器
  9. 最佳实践
  10. 故障排除

简介 #

LangGraphGo 的监听器(Listeners)系统是一个强大的事件驱动架构,允许外部系统实时感知图执行的生命周期。通过监听器模式,开发者可以轻松实现流式输出、性能监控、审计日志和调试工具等功能。

监听器系统的核心价值在于:

核心概念 #

事件驱动架构 #

监听器系统基于事件驱动架构设计,主要包含以下组件:

graph TB
subgraph "图执行引擎"
GE[图执行器]
LN[可监听节点]
LMG[可监听消息图]
end
subgraph "监听器系统"
NL[NodeListener接口]
SL[StreamingListener]
BL[内置监听器]
end
subgraph "事件类型"
NE[NodeEvent]
SE[StreamEvent]
CE[CallbackEvent]
end
GE --> LN
LN --> NL
LMG --> LN
NL --> BL
NL --> SL
NE --> SE
SE --> CE

图表来源

监听器层次结构 #

classDiagram
class NodeListener {
<<interface>>
+OnNodeEvent(ctx, event, nodeName, state, err)
}
class NodeListenerFunc {
+OnNodeEvent(ctx, event, nodeName, state, err)
}
class ListenableNode {
+Node
+listeners []NodeListener
+mutex sync.RWMutex
+Execute(ctx, state) (interface, error)
+NotifyListeners(ctx, event, state, err)
+AddListener(listener)
+RemoveListener(listener)
}
class ListenableMessageGraph {
+MessageGraph
+listenableNodes map[string]*ListenableNode
+AddNode(name, fn) *ListenableNode
+AddGlobalListener(listener)
+RemoveGlobalListener(listener)
}
class ProgressListener {
+writer io.Writer
+nodeSteps map[string]string
+showTiming bool
+showDetails bool
+OnNodeEvent(ctx, event, nodeName, state, err)
}
class LoggingListener {
+logger *log.Logger
+logLevel LogLevel
+includeState bool
+OnNodeEvent(ctx, event, nodeName, state, err)
}
class MetricsListener {
+nodeExecutions map[string]int
+nodeDurations map[string][]time.Duration
+GetNodeExecutions() map[string]int
+GetNodeAverageDuration() map[string]time.Duration
}
NodeListener <|-- NodeListenerFunc
NodeListener <|-- ProgressListener
NodeListener <|-- LoggingListener
NodeListener <|-- MetricsListener
ListenableNode --> NodeListener
ListenableMessageGraph --> ListenableNode

图表来源

NodeEvent 枚举 #

NodeEvent 定义了图执行过程中可能发生的各种事件类型,为监听器提供了精确的事件识别能力。

事件类型详解 #

事件类型 描述 触发时机
NodeEventStart 节点开始执行 节点函数被调用时
NodeEventProgress 节点执行进度 节点执行过程中的中间状态
NodeEventComplete 节点成功完成 节点函数正常返回结果时
NodeEventError 节点执行出错 节点函数返回错误时
EventChainStart 图执行开始 整个图的执行流程启动时
EventChainEnd 图执行结束 整个图的执行流程完成时
EventToolStart 工具执行开始 调用外部工具或函数时
EventToolEnd 工具执行结束 外部工具或函数返回结果时
EventLLMStart LLM调用开始 调用语言模型服务时
EventLLMEnd LLM调用结束 语言模型服务返回响应时
EventToken 生成令牌 流式输出过程中的单个令牌
EventCustom 自定义事件 用户定义的特殊事件

节来源

事件生命周期 #

sequenceDiagram
participant GE as 图执行器
participant LN as 可监听节点
participant ML as 监听器列表
participant L1 as 监听器1
participant L2 as 监听器2
GE->>LN : 开始执行节点
LN->>ML : NotifyListeners(NodeEventStart)
ML->>L1 : OnNodeEvent(start)
ML->>L2 : OnNodeEvent(start)
Note over LN : 执行节点函数...
alt 正常完成
LN->>ML : NotifyListeners(NodeEventComplete)
ML->>L1 : OnNodeEvent(complete)
ML->>L2 : OnNodeEvent(complete)
else 发生错误
LN->>ML : NotifyListeners(NodeEventError)
ML->>L1 : OnNodeEvent(error)
ML->>L2 : OnNodeEvent(error)
end

图表来源

NodeListener 接口 #

NodeListener 是监听器系统的核心接口,定义了监听器必须实现的方法。

接口定义 #

NodeListener 接口简洁而强大,只包含一个方法:

type NodeListener interface {
    OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error)
}

函数适配器 #

为了方便使用,框架提供了 NodeListenerFunc 类型作为函数适配器:

type NodeListenerFunc func(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error)

func (f NodeListenerFunc) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error) {
    f(ctx, event, nodeName, state, err)
}

节来源

并发安全的通知机制 #

NotifyListeners 方法实现了安全的并发通知机制:

flowchart TD
Start([开始通知]) --> LockRead[读取监听器列表]
LockRead --> CopyListeners[复制监听器副本]
LockRead --> UnlockRead[释放读锁]
CopyListeners --> CreateWG[创建WaitGroup]
CopyListeners --> LoopStart{遍历监听器}
LoopStart --> AddToWG[添加到WaitGroup]
AddToWG --> SpawnGoroutine[启动goroutine]
SpawnGoroutine --> RecoverPanic[恢复panic]
RecoverPanic --> CallListener[调用监听器]
CallListener --> DoneGoroutine[标记goroutine完成]
DoneGoroutine --> CheckMore{还有更多监听器?}
CheckMore --> |是| LoopStart
CheckMore --> |否| WaitAll[等待所有goroutine完成]
WaitAll --> End([结束])

图表来源

内置监听器 #

框架提供了多种内置监听器,满足常见的监控和调试需求。

ProgressListener(进度监听器) #

ProgressListener 提供可视化的进度跟踪功能:

特性 #

使用场景 #

节来源

LoggingListener(日志监听器) #

LoggingListener 提供结构化的日志记录功能:

特性 #

日志级别映射 #

NodeEvent LogLevel 前缀
NodeEventStart Info START
NodeEventComplete Info COMPLETE
NodeEventProgress Debug PROGRESS
NodeEventError Error ERROR

节来源

MetricsListener(指标监听器) #

MetricsListener 专注于性能和执行统计:

收集的指标 #

数据结构 #

erDiagram
MetricsListener {
map nodeExecutions
map nodeDurations
map nodeErrors
map startTimes
int totalExecutions
}
nodeExecutions {
string nodeName
int executionCount
}
nodeDurations {
string nodeName
[]time.Duration durations
}
nodeErrors {
string nodeName
int errorCount
}
MetricsListener ||--o{ nodeExecutions : tracks
MetricsListener ||--o{ nodeDurations : measures
MetricsListener ||--o{ nodeErrors : counts

图表来源

ChatListener(聊天监听器) #

ChatListener 提供类似聊天界面的实时更新:

特性 #

节来源

可监听节点设计 #

ListenableNode 结构 #

ListenableNode 通过组合模式将普通节点包装成可监听的实体:

classDiagram
class Node {
+string Name
+NodeFunction Function
}
class ListenableNode {
+Node
+[]NodeListener listeners
+sync.RWMutex mutex
+Execute(ctx, state) (interface, error)
+NotifyListeners(ctx, event, state, err)
+AddListener(listener)
+RemoveListener(listener)
+GetListeners() []NodeListener
}
Node <|-- ListenableNode : 组合

图表来源

创建可监听节点 #

// 创建普通节点
node := Node{
    Name: "my_node",
    Function: func(ctx context.Context, state interface{}) (interface{}, error) {
        return result, nil
    },
}

// 包装成可监听节点
listenableNode := NewListenableNode(node)

节来源

全局监听器管理 #

ListenableMessageGraph 提供全局监听器管理功能:

flowchart TD
AddGlobal[添加全局监听器] --> IterateNodes[遍历所有节点]
IterateNodes --> AddToListeners[添加到每个节点的监听器列表]
AddToListeners --> UpdateExisting[更新现有节点]
RemoveGlobal[移除全局监听器] --> IterateNodes2[遍历所有节点]
IterateNodes2 --> RemoveFromListeners[从每个节点移除监听器]
RemoveFromListeners --> UpdateExisting2[更新现有节点]

图表来源

事件驱动架构 #

回调处理器接口 #

框架提供了更高级的回调处理接口:

classDiagram
class CallbackHandler {
<<interface>>
+OnChainStart(ctx, serialized, inputs, runID, parentRunID, tags, metadata)
+OnChainEnd(ctx, outputs, runID)
+OnChainError(ctx, err, runID)
+OnLLMStart(ctx, serialized, prompts, runID, parentRunID, tags, metadata)
+OnLLMEnd(ctx, response, runID)
+OnLLMError(ctx, err, runID)
+OnToolStart(ctx, serialized, inputStr, runID, parentRunID, tags, metadata)
+OnToolEnd(ctx, output, runID)
+OnToolError(ctx, err, runID)
+OnRetrieverStart(ctx, serialized, query, runID, parentRunID, tags, metadata)
+OnRetrieverEnd(ctx, documents, runID)
+OnRetrieverError(ctx, err, runID)
}
class GraphCallbackHandler {
<<interface>>
+CallbackHandler
+OnGraphStep(ctx, stepNode, state)
}
class Config {
+[]CallbackHandler Callbacks
+map[string]interface Metadata
+[]string Tags
+map[string]interface Configurable
+string RunName
+*time.Duration Timeout
+[]string InterruptBefore
+[]string InterruptAfter
+[]string ResumeFrom
+interface ResumeValue
}
CallbackHandler <|-- GraphCallbackHandler
Config --> CallbackHandler : contains

图表来源

配置传递机制 #

sequenceDiagram
participant Client as 客户端
participant Config as Config
participant Runnable as ListenableRunnable
participant Listener as CallbackHandler
Client->>Config : 创建配置
Config->>Config : 设置回调处理器
Client->>Runnable : InvokeWithConfig(ctx, state, config)
Runnable->>Runnable : 启动图执行
loop 每个步骤
Runnable->>Listener : OnGraphStep(ctx, stepNode, state)
Listener-->>Runnable : 处理回调
end
Runnable->>Listener : OnChainEnd(ctx, outputs, runID)
Listener-->>Runnable : 完成回调

图表来源

流式监听器 #

StreamingListener 设计 #

StreamingListener 将监听器事件转换为流式数据:

classDiagram
class StreamingListener {
+chan~StreamEvent~ eventChan
+StreamConfig config
+sync.RWMutex mutex
+int droppedEvents
+bool closed
+OnNodeEvent(ctx, event, nodeName, state, err)
+emitEvent(event)
+shouldEmit(event) bool
+handleBackpressure()
+Close()
+GetDroppedEventsCount() int
}
class StreamConfig {
+int BufferSize
+bool EnableBackpressure
+int MaxDroppedEvents
+StreamMode Mode
}
class StreamMode {
<<enumeration>>
StreamModeValues
StreamModeUpdates
StreamModeMessages
StreamModeDebug
}
StreamingListener --> StreamConfig : uses
StreamConfig --> StreamMode : contains

图表来源

流式执行流程 #

sequenceDiagram
participant Client as 客户端
participant SR as StreamingRunnable
participant SM as StreamingMessageGraph
participant SL as StreamingListener
participant GR as GraphRunner
Client->>SR : Stream(ctx, initialState)
SR->>SM : 创建事件通道
SR->>SL : 创建StreamingListener
SR->>GR : 添加监听器到所有节点
par 并行执行
GR->>SL : 发送事件到channel
SL->>SL : 应用过滤规则
SL->>SL : 处理背压
and
Client->>SR : 读取StreamResult
SR-->>Client : 返回Events通道
end
GR->>SL : 关闭监听器
SL->>GR : 清理资源
GR->>SR : 发送最终结果
SR-->>Client : 返回结果

图表来源

背压处理机制 #

flowchart TD
SendEvent[发送事件] --> CheckClosed{监听器已关闭?}
CheckClosed --> |是| Discard[丢弃事件]
CheckClosed --> |否| CheckChannel{通道已满?}
CheckChannel --> |否| SendDirectly[直接发送]
CheckChannel --> |是| CheckBackpressure{启用背压?}
CheckBackpressure --> |否| Discard
CheckBackpressure --> |是| HandleBackpressure[处理背压]
HandleBackpressure --> IncrementCounter[增加丢弃计数]
IncrementCounter --> Discard
SendDirectly --> Success[发送成功]

图表来源

最佳实践 #

1. 监听器选择指南 #

场景 推荐监听器 配置要点
生产环境监控 MetricsListener 启用持久化存储
开发调试 LoggingListener 设置Debug级别
用户界面 ChatListener 自定义友好消息
性能分析 MetricsListener + LoggingListener 双重记录
审计日志 LoggingListener 结构化JSON格式

2. 并发安全注意事项 #

3. 性能优化建议 #

// 推荐:使用无操作监听器进行基准测试
type NoOpListener struct{}

func (l *NoOpListener) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error) {
    // 无操作实现
}

// 推荐:批量处理事件而非逐个处理
type BatchListener struct {
    events []StreamEvent
    mutex  sync.Mutex
}

func (l *BatchListener) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error) {
    l.mutex.Lock()
    defer l.mutex.Unlock()
    l.events = append(l.events, StreamEvent{
        Timestamp: time.Now(),
        NodeName:  nodeName,
        Event:     event,
        State:     state,
        Error:     err,
    })
}

4. 自定义监听器开发 #

type CustomListener struct {
    // 自定义字段
    config map[string]interface{}
}

func NewCustomListener(config map[string]interface{}) *CustomListener {
    return &CustomListener{
        config: config,
    }
}

func (l *CustomListener) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error) {
    // 实现自定义逻辑
    switch event {
    case NodeEventStart:
        // 处理开始事件
    case NodeEventComplete:
        // 处理完成事件
    case NodeEventError:
        // 处理错误事件
    }
}

故障排除 #

常见问题及解决方案 #

1. 监听器未被调用 #

症状:添加的监听器没有收到任何事件

原因分析

解决方案

// 检查监听器是否正确添加
listeners := node.GetListeners()
fmt.Printf("当前监听器数量: %d\n", len(listeners))

// 确保监听器在执行前添加
node.AddListener(myListener)

2. 性能问题 #

症状:图执行变慢,怀疑监听器影响性能

诊断步骤

// 使用性能分析器
import "net/http"
import _ "net/http/pprof"

func main() {
    go http.ListenAndServe("localhost:6060", nil)
    
    // 运行带有监听器的图
    // 访问 http://localhost:6060/debug/pprof/
}

优化策略

3. 内存泄漏 #

症状:长时间运行后内存使用持续增长

排查方法

// 监控监听器数量
func monitorListeners() {
    ticker := time.NewTicker(1 * time.Minute)
    for range ticker.C {
        fmt.Printf("活跃监听器: %d\n", getCurrentListenerCount())
    }
}

预防措施

调试技巧 #

1. 事件追踪 #

type TracingListener struct {
    tracer *trace.Tracer
}

func (l *TracingListener) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error) {
    l.tracer.Record(nodeName, string(event), err)
}

2. 性能监控 #

type PerformanceListener struct {
    metrics *prometheus.HistogramVec
}

func (l *PerformanceListener) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error) {
    if event == NodeEventComplete {
        duration := time.Since(startTime)
        l.metrics.WithLabelValues(nodeName).Observe(duration.Seconds())
    }
}

节来源

通过遵循这些最佳实践和故障排除指南,开发者可以充分利用 LangGraphGo 监听器系统的强大功能,构建高效、可维护的图执行监控和调试解决方案。