可监听可运行实例 #

目录 #

  1. 简介
  2. 核心结构体架构
  3. CompileListenable 编译阶段
  4. Invoke 和 InvokeWithConfig 执行流程
  5. 监听器系统设计
  6. 回调处理器集成
  7. 性能优化特性
  8. 实际应用示例
  9. 总结

简介 #

ListenableRunnable 是 langgraphgo 框架中的核心组件,它扩展了基础的可运行实例(Runnable)功能,提供了完整的事件监听和回调处理能力。该组件作为监听体系的最终执行入口,在保持执行逻辑与监听解耦的同时,为开发者提供了强大的可观测性和可扩展性。

核心结构体架构 #

ListenableRunnable 结构体 #

ListenableRunnable 是一个轻量级的包装器,它将 ListenableMessageGraph 的监听能力封装到可执行实例中:

classDiagram
class ListenableRunnable {
+graph *ListenableMessageGraph
+listenableNodes map[string]*ListenableNode
+Invoke(ctx, initialState) (interface, error)
+InvokeWithConfig(ctx, initialState, config) (interface, error)
+GetGraph() *Exporter
}
class ListenableMessageGraph {
+MessageGraph *MessageGraph
+listenableNodes map[string]*ListenableNode
+CompileListenable() (*ListenableRunnable, error)
+AddNode(name, fn) *ListenableNode
+GetListenableNode(name) *ListenableNode
+AddGlobalListener(listener)
+RemoveGlobalListener(listener)
}
class ListenableNode {
+Node
+listeners []NodeListener
+mutex sync.RWMutex
+Execute(ctx, state) (interface, error)
+NotifyListeners(ctx, event, state, err)
+AddListener(listener) *ListenableNode
+RemoveListener(listener)
+GetListeners() []NodeListener
}
class NodeListener {
<<interface>>
+OnNodeEvent(ctx, event, nodeName, state, err)
}
ListenableRunnable --> ListenableMessageGraph : "包含"
ListenableMessageGraph --> ListenableNode : "管理多个"
ListenableNode --> NodeListener : "支持多个"

图表来源

ListenableMessageGraph 结构体 #

ListenableMessageGraph 扩展了基础的消息图功能,添加了监听节点的映射管理:

classDiagram
class ListenableMessageGraph {
+MessageGraph *MessageGraph
+listenableNodes map[string]*ListenableNode
+AddNode(name, fn) *ListenableNode
+GetListenableNode(name) *ListenableNode
+AddGlobalListener(listener)
+RemoveGlobalListener(listener)
}
class MessageGraph {
+nodes map[string]Node
+edges []Edge
+entryPoint string
+AddNode(name, fn)
+AddEdge(from, to)
+SetEntryPoint(name)
}
ListenableMessageGraph --|> MessageGraph : "继承"

图表来源

章节来源

CompileListenable 编译阶段 #

CompileListenable 方法是构建可监听可运行实例的关键步骤,它负责验证图的完整性并创建最终的执行实例:

编译过程详解 #

flowchart TD
Start([开始编译]) --> CheckEntryPoint{检查入口点}
CheckEntryPoint --> |未设置| Error[返回 ErrEntryPointNotSet]
CheckEntryPoint --> |已设置| CreateInstance[创建 ListenableRunnable 实例]
CreateInstance --> CopyNodes[复制 listenableNodes 映射]
CopyNodes --> SetGraphRef[设置 graph 引用]
SetGraphRef --> Return[返回实例]
Error --> End([结束])
Return --> End

图表来源

编译阶段的核心职责 #

  1. 入口点验证:确保图具有明确的起始节点
  2. 节点映射复制:将所有监听节点从图中复制到可运行实例
  3. 引用建立:建立可运行实例与底层图的关联关系

章节来源

Invoke 和 InvokeWithConfig 执行流程 #

Invoke 方法执行流程 #

Invoke 方法提供最简化的执行接口,内部委托给 InvokeWithConfig:

sequenceDiagram
participant Client as 客户端
participant Runnable as ListenableRunnable
participant Graph as ListenableMessageGraph
participant Node as ListenableNode
Client->>Runnable : Invoke(ctx, initialState)
Runnable->>Runnable : InvokeWithConfig(ctx, initialState, nil)
Runnable->>Graph : 获取入口节点
Graph-->>Runnable : 返回节点名称
loop 遍历节点链
Runnable->>Node : 查找对应节点
Node->>Node : Execute(ctx, state)
Node->>Node : NotifyListeners(Start, state)
Node->>Node : 执行函数
alt 执行成功
Node->>Node : NotifyListeners(Complete, result)
else 执行失败
Node->>Node : NotifyListeners(Error, err)
end
Node-->>Runnable : 返回结果
Runnable->>Runnable : 更新状态
Runnable->>Graph : 查找下一个节点
end
Runnable-->>Client : 返回最终结果

图表来源

InvokeWithConfig 方法执行流程 #

InvokeWithConfig 提供了更灵活的执行控制,支持配置参数和回调处理器:

flowchart TD
Start([开始执行]) --> CheckConfig{是否有配置?}
CheckConfig --> |是| SetContext[设置上下文配置]
CheckConfig --> |否| InitState[初始化状态]
SetContext --> InitState
InitState --> GetCurrentNode[获取当前节点]
GetCurrentNode --> Loop{是否到达END?}
Loop --> |是| Return[返回最终状态]
Loop --> |否| FindNode[查找监听节点]
FindNode --> NodeExists{节点存在?}
NodeExists --> |否| Error[返回 ErrNodeNotFound]
NodeExists --> |是| ExecuteNode[执行节点]
ExecuteNode --> UpdateState[更新状态]
UpdateState --> FindNext[查找下一个节点]
FindNext --> HasNext{有下一个节点?}
HasNext --> |否| Error
HasNext --> |是| CheckCallbacks{有回调处理器?}
CheckCallbacks --> |是| NotifyCallbacks[通知回调处理器]
CheckCallbacks --> |否| SetCurrentNode[设置当前节点]
NotifyCallbacks --> SetCurrentNode
SetCurrentNode --> Loop
Error --> End([结束])
Return --> End

图表来源

执行流程的关键特性 #

  1. 状态管理:支持自定义状态模式和默认状态替换
  2. 错误处理:完整的错误传播机制
  3. 状态清理:支持临时状态的自动清理
  4. 回调集成:无缝集成回调处理器

章节来源

监听器系统设计 #

ListenableNode 的监听机制 #

ListenableNode 通过 NotifyListeners 方法实现了异步事件通知系统:

sequenceDiagram
participant Node as ListenableNode
participant Mutex as 读写锁
participant WG as WaitGroup
participant Listener as NodeListener
participant Recovery as Panic恢复
Node->>Mutex : RLock() 获取读锁
Node->>Node : 复制监听器列表
Node->>Mutex : RUnlock() 释放读锁
Node->>WG : 创建 WaitGroup
loop 遍历监听器
Node->>WG : Add(1)
Node->>Recovery : 启动goroutine
Recovery->>Recovery : defer recover()
Recovery->>Listener : OnNodeEvent(ctx, event, name, state, err)
alt 监听器发生panic
Recovery->>Recovery : 捕获并忽略panic
else 正常执行
Recovery->>Recovery : 正常完成
end
Recovery->>WG : Done()
end
Node->>WG : Wait() 等待所有监听器完成

图表来源

监听器生命周期管理 #

stateDiagram-v2
[*] --> Created : NewListenableNode()
Created --> Listening : AddListener()
Listening --> Listening : 多次AddListener()
Listening --> Notifying : Execute()
Notifying --> Listening : Execute完成
Listening --> Removed : RemoveListener()
Removed --> [*] : 节点销毁
Notifying --> Recovered : 监听器panic
Recovered --> Listening : 继续执行

图表来源

监听器系统的核心优势 #

  1. 并发安全:使用读写锁保护监听器列表
  2. 异步通知:监听器通知在独立goroutine中执行
  3. 异常隔离:监听器panic不会影响主执行流程
  4. 批量操作:支持全局监听器添加和移除

章节来源

回调处理器集成 #

GraphCallbackHandler 接口 #

GraphCallbackHandler 扩展了标准的 CallbackHandler,专门用于图级别的步骤通知:

classDiagram
class CallbackHandler {
<<interface>>
+OnChainStart(ctx, serialized, inputs, runID, parentRunID, tags, metadata)
+OnChainEnd(ctx, outputs, runID)
+OnChainError(ctx, err, runID)
+OnLLMStart(ctx, serialized, prompts, runID, parentRunID, tags, metadata)
+OnLLMEnd(ctx, response, runID)
+OnLLMError(ctx, err, runID)
+OnToolStart(ctx, serialized, inputStr, runID, parentRunID, tags, metadata)
+OnToolEnd(ctx, output, runID)
+OnToolError(ctx, err, runID)
+OnRetrieverStart(ctx, serialized, query, runID, parentRunID, tags, metadata)
+OnRetrieverEnd(ctx, documents, runID)
+OnRetrieverError(ctx, err, runID)
}
class GraphCallbackHandler {
<<interface>>
+OnGraphStep(ctx, stepNode, state)
}
class Config {
+Callbacks []CallbackHandler
+Metadata map[string]interface
+Tags []string
+Configurable map[string]interface
+RunName string
+Timeout *time.Duration
+InterruptBefore []string
+InterruptAfter []string
+ResumeFrom []string
+ResumeValue interface
}
GraphCallbackHandler --|> CallbackHandler : "继承"
Config --> CallbackHandler : "包含多个"

图表来源

回调处理器的类型断言逻辑 #

在 InvokeWithConfig 方法中,回调处理器的集成采用了类型断言机制:

flowchart TD
Start([开始回调处理]) --> CheckConfig{配置不为空且有回调?}
CheckConfig --> |否| Skip[跳过回调处理]
CheckConfig --> |是| IterateCallbacks[遍历回调处理器]
IterateCallbacks --> TypeAssert{类型断言为 GraphCallbackHandler?}
TypeAssert --> |否| NextCallback[下一个回调]
TypeAssert --> |是| FormatNodeName[格式化节点名称]
FormatNodeName --> CallCallback[调用 OnGraphStep]
CallCallback --> NextCallback
NextCallback --> MoreCallbacks{还有回调?}
MoreCallbacks --> |是| IterateCallbacks
MoreCallbacks --> |否| End([结束])
Skip --> End

图表来源

回调处理器的集成优势 #

  1. 类型安全:通过类型断言确保只调用兼容的方法
  2. 格式化适配:自动将节点名称格式化为数组形式
  3. 条件执行:只有兼容的处理器才会被调用
  4. 扩展性:支持多种类型的回调处理器共存

章节来源

性能优化特性 #

并发执行优化 #

ListenableRunnable 在执行过程中采用了多项性能优化策略:

flowchart TD
Start([开始执行]) --> ParallelExecution[并行节点执行]
ParallelExecution --> AsyncNotifications[异步监听器通知]
AsyncNotifications --> WaitGroupSync[WaitGroup同步]
WaitGroupSync --> StateManagement[状态管理优化]
StateManagement --> MemoryEfficient[内存高效处理]
MemoryEfficient --> End([执行完成])

性能优化技术 #

  1. 异步监听器通知:监听器通知在独立goroutine中执行,避免阻塞主流程
  2. WaitGroup 同步:确保所有监听器完成后再继续执行
  3. 读写锁保护:最小化锁竞争,提高并发性能
  4. panic 恢复:防止监听器异常影响整体执行

内存管理优化 #

  1. 监听器列表复制:在通知前复制监听器列表,避免迭代时修改
  2. 状态传递优化:支持自定义状态模式,减少不必要的状态拷贝
  3. 资源清理:支持临时状态的自动清理机制

章节来源

实际应用示例 #

基础使用示例 #

以下展示了 ListenableRunnable 的典型使用模式:

sequenceDiagram
participant App as 应用程序
participant Graph as ListenableMessageGraph
participant Runnable as ListenableRunnable
participant Listener as 监听器
App->>Graph : NewListenableMessageGraph()
App->>Graph : AddNode("process", function)
App->>Graph : AddNode("analyze", function)
App->>Graph : SetEntryPoint("process")
App->>Graph : AddEdge("process", "analyze")
App->>Graph : AddEdge("analyze", END)
App->>Graph : CompileListenable()
Graph-->>App : 返回 ListenableRunnable
App->>Runnable : AddListener(listener)
App->>Runnable : Invoke(ctx, initialState)
Runnable->>Listener : 通知开始事件
Runnable->>Runnable : 执行节点函数
Runnable->>Listener : 通知完成事件
Runnable-->>App : 返回执行结果

图表来源

多监听器组合使用 #

在实际应用中,可以同时使用多种类型的监听器:

  1. 进度监听器:跟踪执行进度和状态
  2. 指标监听器:收集性能指标和统计数据
  3. 聊天监听器:提供用户友好的执行反馈
  4. 日志监听器:记录详细的执行日志

流式执行集成 #

ListenableRunnable 还可以与流式执行系统无缝集成:

classDiagram
class StreamingRunnable {
+runnable *ListenableRunnable
+config StreamConfig
+Stream(ctx, initialState) *StreamResult
}
class ListenableRunnable {
+InvokeWithConfig(ctx, initialState, config) (interface, error)
}
class StreamingListener {
+OnNodeEvent(ctx, event, nodeName, state, err)
+Close()
+GetDroppedEventsCount() int
}
StreamingRunnable --> ListenableRunnable : "包装"
StreamingRunnable --> StreamingListener : "使用"
ListenableRunnable --> StreamingListener : "添加监听器"

图表来源

章节来源

总结 #

ListenableRunnable 作为 langgraphgo 框架的核心组件,成功地将监听能力和可执行性结合在一起。它的设计体现了以下几个关键原则:

设计优势 #

  1. 关注点分离:执行逻辑与监听机制完全解耦
  2. 异步非阻塞:监听器通知采用异步模式,不影响主执行流程
  3. 异常安全:完善的panic恢复机制,确保系统稳定性
  4. 高度可扩展:支持多种类型的监听器和回调处理器
  5. 性能优化:并发执行和内存管理优化

架构价值 #

ListenableRunnable 不仅是一个简单的包装器,更是整个监听体系的最终执行入口。它通过精心设计的接口和执行流程,为开发者提供了一个强大而灵活的可观测性平台,同时保持了良好的性能特征和易用性。

这种设计使得开发者可以在不修改业务逻辑的情况下,轻松地添加监控、日志、指标收集等功能,真正实现了横切关注点的优雅分离。