# Streaming
LangGraph supports streaming graph execution output in multiple modes, enabling real-time visibility into workflow progress.
## Stream Modes
| Mode | What it streams | Use Case |
|---|---|---|
| values | Full state after each super-step | Track state evolution |
| updates | State delta after each node | Monitor per-node output |
| messages | LLM message tokens | Typewriter-effect output |
| custom | User-defined events | Progress bars, logs |
| debug | Detailed execution trace | Debugging, diagnostics |
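To make the `values`/`updates` distinction concrete, here is a dependency-free sketch (no langgraph import; `run_pipeline` and the toy nodes are illustrative stand-ins, not real API) that simulates a two-node pipeline emitting both kinds of events:

```python
# Sketch: how "values" vs "updates" events differ for the same execution.
# Each node returns only the keys it changed; "updates" streams that delta,
# while "values" streams the full merged state after each super-step.

def run_pipeline(state, nodes, stream_mode):
    for name, node in nodes:
        delta = node(state)          # node returns a partial state update
        state = {**state, **delta}   # merge the delta into the full state
        if stream_mode == "updates":
            yield {name: delta}      # per-node delta, keyed by node name
        else:  # "values"
            yield dict(state)        # snapshot of the full state

nodes = [
    ("draft", lambda s: {"draft": s["topic"].upper()}),
    ("review", lambda s: {"approved": True}),
]

print(list(run_pipeline({"topic": "streams"}, nodes, "updates")))
# [{'draft': {'draft': 'STREAMS'}}, {'review': {'approved': True}}]
print(list(run_pipeline({"topic": "streams"}, nodes, "values")))
# [{'topic': 'streams', 'draft': 'STREAMS'},
#  {'topic': 'streams', 'draft': 'STREAMS', 'approved': True}]
```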
## Configuring Streaming

Pass `stream_mode` to the graph invocation:

```python
# Single mode -- each event is the data for that mode:
for event in graph.stream(inputs, config, stream_mode="values"):
    print(event)

# Multiple modes simultaneously -- each event is a (mode, data) tuple:
for mode, data in graph.stream(inputs, config, stream_mode=["updates", "custom"]):
    print(mode, data)
```

With a single mode string, each event is the data itself. With a list of modes, each event is a `(mode, data)` tuple; pass `subgraphs=True` to prefix each event with its namespace, yielding `(namespace, mode, data)`.
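A common pattern is to route these tuples to per-mode handlers. The following dependency-free sketch assumes an iterable of `(mode, data)` pairs, the shape produced when a list of modes is passed; `demux` and the sample events are illustrative, not part of the LangGraph API:

```python
# Sketch: demultiplexing (mode, data) events to registered handlers.
def demux(events, handlers):
    """Route each (mode, data) event to the handler for its mode, if any."""
    for mode, data in events:
        handler = handlers.get(mode)
        if handler is not None:
            handler(data)

seen = []
demux(
    [("updates", {"node_a": {"x": 1}}), ("custom", {"percent": 50})],
    {"custom": lambda d: seen.append(d)},  # only handle "custom" events
)
print(seen)  # [{'percent': 50}]
```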
## Streaming LLM Tokens

To stream LLM tokens as they are generated, use `stream_mode="messages"`:

```python
from langchain_core.messages import AIMessageChunk

for event in graph.stream(inputs, config, stream_mode="messages"):
    chunk, metadata = event
    if isinstance(chunk, AIMessageChunk):
        print(chunk.content, end="", flush=True)
```
This requires the underlying model provider to support streaming (OpenAI, Anthropic, etc. do by default).
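The consumer side of this pattern can be exercised without a model provider. Below is a dependency-free sketch in which `FakeChunk` stands in for `AIMessageChunk` (same `.content` attribute, nothing else) to show the token-accumulation loop:

```python
# Dependency-free stand-in: the accumulation pattern used when consuming
# stream_mode="messages" events, without calling a real model.
from dataclasses import dataclass

@dataclass
class FakeChunk:
    """Illustrative stand-in for AIMessageChunk; real chunks carry more fields."""
    content: str

def render(events):
    """Concatenate token chunks the way a typewriter-style consumer would."""
    out = []
    for chunk, metadata in events:  # "messages" mode yields (chunk, metadata)
        out.append(chunk.content)
    return "".join(out)

events = [(FakeChunk("Hel"), {}), (FakeChunk("lo"), {}), (FakeChunk("!"), {})]
print(render(events))  # Hello!
```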
## Custom Streaming

Use `get_stream_writer()` inside a node to emit custom events:

```python
from langgraph.config import get_stream_writer

def my_node(state):
    writer = get_stream_writer()
    writer({"progress": "starting analysis", "percent": 0})
    for i, chunk in enumerate(process_data(state)):
        writer({"progress": f"Processing chunk {i+1}", "percent": (i+1)*10})
        # ... work ...
    writer({"progress": "complete", "percent": 100})
    return {"processed": True}
```
Custom events are consumed when `stream_mode` includes `"custom"`:

```python
for mode, data in graph.stream(inputs, config, stream_mode=["custom"]):
    if mode == "custom":
        print(f"[{data['percent']}%] {data['progress']}")
```
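For the progress-bar use case mentioned above, the `percent` values can be rendered as a simple text bar. This helper is an illustrative sketch, not part of LangGraph:

```python
# Sketch: render a custom event's percent value as a fixed-width text bar.
def progress_bar(percent, width=20):
    """Return a text progress bar like '[##########----------] 50%'."""
    filled = int(width * percent / 100)
    return "[" + "#" * filled + "-" * (width - filled) + f"] {percent}%"

print(progress_bar(50))   # [##########----------] 50%
print(progress_bar(100))  # [####################] 100%
```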
## V2 Format: StreamPart

LangGraph v2 introduces type-safe streaming with `StreamPart`:

```python
from langgraph.types import StreamPart

# astream_events is an async API, so consume it with `async for`
# inside an async function:
async for part in graph.astream_events(inputs, config):
    # StreamPart is typed -- access fields directly:
    # part.event, part.data, part.name, part.tags, part.metadata
    if part.event == "on_chat_model_stream":
        print(part.data["chunk"].content, end="")
```

`StreamPart` provides structured, type-safe events with a consistent schema across different event types.
## Stream Events

Common events emitted during graph execution:

| Event | Description |
|---|---|
| on_chain_start | Entering a node or graph |
| on_chain_end | Exiting a node or graph |
| on_chat_model_stream | LLM token chunk |
| on_custom_event | User-defined event (from get_stream_writer) |
| on_tool_start | Tool invocation begins |
| on_tool_end | Tool invocation completes |

Events include `.name` (node/component name), `.tags` (hierarchical path), and `.metadata` (thread_id, checkpoint_id).
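Filtering a stream down to particular event types or components is a common consumer pattern. The sketch below assumes plain dict events with the fields described above; the sample events and `filter_events` helper are made up for illustration:

```python
# Sketch: filter a stream of event dicts by event type and component name.
def filter_events(events, event_type, name=None):
    """Yield only events of the given type (optionally for one component)."""
    for ev in events:
        if ev["event"] != event_type:
            continue
        if name is not None and ev.get("name") != name:
            continue
        yield ev

sample = [
    {"event": "on_chain_start", "name": "planner"},
    {"event": "on_chat_model_stream", "name": "planner"},
    {"event": "on_chain_start", "name": "writer"},
]
starts = list(filter_events(sample, "on_chain_start", name="writer"))
print(starts)  # [{'event': 'on_chain_start', 'name': 'writer'}]
```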
## Examples
### Track Node Progress

```python
for mode, data in graph.stream(inputs, config, stream_mode=["updates"]):
    print(f"Node output: {data}")
```
### Debug Execution

Passing `"debug"` as a plain string yields each debug payload directly (no mode tuple):

```python
for event in graph.stream(inputs, config, stream_mode="debug"):
    print(f"[{event['type']}] step {event['step']}: {event['payload']}")
```
### Combined: Progress + LLM Tokens

```python
for mode, data in graph.stream(
    inputs, config, stream_mode=["custom", "messages"]
):
    if mode == "custom":
        print(f" >>> {data['message']}")
    elif mode == "messages":
        chunk, _metadata = data
        if hasattr(chunk, 'content'):
            print(chunk.content, end="")
```
Related: Interrupts, Persistence, Functional API