Streaming

LangGraph supports streaming graph execution output in multiple modes, enabling real-time visibility into workflow progress.

Stream Modes

Mode      What it streams                   Use case
values    Full state after each super-step  Track state evolution
updates   State delta after each node       Monitor per-node output
messages  LLM message tokens                Typewriter-effect output
custom    User-defined events               Progress bars, logs
debug     Detailed execution trace          Debugging, diagnostics
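
The difference between values and updates can be illustrated without running a graph. The sketch below is a plain-Python model, not LangGraph code: replay_updates is a hypothetical helper, the node names are made up, and it assumes each update simply overwrites keys (a real graph may apply reducers, such as appending to a list). It also assumes the first values-style snapshot is the input state.

```python
from typing import Any

State = dict[str, Any]


def replay_updates(initial: State, updates: list[dict[str, State]]) -> list[State]:
    """Rebuild "values"-style snapshots from "updates"-style chunks.

    Assumes plain overwrite semantics; real graphs may use reducers.
    """
    state = dict(initial)
    snapshots = [dict(state)]  # snapshot of the input state
    for chunk in updates:
        # Each "updates" chunk maps a node name to the keys that node changed.
        for _node, delta in chunk.items():
            state.update(delta)
        snapshots.append(dict(state))  # full state after this step
    return snapshots


# Hypothetical two-node run: "plan" followed by "answer".
updates = [
    {"plan": {"steps": 3}},
    {"answer": {"text": "done"}},
]
snapshots = replay_updates({"question": "hi"}, updates)
for snap in snapshots:
    print(snap)
```

Each updates chunk carries only what one node changed, while each snapshot repeats the whole state, which is why values is convenient for inspection and updates is lighter on the wire.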

Configuring Streaming

Pass stream_mode to the graph invocation:

# Single mode:
for event in graph.stream(inputs, config, stream_mode="values"):
    print(event)

# Multiple modes simultaneously:
for event in graph.stream(inputs, config, stream_mode=["updates", "custom"]):
    print(event)

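With a single stream_mode string, each event is the data itself. With a list of modes, each event is a (mode, data) tuple. Passing subgraphs=True adds a leading namespace, giving (namespace, mode, data).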
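
Because the chunk shape can vary with the options passed to stream (a leading namespace appears when subgraph output is included), a small normalizing helper can keep consumer code uniform. The sketch below is one possible approach and not part of LangGraph: split_event and dispatch are hypothetical names, and the chunks are stand-ins for real graph.stream output.

```python
from typing import Any, Callable, Iterable


def split_event(event: tuple) -> tuple[tuple, str, Any]:
    """Return (namespace, mode, data) for a 2- or 3-element chunk."""
    if len(event) == 3:
        namespace, mode, data = event
        return tuple(namespace), mode, data
    if len(event) == 2:
        mode, data = event
        return (), mode, data  # no namespace supplied: use an empty one
    raise ValueError(f"unexpected chunk shape: {event!r}")


def dispatch(stream: Iterable[tuple], handlers: dict[str, Callable[[Any], None]]) -> int:
    """Send each chunk's data to the handler registered for its mode."""
    handled = 0
    for event in stream:
        _namespace, mode, data = split_event(event)
        handler = handlers.get(mode)
        if handler is not None:
            handler(data)
            handled += 1
    return handled


# Stand-in chunks; in practice the iterable would come from graph.stream(...).
fake_stream = [
    ("updates", {"my_node": {"processed": True}}),
    (("parent:1",), "custom", {"percent": 50}),
]
seen: list = []
count = dispatch(fake_stream, {"updates": seen.append, "custom": seen.append})
print(count, seen)
```

Modes without a registered handler are skipped silently, so the same loop can be reused as modes are added or removed.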


Streaming LLM Tokens

To stream LLM tokens as they are generated, use stream_mode="messages":

from langchain_core.messages import AIMessageChunk

for event in graph.stream(inputs, config, stream_mode="messages"):
    chunk, metadata = event
    if isinstance(chunk, AIMessageChunk):
        print(chunk.content, end="", flush=True)

This requires the underlying model provider to support streaming (OpenAI, Anthropic, etc. do by default).
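
When several nodes call a model, it can help to keep only the tokens from one of them and join them into the final text. The sketch below is an illustration under stated assumptions: collect_text is a hypothetical helper, the metadata dict is assumed to carry the emitting node's name under "langgraph_node", chunk content is assumed to be a plain string, and the chunks are stand-in objects rather than real AIMessageChunk instances.

```python
from types import SimpleNamespace
from typing import Iterable, Optional


def collect_text(stream: Iterable[tuple], node: Optional[str] = None) -> str:
    """Join streamed token text, optionally keeping a single node's output."""
    parts = []
    for chunk, metadata in stream:
        # Assumption: metadata names the emitting node under "langgraph_node".
        if node is not None and metadata.get("langgraph_node") != node:
            continue
        content = getattr(chunk, "content", "")
        if isinstance(content, str):  # skip non-string content in this sketch
            parts.append(content)
    return "".join(parts)


# Stand-in (chunk, metadata) pairs shaped like stream_mode="messages" output.
fake_stream = [
    (SimpleNamespace(content="Hel"), {"langgraph_node": "writer"}),
    (SimpleNamespace(content="[draft]"), {"langgraph_node": "planner"}),
    (SimpleNamespace(content="lo"), {"langgraph_node": "writer"}),
]
text = collect_text(fake_stream, node="writer")
print(text)
```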


Custom Streaming

Use get_stream_writer() inside a node to emit custom events:

from langgraph.config import get_stream_writer

def my_node(state):
    writer = get_stream_writer()
    writer({"progress": "starting analysis", "percent": 0})

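    # process_data is a placeholder for your own chunked workload.
    for i, chunk in enumerate(process_data(state)):
        # Cap at 100 so runs with more than ten chunks stay in range.
        writer({"progress": f"Processing chunk {i+1}", "percent": min((i+1)*10, 100)})
        # ... work on chunk ...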

    writer({"progress": "complete", "percent": 100})
    return {"processed": True}

Custom events are consumed when stream_mode includes "custom":

# A list of modes yields (mode, data) tuples.
for mode, data in graph.stream(inputs, config, stream_mode=["custom"]):
    if mode == "custom":
        print(f"[{data['percent']}%] {data['progress']}")
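
The custom payloads above can drive a text progress bar. This is a minimal sketch, assuming events shaped like the ones my_node emits (a "progress" label and a "percent" number); render_progress is a hypothetical helper, and the percentage is clamped to 0-100 in case a node reports a value outside that range.

```python
def render_progress(data: dict, width: int = 20) -> str:
    """Format one custom event as a fixed-width text progress bar."""
    # Clamp so an out-of-range percent cannot overflow the bar.
    percent = max(0, min(100, int(data.get("percent", 0))))
    filled = width * percent // 100
    bar = "#" * filled + "-" * (width - filled)
    return f"[{bar}] {percent:3d}% {data.get('progress', '')}"


# Stand-in events shaped like the payloads written by my_node.
events = [
    {"progress": "starting analysis", "percent": 0},
    {"progress": "Processing chunk 5", "percent": 50},
    {"progress": "complete", "percent": 100},
]
lines = [render_progress(event) for event in events]
for line in lines:
    print(line)
```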

V2 Format: StreamPart

LangGraph v2 introduces type-safe streaming with StreamPart:

from langgraph.types import StreamPart

import asyncio

async def main():
    # astream_events is an async generator; events are dict-like.
    async for part in graph.astream_events(inputs, config):
        # Keys: "event", "data", "name", "tags", "metadata"
        if part["event"] == "on_chat_model_stream":
            print(part["data"]["chunk"].content, end="")

asyncio.run(main())

StreamPart provides structured, type-safe events with a consistent schema across event types.


Stream Events

Common events emitted during graph execution:

Event                 Description
on_chain_start        Entering a node or graph
on_chain_end          Exiting a node or graph
on_chat_model_stream  LLM token chunk
on_custom_event       User-defined event (from get_stream_writer)
on_tool_start         Tool invocation begins
on_tool_end           Tool invocation completes

Each event includes a name (the node or component name), tags (hierarchical path), and metadata (for example, thread_id and checkpoint_id).
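
A quick way to see which of these events a run produces is to tally them by type and name. The sketch below is illustrative only: summarize_events is a hypothetical helper, it assumes events are dict-like with "event" and "name" keys, and fake_events is a stand-in for the async iterator a real graph would return.

```python
import asyncio
from collections import Counter
from typing import AsyncIterator


async def summarize_events(events: AsyncIterator[dict]) -> Counter:
    """Count events by (event type, component name)."""
    counts: Counter = Counter()
    async for event in events:
        counts[(event["event"], event["name"])] += 1
    return counts


async def fake_events() -> AsyncIterator[dict]:
    """Stand-in for an event stream from a graph run."""
    for item in [
        {"event": "on_chain_start", "name": "my_node", "data": {}},
        {"event": "on_chat_model_stream", "name": "model", "data": {}},
        {"event": "on_chat_model_stream", "name": "model", "data": {}},
        {"event": "on_chain_end", "name": "my_node", "data": {}},
    ]:
        yield item


counts = asyncio.run(summarize_events(fake_events()))
for key, n in counts.items():
    print(key, n)
```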


Examples

Track Node Progress

for mode, data in graph.stream(inputs, config, stream_mode=["updates"]):
    # data maps each node name to the state keys it returned
    for node, update in data.items():
        print(f"{node} output: {update}")

Debug Execution

# A single mode string yields the debug event dicts directly.
for event in graph.stream(inputs, config, stream_mode="debug"):
    print(f"[{event['type']}] step {event['step']}: {event['payload']}")

Combined: Progress + LLM Tokens

for mode, data in graph.stream(
    inputs, config, stream_mode=["custom", "messages"]
):
    if mode == "custom":
        # Matches the payload keys written by my_node above.
        print(f"  >>> {data['progress']}")
    elif mode == "messages":
        chunk, _metadata = data
        if hasattr(chunk, "content"):
            print(chunk.content, end="")

Related: Interrupts, Persistence, Functional API