# Durable Execution
Durable execution is a core LangGraph feature that enables long-running workflows to survive interruptions, failures, and restarts without losing progress.
## What It Is
Durable execution saves workflow progress at key points (super-steps) so the graph can:
- Pause: Stop mid-execution, persist state
- Resume: Continue from exact save point on restart
- Recover: Pick up after crashes without redoing work
Progress is saved to a checkpointer after each super-step (every node boundary). On resume, the graph reads the latest checkpoint and continues.
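The checkpoint-per-super-step loop can be sketched in plain Python. This is illustrative only; `checkpoints` and `run` are hypothetical names, not LangGraph APIs:

```python
# Plain-Python sketch of checkpoint-and-resume semantics (illustrative;
# `checkpoints` and `run` are hypothetical names, not LangGraph APIs).
checkpoints = {}  # thread_id -> (next_step_index, saved_state)

def run(thread_id, steps, initial_state):
    """Run steps in order, checkpointing after each one; a rerun with the
    same thread_id resumes from the last completed step."""
    start, state = checkpoints.get(thread_id, (0, initial_state))
    for i in range(start, len(steps)):
        state = steps[i](state)
        checkpoints[thread_id] = (i + 1, state)  # persist after the super-step
    return state
```

If a step raises mid-run, re-invoking `run` with the same `thread_id` skips the steps that already checkpointed, mirroring how LangGraph reads the latest checkpoint and continues.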
## Requirements

To enable durable execution:

- Checkpointer: A checkpoint saver (e.g., `MemorySaver`, `SqliteSaver`, `PostgresSaver`) passed to `compile()`
- Thread ID: A unique `thread_id` in the config to isolate execution state per workflow instance
- Task wrapping: Non-deterministic operations should be wrapped in tasks for safe replay
```python
from langgraph.checkpoint.memory import MemorySaver

graph = builder.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "my-thread-1"}}
graph.invoke({"input": "data"}, config)
```
## Determinism

To ensure correct replay behavior:

- Encapsulate non-deterministic ops: Wrap LLM calls, random number generation, and network requests in tasks so their results are checkpointed and not re-executed on resume
- Use idempotent operations: Design nodes so that replaying them with the same inputs produces the same state transition, making any repeated work harmless
If a node calls an LLM, the LLM response is saved in the checkpoint so replaying does not re-invoke the LLM (and incur duplicate cost).
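This replay behavior can be sketched with a small result cache standing in for the checkpoint. The sketch is illustrative; `run_task` and `task_cache` are hypothetical names, not LangGraph APIs:

```python
# Illustrative sketch: a result cache stands in for the checkpoint so a
# non-deterministic call runs once and replays return the saved value.
# `run_task` / `task_cache` are hypothetical names, not LangGraph APIs.
task_cache = {}  # (thread_id, task_id) -> saved result

def run_task(thread_id, task_id, fn, *args):
    key = (thread_id, task_id)
    if key not in task_cache:
        task_cache[key] = fn(*args)  # executed only on the first pass
    return task_cache[key]
```

Replaying the node calls `run_task` with the same ids and gets the stored result, so the LLM call is not re-invoked and no duplicate cost is incurred.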
## Durability Modes

Durability modes control when checkpoints are persisted relative to graph execution:

| Mode | Behavior |
|---|---|
| `exit` | Save the checkpoint only when the graph run exits. Fastest, but a crash loses all unpersisted progress. |
| `async` | Save the checkpoint asynchronously in the background while the next step begins. Good throughput/fault-tolerance balance. |
| `sync` | Save the checkpoint synchronously before the next step executes. Maximum safety, highest latency. |
Usage:
```python
graph.invoke(inputs, config, durability="async")
```
## Using Tasks in Nodes

Tasks convert operations within a node into durable, resumable units; the task's result is checkpointed.

```python
from langgraph.func import task

@task
def fetch_data(query):
    return fetch_from_api(query)  # the non-deterministic call lives inside the task

def my_node(state):
    # .result() returns immediately on replay, using the checkpointed value
    result = fetch_data(state["query"]).result()
    return {"data": result}
```
This prevents re-executing the API call on replay: the task result is persisted in the checkpoint.
## Resuming Workflows

### Interrupt and Command

Use `interrupt()` to pause and `Command(resume=...)` to resume with a value:

```python
from langgraph.types import interrupt, Command

def approval_node(state):
    result = interrupt("Approve this action?")
    return {"approved": result}

# Resume:
graph.invoke(Command(resume=True), config)
```
### Recovering from Failures

If a workflow crashes, invoke the graph again with the same `thread_id` and config. It resumes from the last checkpoint automatically:

```python
# After a crash:
graph.invoke(None, config)  # resumes from the last saved checkpoint
```
### Starting Points for Resuming

Where the graph resumes depends on the API used:

| API | Resume Point |
|---|---|
| `StateGraph` | Beginning of the node that was executing when interrupted/crashed |
| Functional API (`@entrypoint`) | Beginning of the entrypoint call (top-level or inner `@entrypoint`) |

In `StateGraph`, any code before `interrupt()` in a node re-executes on resume. Put `interrupt()` as early as possible in the node.
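The re-execution of pre-interrupt code can be demonstrated with a plain-Python stand-in for `interrupt()`. The `Interrupt` exception and `resume_value` dict below are hypothetical, not LangGraph APIs:

```python
# Plain-Python stand-in for interrupt()/resume (illustrative only).
class Interrupt(Exception):
    pass

side_effects = []
resume_value = {}  # filled in when the caller resumes

def approval_node(state):
    side_effects.append("notified")  # runs on EVERY replay of the node
    if "value" not in resume_value:
        raise Interrupt("Approve this action?")  # first pass pauses here
    return {"approved": resume_value["value"]}   # resumed pass continues
```

Calling the node, supplying a resume value, and calling it again leaves two entries in `side_effects`: everything before the interrupt ran twice, which is why the interrupt should come first.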
## Graceful Shutdown

### RunControl

LangGraph supports graceful shutdown via `RunControl`:

```python
from langgraph.types import RunControl

def my_node(state, *, runtime):
    if runtime.run_control.should_stop:
        # Save progress and exit cleanly
        return {"partial": state.get("progress")}
```
### Request Drain

Drain all in-flight runs before shutdown:

```python
await graph.request_drain()  # stop accepting new runs, finish current ones
```
### GraphDrained

Wait until all runs complete:

```python
await graph.await_drain()  # blocks until all in-flight runs finish
```
### SIGTERM Pattern

Production shutdown handler:

```python
import asyncio
import signal

async def handle_shutdown():
    await graph.request_drain()
    await graph.await_drain()

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(handle_shutdown()))
```
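The drain semantics above can be modeled in plain asyncio. This is a sketch; `Drainer` is a hypothetical class, not the LangGraph API surface:

```python
import asyncio

class Drainer:
    """Plain-asyncio sketch of request-drain / await-drain semantics
    (hypothetical; not the LangGraph API surface)."""

    def __init__(self):
        self.accepting = True
        self.in_flight = set()

    def submit(self, coro):
        if not self.accepting:
            coro.close()  # avoid a "never awaited" warning
            raise RuntimeError("draining: no new runs accepted")
        task = asyncio.ensure_future(coro)
        self.in_flight.add(task)
        task.add_done_callback(self.in_flight.discard)
        return task

    def request_drain(self):
        self.accepting = False  # stop accepting new runs

    async def await_drain(self):
        if self.in_flight:
            await asyncio.gather(*self.in_flight)  # finish current runs
```

After `request_drain()`, new submissions are rejected while in-flight runs complete; `await_drain()` blocks until they all finish.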
### Resume After Drain

After draining and restarting, workflows with unsaved mid-node progress resume from the last checkpoint. Use `durability="sync"` for critical sections where in-progress state must survive restarts.
Related: Persistence, Fault Tolerance, Streaming