Functional API Overview
Status: ACTIVE (pulled from docs.langchain.com) Source: https://docs.langchain.com/oss/python/langgraph/functional-api Timestamp: 2026-05-11
Add LangGraph features (persistence, memory, human-in-the-loop, streaming) to applications with minimal code changes.
Core Decorators
@task
Marks a function as a durable, cacheable unit of work:
from langgraph.func import task
@task
def is_even(number: int) -> bool:
return number % 2 == 0
@entrypoint
Marks the main workflow function:
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def workflow(inputs: dict) -> str:
even = is_even(inputs["number"]).result()
return format_message(even).result()
Key Features
Parallel Execution
@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[str]:
futures = [add_one(i) for i in numbers]
return [f.result() for f in futures]
Calling Other Graphs and Entrypoints
@entrypoint()
def my_workflow(inputs: dict) -> int:
result_1 = some_graph.invoke(...)
result_2 = another_workflow.invoke(...)
return {"result_1": result_1, "result_2": result_2}
Streaming
from langgraph.config import get_stream_writer
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
writer = get_stream_writer()
writer("Started processing")
result = inputs["x"] * 2
writer(f"Result is {result}")
return result
for mode, chunk in main.stream({"x": 5}, stream_mode=["custom", "updates"], config=config):
print(f"{mode}: {chunk}")
Retry Policy
from langgraph.types import RetryPolicy
@task(retry_policy=RetryPolicy(retry_on=ValueError))
def get_info():
...
Timeouts
@task(timeout=1.0, retry_policy=RetryPolicy(retry_on=NodeTimeoutError))
async def call_api(url: str) -> str:
await asyncio.sleep(2)
return f"result from {url}"
Caching
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy
@task(cache_policy=CachePolicy(ttl=120))
def slow_add(x: int) -> int:
time.sleep(1)
return x * 2
@entrypoint(cache=InMemoryCache())
def main(inputs: dict) -> dict[str, int]:
result1 = slow_add(inputs["x"]).result()
result2 = slow_add(inputs["x"]).result() # Uses cache
return {"result1": result1, "result2": result2}
Short-Term Memory
@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
if previous:
inputs = add_messages(previous, inputs)
response = call_model(inputs).result()
return entrypoint.final(value=response, save=add_messages(inputs, response))
entrypoint.final
Decouple return value from persisted state:
@entrypoint(checkpointer=checkpointer)
def accumulate(n: int, *, previous: int | None) -> entrypoint.final[int, int]:
previous = previous or 0
total = previous + n
return entrypoint.final(value=previous, save=total)
Error Recovery
try:
main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
pass
# Resume where it left off
main.invoke(None, config=config)
Determinism
Task results are cached. On resume, tasks whose inputs haven't changed return cached results instead of re-executing. Non-deterministic operations must be inside @task.