Functional API Overview

Status: ACTIVE (pulled from docs.langchain.com)
Source: https://docs.langchain.com/oss/python/langgraph/functional-api
Timestamp: 2026-05-11

The Functional API adds LangGraph features (persistence, memory, human-in-the-loop, streaming) to existing applications with minimal code changes, using two decorators: @task and @entrypoint.

Core Decorators

@task

Marks a function as a durable, cacheable unit of work:

from langgraph.func import task

@task
def is_even(number: int) -> bool:
    return number % 2 == 0

@entrypoint

Marks the main workflow function:

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def format_message(is_even: bool) -> str:
    return "The number is even." if is_even else "The number is odd."

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: dict) -> str:
    even = is_even(inputs["number"]).result()
    return format_message(even).result()
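
Because the entrypoint was created with a checkpointer, each invocation needs a thread identifier in its config. A minimal usage sketch (the thread_id value is arbitrary; format_message is the task defined above):

config = {"configurable": {"thread_id": "1"}}
workflow.invoke({"number": 7}, config=config)  # -> "The number is odd."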

Key Features

Parallel Execution

@task
def add_one(number: int) -> int:
    return number + 1

@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[int]:
    futures = [add_one(i) for i in numbers]
    return [f.result() for f in futures]
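
Each task call returns a future immediately, so starting all the calls before resolving any of them runs the additions in parallel. An illustrative invocation (the thread_id value is arbitrary):

config = {"configurable": {"thread_id": "parallel-demo"}}
graph.invoke([1, 2, 3], config=config)  # -> [2, 3, 4]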

Calling Other Graphs and Entrypoints

@entrypoint()
def my_workflow(inputs: dict) -> dict:
    # some_graph is any compiled graph; another_workflow is another @entrypoint
    result_1 = some_graph.invoke(...)
    result_2 = another_workflow.invoke(...)
    return {"result_1": result_1, "result_2": result_2}

Streaming

from langgraph.config import get_stream_writer

@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
    writer = get_stream_writer()
    writer("Started processing")  # Surfaced under the "custom" stream mode
    result = inputs["x"] * 2
    writer(f"Result is {result}")
    return result

config = {"configurable": {"thread_id": "1"}}
for mode, chunk in main.stream({"x": 5}, stream_mode=["custom", "updates"], config=config):
    print(f"{mode}: {chunk}")

Retry Policy

from langgraph.types import RetryPolicy

@task(retry_policy=RetryPolicy(retry_on=ValueError))
def get_info():
    ...
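
As an illustrative sketch (flaky_fetch and the attempts counter are hypothetical names, not part of the API), a task that raises ValueError on its first attempt is retried and succeeds on the second:

attempts = 0

@task(retry_policy=RetryPolicy(retry_on=ValueError))
def flaky_fetch() -> str:
    global attempts
    attempts += 1
    if attempts < 2:
        raise ValueError("transient failure")
    return "OK"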

Timeouts

import asyncio

@task(timeout=1.0, retry_policy=RetryPolicy(retry_on=NodeTimeoutError))
async def call_api(url: str) -> str:
    # Sleeps past the 1-second timeout, which triggers the retry policy
    await asyncio.sleep(2)
    return f"result from {url}"

Caching

import time

from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy

@task(cache_policy=CachePolicy(ttl=120))  # Cache task results for 120 seconds
def slow_add(x: int) -> int:
    time.sleep(1)
    return x * 2

@entrypoint(cache=InMemoryCache())
def main(inputs: dict) -> dict[str, int]:
    result1 = slow_add(inputs["x"]).result()
    result2 = slow_add(inputs["x"]).result()  # Uses cache
    return {"result1": result1, "result2": result2}
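
Within the 120-second TTL, repeated calls to slow_add with the same input are answered from the cache instead of re-running the task. A usage sketch (assuming the second invocation lands inside the TTL):

main.invoke({"x": 5})  # slow_add runs once (~1s); the second call inside the workflow hits the cache
main.invoke({"x": 5})  # A later invocation within the TTL is served entirely from the cache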

Short-Term Memory

from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
    if previous:
        inputs = add_messages(previous, inputs)
    response = call_model(inputs).result()  # call_model: a @task that invokes a chat model
    return entrypoint.final(value=response, save=add_messages(inputs, response))
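
Invoking the workflow twice on the same thread lets the second turn see the first through previous. A minimal sketch, assuming call_model is a @task that wraps a chat model:

from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "conversation-1"}}
workflow.invoke([HumanMessage("Hi, I'm Bob")], config=config)
workflow.invoke([HumanMessage("What's my name?")], config=config)  # previous now holds the first turn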

entrypoint.final

Decouple return value from persisted state:

@entrypoint(checkpointer=checkpointer)
def accumulate(n: int, *, previous: int | None) -> entrypoint.final[int, int]:
    previous = previous or 0
    total = previous + n
    # The caller receives the previous total; the new total is checkpointed for the next run
    return entrypoint.final(value=previous, save=total)
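
On a single thread the return value therefore lags the saved state by one call (the thread_id value is arbitrary):

config = {"configurable": {"thread_id": "counter"}}
accumulate.invoke(1, config=config)  # returns 0, saves 1
accumulate.invoke(2, config=config)  # returns 1, saves 3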

Error Recovery

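Resuming relies on the entrypoint being defined with a checkpointer. A minimal sketch of such a workflow (failing_step, slow_step, and the attempts counter are hypothetical names): the slow task succeeds and is checkpointed, then the second task raises on the first run.

import time

attempts = 0

@task
def failing_step() -> str:
    global attempts
    attempts += 1
    if attempts < 2:
        raise ValueError("failure on the first attempt")
    return "OK"

@task
def slow_step() -> str:
    time.sleep(1)
    return "Ran slow step."

@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> str:
    slow_result = slow_step().result()  # Succeeds and is checkpointed on the first run
    failing_step().result()             # Raises ValueError on the first run
    return slow_result

config = {"configurable": {"thread_id": "1"}}
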
try:
    main.invoke({"any_input": "foobar"}, config=config)
except ValueError:
    pass

# Resume where it left off; tasks that already succeeded are restored from the checkpoint
main.invoke(None, config=config)

Determinism

When a workflow resumes, task results saved to the checkpoint are replayed instead of re-executing the tasks. The code between tasks may run again on resume, so any non-deterministic operation (random numbers, timestamps, external calls) must live inside a @task; its recorded result is then reused rather than recomputed.
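
For instance (an illustrative sketch with hypothetical names), drawing a random number inside a task means a resumed run reuses the value recorded before the interruption instead of drawing a new one:

import random

@task
def pick_number() -> int:
    # The result is checkpointed; a resumed run replays it rather than re-executing
    return random.randint(0, 100)

@entrypoint(checkpointer=checkpointer)
def lottery(inputs: dict) -> int:
    number = pick_number().result()
    return number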