Streaming

In modern AI applications, user experience is paramount. Users don't want to stare at a spinning loading icon for seconds or longer while the entire response is generated. Streaming allows applications to display intermediate results in real time, greatly improving perceived responsiveness and interactivity.

1. Streaming Mode

Background & Functionality

LangGraphGo's graph execution is inherently step-by-step: after each node finishes executing, the state is updated. Streaming mode lets subscribers receive these state changes in real time instead of waiting for the entire graph execution to finish.

This is very useful for debugging, displaying progress, and building complex interactive UIs.

Implementation Principle

The Stream method returns a Go channel. At the end of each step (superstep) of graph execution, the runtime sends the current state snapshot or state delta to this channel.

Code Showcase

// Start streaming execution with the initial input
stream, err := runnable.Stream(ctx, input)
if err != nil {
    panic(err)
}

// Consume stream
for chunk := range stream {
    // chunk is a map where key is node name and value is output of that node
    for nodeName, output := range chunk {
        fmt.Printf("Node [%s] finished, output: %v\n", nodeName, output)
    }
}
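
As an example of the progress-display use case mentioned above, a consumption loop like the one shown can drive a simple progress report. This is a minimal sketch; onProgress and totalSteps are hypothetical stand-ins for your own UI code, not part of LangGraphGo.

// Minimal sketch: turn node completions into progress updates.
// onProgress and totalSteps are hypothetical stand-ins for your UI code.
step := 0
for chunk := range stream {
    for nodeName := range chunk {
        step++
        onProgress(fmt.Sprintf("[%d/%d] node %q finished", step, totalSteps, nodeName))
    }
}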

2. Streaming LLM Tokens

Background & Functionality

This is the most common streaming requirement: displaying LLM-generated content token by token, typewriter style. It differs from the node-level streaming above: this is fine-grained streaming inside a single node.

Implementation Principle

This relies on the callback mechanism of the underlying LLM driver (such as langchaingo). We register a callback function when calling the LLM, and it is invoked every time the LLM produces a new token. In LangGraphGo, this callback is usually passed into the node via the Context or the run Config.

Code Showcase

// 1. Define a node that supports a streaming callback
func chatNode(ctx context.Context, state interface{}) (interface{}, error) {
    messages := state.([]llms.MessageContent)
    model, err := openai.New()
    if err != nil {
        return nil, err
    }

    // Define the streaming callback function
    streamingFunc := func(ctx context.Context, chunk []byte) error {
        // Here you can send the chunk to the frontend or print it to the console
        fmt.Print(string(chunk))
        return nil
    }

    // Pass WithStreamingFunc when calling the LLM
    response, err := model.GenerateContent(ctx, messages, llms.WithStreamingFunc(streamingFunc))
    if err != nil {
        return nil, err
    }

    return append(messages, llms.TextParts(llms.ChatMessageTypeAI, response.Choices[0].Content)), nil
}

// ... In the main program ...
fmt.Println("AI Response:")
runnable.Invoke(ctx, input) // the console prints tokens in real time via the streaming callback
fmt.Println() // Newline after the streamed output
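
The showcase above hard-codes fmt.Print inside the node. To keep the node decoupled from the output target, the callback can instead be carried through the Context, as mentioned under Implementation Principle. The following is a minimal sketch; the ctxKey type, streamFuncKey value, and helper names are illustrative assumptions, not a LangGraphGo API.

// A minimal sketch of passing the streaming callback through the Context.
// The ctxKey type and streamFuncKey value are illustrative assumptions.
type ctxKey string

const streamFuncKey ctxKey = "streamingFunc"

// withStreaming attaches a token callback to the context before invoking the graph.
func withStreaming(ctx context.Context, fn func(context.Context, []byte) error) context.Context {
    return context.WithValue(ctx, streamFuncKey, fn)
}

// streamingFromCtx retrieves the callback inside a node; falls back to a no-op.
func streamingFromCtx(ctx context.Context) func(context.Context, []byte) error {
    if fn, ok := ctx.Value(streamFuncKey).(func(context.Context, []byte) error); ok {
        return fn
    }
    return func(context.Context, []byte) error { return nil }
}

chatNode would then call model.GenerateContent(ctx, messages, llms.WithStreamingFunc(streamingFromCtx(ctx))), and the caller decides where tokens go (console, SSE, WebSocket) without changing the node.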

Application in Web Services

When building HTTP APIs, you can use Go's http.Flusher together with Server-Sent Events (SSE) to push these tokens to the browser in real time.
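
A minimal sketch of such an endpoint is shown below, assuming tokens are delivered on a channel fed by the streaming callback; the handler signature and channel wiring are illustrative assumptions, not a fixed API.

// Minimal SSE handler sketch: tokens arrive on a channel and are flushed
// to the browser as they come in. How the channel is fed (e.g. from the
// llms.WithStreamingFunc callback) is up to your application.
func sseHandler(w http.ResponseWriter, r *http.Request, tokens <-chan string) {
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    for {
        select {
        case <-r.Context().Done():
            return // client disconnected
        case token, open := <-tokens:
            if !open {
                return // generation finished
            }
            fmt.Fprintf(w, "data: %s\n\n", token) // SSE frame (assumes no newlines in the token)
            flusher.Flush()                       // push to the client immediately
        }
    }
}

On the frontend, an EventSource can subscribe to this endpoint and append each data: frame to the chat view.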