langgraphgo API 参考文档 #
本文档中引用的文件
目录 #
简介 #
langgraphgo 是一个强大的 Go 语言库,用于构建基于图的 AI 应用程序。它提供了灵活的状态管理、并发执行、检查点持久化和流式处理功能。本文档详细介绍了所有公共 API 接口。
核心图类型 #
图的基本概念 #
langgraphgo 提供两种主要的图类型:状态图(StateGraph)和消息图(MessageGraph)。这两种图都支持并行执行、条件边和中断机制。
classDiagram
class StateGraph {
+map~string,Node~ nodes
+[]Edge edges
+map~string,func~ conditionalEdges
+string entryPoint
+RetryPolicy retryPolicy
+StateMerger stateMerger
+StateSchema Schema
+AddNode(name, fn) void
+AddEdge(from, to) void
+AddConditionalEdge(from, condition) void
+SetEntryPoint(name) void
+SetRetryPolicy(policy) void
+SetStateMerger(merger) void
+SetSchema(schema) void
+Compile() StateRunnable
}
class MessageGraph {
+map~string,Node~ nodes
+[]Edge edges
+map~string,func~ conditionalEdges
+string entryPoint
+StateMerger stateMerger
+StateSchema Schema
+AddNode(name, fn) void
+AddEdge(from, to) void
+AddConditionalEdge(from, condition) void
+SetEntryPoint(name) void
+SetStateMerger(merger) void
+SetSchema(schema) void
+Compile() Runnable
}
class Node {
+string Name
+func Function
}
class Edge {
+string From
+string To
}
StateGraph --> Node : "包含"
StateGraph --> Edge : "包含"
MessageGraph --> Node : "包含"
MessageGraph --> Edge : "包含"
图表来源
- [graph/state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L11-L32)
- [graph/graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L52-L60)
状态图 API #
StateGraph 构造函数 #
NewStateGraph() #
创建一个新的状态图实例。
签名: func NewStateGraph() *StateGraph
返回值: 新的状态图实例
使用示例:
workflow := graph.NewStateGraph()
NewMessagesStateGraph() #
创建一个带有默认消息模式的状态图,推荐用于基于聊天的代理。
签名: func NewMessagesStateGraph() *StateGraph
返回值: 配置好的状态图实例
使用示例:
workflow := graph.NewMessagesStateGraph()
核心方法 #
AddNode(name string, fn func) void #
向状态图添加新节点。
参数:
name: 节点名称,必须唯一fn: 节点函数,接收上下文和状态,返回更新后的状态和错误
签名: func (g *StateGraph) AddNode(name string, fn func(ctx context.Context, state interface{}) (interface{}, error))
使用示例:
workflow.AddNode("agent", func(ctx context.Context, state interface{}) (interface{}, error) {
// 处理逻辑
return newState, nil
})
AddEdge(from string, to string) void #
添加静态边连接两个节点。
参数:
from: 源节点名称to: 目标节点名称
签名: func (g *StateGraph) AddEdge(from, to string)
使用示例:
workflow.AddEdge("agent", "tools")
AddConditionalEdge(from string, condition func) void #
添加条件边,在运行时根据条件决定目标节点。
参数:
from: 源节点名称condition: 条件函数,接收上下文和状态,返回下一个节点名称
签名: func (g *StateGraph) AddConditionalEdge(from string, condition func(ctx context.Context, state interface{}) string)
使用示例:
workflow.AddConditionalEdge("agent", func(ctx context.Context, state interface{}) string {
// 基于状态判断下一个节点
return "tools"
})
SetEntryPoint(name string) void #
设置入口节点。
参数:
name: 入口节点名称
签名: func (g *StateGraph) SetEntryPoint(name string)
使用示例:
workflow.SetEntryPoint("agent")
SetRetryPolicy(policy *RetryPolicy) void #
设置重试策略。
参数:
policy: 重试策略配置
签名: func (g *StateGraph) SetRetryPolicy(policy *RetryPolicy)
使用示例:
retryPolicy := &graph.RetryPolicy{
MaxRetries: 3,
BackoffStrategy: graph.ExponentialBackoff,
RetryableErrors: []string{"timeout", "network"},
}
workflow.SetRetryPolicy(retryPolicy)
SetStateMerger(merger StateMerger) void #
设置状态合并器。
参数:
merger: 状态合并函数
签名: func (g *StateGraph) SetStateMerger(merger StateMerger)
SetSchema(schema StateSchema) void #
设置状态模式。
参数:
schema: 状态模式接口
签名: func (g *StateGraph) SetSchema(schema StateSchema)
编译与执行 #
Compile() (*StateRunnable, error) #
编译状态图并返回可执行实例。
返回值:
*StateRunnable: 编译后的可执行图error: 错误信息(如果入口点未设置)
签名: func (g *StateGraph) Compile() (*StateRunnable, error)
使用示例:
runnable, err := workflow.Compile()
if err != nil {
log.Fatal(err)
}
Invoke(ctx context.Context, initialState interface{}) (interface{}, error) #
执行编译后的状态图。
参数:
ctx: 上下文initialState: 初始状态
返回值:
interface{}: 最终状态error: 执行错误
签名: func (r *StateRunnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)
InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error) #
带配置执行状态图。
参数:
ctx: 上下文initialState: 初始状态config: 配置选项
签名: func (r *StateRunnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)
图表来源
- [graph/state_graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/state_graph.go#L58-L113)
消息图 API #
MessageGraph 构造函数 #
NewMessageGraph() #
创建新的消息图实例。
签名: func NewMessageGraph() *MessageGraph
返回值: 新的消息图实例
使用示例:
workflow := graph.NewMessageGraph()
核心方法 #
消息图的方法与状态图基本相同,但主要用于消息传递模式:
// 添加节点
workflow.AddNode("process", processFunction)
// 设置入口
workflow.SetEntryPoint("process")
// 添加边
workflow.AddEdge("process", "END")
// 编译
runnable, err := workflow.Compile()
编译与执行 #
Compile() (*Runnable, error) #
编译消息图。
返回值:
*Runnable: 编译后的可执行图error: 错误信息
签名: func (g *MessageGraph) Compile() (*Runnable, error)
Invoke(ctx context.Context, initialState interface{}) (interface{}, error) #
执行消息图。
签名: func (r *Runnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)
InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error) #
带配置执行消息图。
签名: func (r *Runnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)
图表来源
- [graph/graph.go](https://github.com/smallnest/langgraphgo/blob/main/graph/graph.go#L95-L148)
预构建组件 #
CreateAgent() #
创建智能代理,支持工具调用和对话管理。
签名: func CreateAgent(model llms.Model, inputTools []tools.Tool, opts ...CreateAgentOption) (*graph.StateRunnable, error)
参数:
model: LLM 模型实例inputTools: 工具列表opts: 可选配置选项
返回值:
*graph.StateRunnable: 编译后的代理图error: 错误信息
配置选项:
WithSystemMessage(message string): 设置系统消息WithStateModifier(modifier func): 设置状态修改器WithCheckpointer(checkpointer): 设置检查点存储
使用示例:
agent, err := prebuilt.CreateAgent(
model,
[]tools.Tool{calculator, webSearch},
prebuilt.WithSystemMessage("你是一个有用的助手"),
prebuilt.WithStateModifier(customModifier),
)
CreateReactAgent() #
创建 ReAct 格式的代理,遵循推理-行动-观察循环。
签名: func CreateReactAgent(model llms.Model, inputTools []tools.Tool) (*graph.StateRunnable, error)
参数:
model: LLM 模型inputTools: 工具列表
返回值:
*graph.StateRunnable: 编译后的 ReAct 代理
使用示例:
reactAgent, err := prebuilt.CreateReactAgent(model, tools)
CreateSupervisor() #
创建监督者代理,协调多个子代理。
签名: func CreateSupervisor(model llms.Model, members map[string]*graph.StateRunnable) (*graph.StateRunnable, error)
参数:
model: 监督者模型members: 子代理映射
返回值:
*graph.StateRunnable: 编译后的监督者图
使用示例:
supervisor, err := prebuilt.CreateSupervisor(model, map[string]*graph.StateRunnable{
"researcher": researcherAgent,
"writer": writerAgent,
})
RAG 组件 #
RAGPipeline #
检索增强生成(RAG)管道组件。
签名: func NewRAGPipeline(config *RAGConfig) *RAGPipeline
配置选项:
TopK int: 检索的文档数量ScoreThreshold float64: 相关性阈值UseReranking bool: 是否使用重排序UseFallback bool: 是否使用回退搜索SystemPrompt string: 系统提示IncludeCitations bool: 是否包含引用
方法:
BuildBasicRAG() error: 构建基础 RAG 流水线BuildAdvancedRAG() error: 构建高级 RAG 流水线BuildConditionalRAG() error: 构建条件 RAG 流水线Compile() (*graph.Runnable, error): 编译流水线
使用示例:
config := &prebuilt.RAGConfig{
TopK: 4,
SystemPrompt: "回答问题时要准确且有帮助",
}
pipeline := prebuilt.NewRAGPipeline(config)
err := pipeline.BuildBasicRAG()
if err != nil {
log.Fatal(err)
}
runnable, err := pipeline.Compile()
图表来源
- [prebuilt/create_agent.go](https://github.com/smallnest/langgraphgo/blob/main/prebuilt/create_agent.go#L46-L251)
- [prebuilt/react_agent.go](https://github.com/smallnest/langgraphgo/blob/main/prebuilt/react_agent.go#L14-L181)
- [prebuilt/supervisor.go](https://github.com/smallnest/langgraphgo/blob/main/prebuilt/supervisor.go#L14-L152)
- [prebuilt/rag.go](https://github.com/smallnest/langgraphgo/blob/main/prebuilt/rag.go#L113-L248)
检查点系统 #
CheckpointStore 接口 #
定义检查点存储的标准接口。
接口方法:
Save(ctx context.Context, checkpoint *Checkpoint) error: 保存检查点Load(ctx context.Context, checkpointID string) (*Checkpoint, error): 加载检查点List(ctx context.Context, executionID string) ([]*Checkpoint, error): 列出所有检查点Delete(ctx context.Context, checkpointID string) error: 删除检查点Clear(ctx context.Context, executionID string) error: 清除执行的所有检查点
内存检查点存储 #
MemoryCheckpointStore #
基于内存的检查点存储实现。
签名: func NewMemoryCheckpointStore() *MemoryCheckpointStore
方法:
Save(ctx context.Context, checkpoint *Checkpoint) errorLoad(ctx context.Context, checkpointID string) (*Checkpoint, error)List(ctx context.Context, executionID string) ([]*Checkpoint, error)Delete(ctx context.Context, checkpointID string) errorClear(ctx context.Context, executionID string) error
文件检查点存储 #
FileCheckpointStore #
基于文件的检查点存储实现。
签名: func NewFileCheckpointStore(writer io.Writer, reader io.Reader) *FileCheckpointStore
检查点配置 #
CheckpointConfig #
检查点配置结构。
字段:
Store CheckpointStore: 存储后端AutoSave bool: 自动保存SaveInterval time.Duration: 保存间隔MaxCheckpoints int: 最大检查点数
DefaultCheckpointConfig() #
返回默认检查点配置。
签名: func DefaultCheckpointConfig() CheckpointConfig
检查点可执行对象 #
CheckpointableRunnable #
扩展 Runnable 以支持检查点功能。
方法:
SaveCheckpoint(ctx context.Context, nodeName string, state interface{}) error: 手动保存检查点LoadCheckpoint(ctx context.Context, checkpointID string) (*Checkpoint, error): 加载检查点ListCheckpoints(ctx context.Context) ([]*Checkpoint, error): 列出检查点ResumeFromCheckpoint(ctx context.Context, checkpointID string) (interface{}, error): 从检查点恢复ClearCheckpoints(ctx context.Context) error: 清除检查点GetState(ctx context.Context, config *Config) (*StateSnapshot, error): 获取状态快照UpdateState(ctx context.Context, config *Config, values interface{}, asNode string) (*Config, error): 更新状态
图表来源
- [graph/checkpointing.go](https://github.com/smallnest/langgraphgo/blob/main/graph/checkpointing.go#L22-L559)
流式处理 #
StreamMode 枚举 #
定义流式处理模式。
常量:
StreamModeValues: 发送完整状态StreamModeUpdates: 发送更新(增量)StreamModeMessages: 发送 LLM 消息/令牌StreamModeDebug: 发送所有事件(默认)
StreamConfig #
流式处理配置。
字段:
BufferSize int: 事件通道缓冲区大小EnableBackpressure bool: 启用背压处理MaxDroppedEvents int: 最大丢弃事件数Mode StreamMode: 流式模式
DefaultStreamConfig() #
返回默认流式配置。
签名: func DefaultStreamConfig() StreamConfig
StreamResult #
流式执行结果。
字段:
Events <-chan StreamEvent: 事件通道Result <-chan interface{}: 结果通道Errors <-chan error: 错误通道Done <-chan struct{}: 完成通道Cancel context.CancelFunc: 取消函数
StreamingListener #
实现 NodeListener 以支持事件流式传输。
方法:
OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error)OnChainStart(ctx context.Context, serialized map[string]interface{}, inputs map[string]interface{}, runID string, parentRunID *string, tags []string, metadata map[string]interface{})OnChainEnd(ctx context.Context, outputs map[string]interface{}, runID string)OnChainError(ctx context.Context, err error, runID string)OnLLMStart(ctx context.Context, serialized map[string]interface{}, prompts []string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})OnLLMEnd(ctx context.Context, response interface{}, runID string)OnLLMError(ctx context.Context, err error, runID string)OnToolStart(ctx context.Context, serialized map[string]interface{}, inputStr string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})OnToolEnd(ctx context.Context, output string, runID string)OnToolError(ctx context.Context, err error, runID string)OnGraphStep(ctx context.Context, stepNode string, state interface{})
StreamingRunnable #
扩展 Runnable 以支持流式处理。
方法:
Stream(ctx context.Context, initialState interface{}) *StreamResult: 开始流式执行
StreamingExecutor #
高级流式执行器。
方法:
ExecuteWithCallback(ctx context.Context, initialState interface{}, eventCallback func, resultCallback func) error: 带回调的执行ExecuteAsync(ctx context.Context, initialState interface{}) *StreamResult: 异步执行
图表来源
- [graph/streaming.go](https://github.com/smallnest/langgraphgo/blob/main/graph/streaming.go#L9-L475)
工具集成 #
ExaSearch #
Exa API 搜索工具。
构造函数:
签名: func NewExaSearch(apiKey string, opts ...ExaOption) (*ExaSearch, error)
选项:
WithExaBaseURL(url string): 设置基础 URLWithExaNumResults(num int): 设置结果数量
方法:
Name() string: 返回工具名称Description() string: 返回描述Call(ctx context.Context, input string) (string, error): 执行搜索
使用示例:
exa, err := tool.NewExaSearch("", tool.WithExaNumResults(10))
if err != nil {
log.Fatal(err)
}
result, err := exa.Call(ctx, "Go programming language")
if err != nil {
log.Fatal(err)
}
TavilySearch #
Tavily API 搜索工具。
构造函数:
签名: func NewTavilySearch(apiKey string, opts ...TavilyOption) (*TavilySearch, error)
选项:
WithTavilyBaseURL(url string): 设置基础 URLWithTavilySearchDepth(depth string): 设置搜索深度(“basic” 或 “advanced”)
方法:
Name() string: 返回工具名称Description() string: 返回描述Call(ctx context.Context, input string) (string, error): 执行搜索
使用示例:
tavily, err := tool.NewTavilySearch("", tool.WithTavilySearchDepth("advanced"))
if err != nil {
log.Fatal(err)
}
result, err := tavily.Call(ctx, "最新的人工智能发展")
if err != nil {
log.Fatal(err)
}
ToolExecutor #
工具执行器。
构造函数:
签名: func NewToolExecutor(inputTools []tools.Tool) *ToolExecutor
方法:
Execute(ctx context.Context, invocation ToolInvocation) (string, error): 执行单个工具调用ExecuteMany(ctx context.Context, invocations []ToolInvocation) ([]string, error): 执行多个工具调用ToolNode(ctx context.Context, state interface{}) (interface{}, error): 工具节点函数
使用示例:
executor := prebuilt.NewToolExecutor([]tools.Tool{exa, tavily})
// 执行单个工具
result, err := executor.Execute(ctx, prebuilt.ToolInvocation{
Tool: "Exa_Search",
ToolInput: "机器学习教程",
})
// 执行多个工具
invocations := []prebuilt.ToolInvocation{
{Tool: "Exa_Search", ToolInput: "AI"},
{Tool: "Tavily_Search", ToolInput: "最新技术"},
}
results, err := executor.ExecuteMany(ctx, invocations)
图表来源
- [tool/exa.go](https://github.com/smallnest/langgraphgo/blob/main/tool/exa.go#L36-L127)
- [tool/tavily.go](https://github.com/smallnest/langgraphgo/blob/main/tool/tavily.go#L37-L121)
- [prebuilt/tool_executor.go](https://github.com/smallnest/langgraphgo/blob/main/prebuilt/tool_executor.go#L21-L83)
状态模式与架构 #
StateSchema 接口 #
定义状态结构和更新逻辑。
接口方法:
Init() interface{}: 返回初始状态Update(current, new interface{}) (interface{}, error): 合并新状态到当前状态
MapSchema #
基于映射的状态模式实现。
构造函数:
签名: func NewMapSchema() *MapSchema
方法:
RegisterReducer(key string, reducer Reducer): 注册键的 reducerRegisterChannel(key string, reducer Reducer, isEphemeral bool): 注册通道Init() interface{}: 初始化空映射Update(current, new interface{}) (interface{}, error): 更新状态Cleanup(state interface{}) interface{}: 清理临时键
Reducers #
OverwriteReducer #
覆盖旧值。
签名: func OverwriteReducer(current, new interface{}) (interface{}, error)
AppendReducer #
追加到切片。
签名: func AppendReducer(current, new interface{}) (interface{}, error)
重试策略 #
RetryPolicy #
字段:
MaxRetries int: 最大重试次数BackoffStrategy BackoffStrategy: 退避策略RetryableErrors []string: 可重试错误模式
BackoffStrategy 枚举 #
FixedBackoff: 固定时间间隔ExponentialBackoff: 指数退避LinearBackoff: 线性退避
图表来源
- [graph/schema.go](https://github.com/smallnest/langgraphgo/blob/main/graph/schema.go#L8-L185)
配置与上下文 #
Config 结构 #
执行配置。
字段:
Callbacks []CallbackHandler: 回调处理器InterruptBefore []string: 执行前中断节点InterruptAfter []string: 执行后中断节点ResumeFrom []string: 从中断点恢复ResumeValue interface{}: 恢复值Configurable map[string]interface{}: 可配置参数Tags []string: 标签Metadata map[string]interface{}: 元数据
中断机制 #
Interrupt(ctx context.Context, value interface{}) (interface{}, error) #
暂停执行并等待输入。
参数:
ctx: 上下文value: 中断值
返回值:
interface{}: 恢复值或 nilerror: 中断错误
WithResumeValue(ctx context.Context, value interface{}) context.Context #
在上下文中添加恢复值。
签名: func WithResumeValue(ctx context.Context, value interface{}) context.Context
GetResumeValue(ctx context.Context) interface{} #
从上下文中获取恢复值。
签名: func GetResumeValue(ctx context.Context) interface{}
错误处理 #
GraphInterrupt #
执行中断错误。
字段:
Node string: 中断节点State interface{}: 中断时的状态NextNodes []string: 下一个节点InterruptValue interface{}: 中断值
图表来源
- [graph/context.go](https://github.com/smallnest/langgraphgo/blob/main/graph/context.go#L1-L16)
总结 #
langgraphgo 提供了一个强大而灵活的框架来构建复杂的 AI 应用程序。其核心特性包括:
- 双图架构: 支持状态图和消息图两种模式
- 并发执行: 自动并行处理多个节点
- 灵活的状态管理: 通过状态模式和 reducer 实现复杂的状态操作
- 持久化支持: 检查点系统支持长时间运行的应用
- 实时反馈: 流式处理提供实时事件流
- 工具集成: 内置多种外部工具集成
- 可扩展性: 丰富的配置选项和自定义能力
该库特别适合需要复杂工作流程、状态管理和持久化的 AI 应用场景,如智能代理、RAG 系统和多步骤任务处理。