11 changes: 11 additions & 0 deletions README.md
@@ -66,6 +66,17 @@ LangGraph provides low-level supporting infrastructure for *any* long-running, s
- [Debugging with LangSmith](http://www.langchain.com/langsmith): Gain deep visibility into complex agent behavior with visualization tools that trace execution paths, capture state transitions, and provide detailed runtime metrics.
- [Production-ready deployment](https://docs.langchain.com/langsmith/app-development): Deploy sophisticated agent systems confidently with scalable infrastructure designed to handle the unique challenges of stateful, long-running workflows.

When using human-in-the-loop interrupts, you can defer expensive context building until
an interrupt is actually raised:

```python
from langgraph.types import interrupt, ainterrupt

answer = interrupt(deferred=lambda: build_expensive_context())
# or in async nodes:
answer = await ainterrupt(deferred=fetch_latest_context)
```
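
The deferred callable runs at most once, when the interrupt actually fires; resuming the same thread returns the human-supplied answer without re-invoking it. A minimal resume sketch (assuming a compiled `graph` with a checkpointer and a `thread` config dict):

```python
from langgraph.types import Command

thread = {"configurable": {"thread_id": "1"}}

# First invocation pauses at the interrupt and surfaces the deferred value.
graph.invoke("hi", thread)

# Resuming supplies the answer; the deferred callable is not invoked again.
graph.invoke(Command(resume="answer"), thread)
```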

## LangGraph’s ecosystem

While LangGraph can be used standalone, it also integrates seamlessly with any LangChain product, giving developers a full suite of tools for building agents. To improve your LLM application development, pair LangGraph with:
11 changes: 11 additions & 0 deletions libs/langgraph/README.md
@@ -66,6 +66,17 @@ LangGraph provides low-level supporting infrastructure for *any* long-running, s
- [Debugging with LangSmith](http://www.langchain.com/langsmith): Gain deep visibility into complex agent behavior with visualization tools that trace execution paths, capture state transitions, and provide detailed runtime metrics.
- [Production-ready deployment](https://docs.langchain.com/langsmith/app-development): Deploy sophisticated agent systems confidently with scalable infrastructure designed to handle the unique challenges of stateful, long-running workflows.

When using human-in-the-loop interrupts, you can defer expensive context building until
an interrupt is actually raised:

```python
from langgraph.types import interrupt, ainterrupt

answer = interrupt(deferred=lambda: build_expensive_context())
# or in async nodes:
answer = await ainterrupt(deferred=fetch_latest_context)
```

## LangGraph’s ecosystem

While LangGraph can be used standalone, it also integrates seamlessly with any LangChain product, giving developers a full suite of tools for building agents. To improve your LLM application development, pair LangGraph with:
32 changes: 32 additions & 0 deletions libs/langgraph/langgraph/_internal/_types.py
@@ -0,0 +1,32 @@
from __future__ import annotations

from typing import Any

from langgraph._internal._constants import (
CONFIG_KEY_SCRATCHPAD,
CONFIG_KEY_SEND,
RESUME,
)


def _get_resume_value(conf: dict[str, Any]) -> tuple[bool, Any | None]:
"""Return resume value if present; mutates scratchpad and emits RESUME writes."""
# track interrupt index
scratchpad = conf[CONFIG_KEY_SCRATCHPAD]
idx = scratchpad.interrupt_counter()
# find previous resume values
if scratchpad.resume:
if idx < len(scratchpad.resume):
conf[CONFIG_KEY_SEND]([(RESUME, scratchpad.resume)])
return True, scratchpad.resume[idx]
# find current resume value
v = scratchpad.get_null_resume(True)
if v is not None:
if len(scratchpad.resume) != idx:
raise RuntimeError(
f"Resume index mismatch: expected {idx}, got {len(scratchpad.resume)}"
)
scratchpad.resume.append(v)
conf[CONFIG_KEY_SEND]([(RESUME, scratchpad.resume)])
return True, v
return False, None
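
Both `interrupt` and `ainterrupt` below share this helper, which gives them the same two-phase contract: on the first pass through a node there is no resume value, so the caller raises `GraphInterrupt`; on the resumed pass the recorded answer for the current interrupt index is returned. A sketch of what a caller observes across the two passes (assumes any `BaseCheckpointSaver` instance; the pattern mirrors the tests below):

```python
from langgraph.func import entrypoint
from langgraph.types import Command, interrupt

@entrypoint(checkpointer=checkpointer)  # checkpointer: any BaseCheckpointSaver
def graph(input: str) -> str:
    # Pass 1: no resume value -> GraphInterrupt is raised here.
    # Pass 2: the resume value ("answer") is returned here.
    answer = interrupt(deferred=lambda: "question")
    return input + answer

thread = {"configurable": {"thread_id": "1"}}
graph.invoke("hi", thread)                      # surfaces Interrupt(value="question")
graph.invoke(Command(resume="answer"), thread)  # returns "hianswer"
```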
125 changes: 101 additions & 24 deletions libs/langgraph/langgraph/types.py
@@ -1,8 +1,9 @@
from __future__ import annotations

import inspect
import sys
from collections import deque
from collections.abc import Callable, Hashable, Sequence
from collections.abc import Awaitable, Callable, Hashable, Sequence
from dataclasses import asdict, dataclass
from typing import (
TYPE_CHECKING,
@@ -12,7 +13,9 @@
Literal,
NamedTuple,
TypeVar,
cast,
final,
overload,
)
from warnings import warn

@@ -24,6 +27,7 @@
from langgraph._internal._cache import default_cache_key
from langgraph._internal._fields import get_cached_annotated_keys, get_update_as_tuples
from langgraph._internal._retry import default_retry_on
from langgraph._internal._types import _get_resume_value
from langgraph._internal._typing import MISSING, DeprecatedKwargs
from langgraph.warnings import LangGraphDeprecatedSinceV10

@@ -55,6 +59,7 @@ class ToolOutputMixin: # type: ignore[no-redef]
"Command",
"Durability",
"interrupt",
"ainterrupt",
"Overwrite",
"ensure_valid_checkpointer",
)
@@ -139,6 +144,7 @@ class RetryPolicy(NamedTuple):


KeyFuncT = TypeVar("KeyFuncT", bound=Callable[..., str | bytes])
InterruptValueT = TypeVar("InterruptValueT")


@dataclass(**_DC_KWARGS)
@@ -417,7 +423,17 @@ def _update_as_tuples(self) -> Sequence[tuple[str, Any]]:
PARENT: ClassVar[Literal["__parent__"]] = "__parent__"


def interrupt(value: Any) -> Any:
@overload
def interrupt(value: InterruptValueT) -> InterruptValueT: ...


@overload
def interrupt(*, deferred: Callable[[], InterruptValueT]) -> InterruptValueT: ...


def interrupt(
value: Any = MISSING, *, deferred: Callable[[], Any] | object = MISSING
) -> Any:
"""Interrupt the graph with a resumable exception from within a node.

The `interrupt` function enables human-in-the-loop workflows by pausing graph
@@ -500,43 +516,104 @@ def node(state: State):

Args:
value: The value to surface to the client when the graph is interrupted.
deferred: Callable to lazily compute the interrupt value. Only invoked
when no resume value is found for the current interrupt index.

Returns:
Any: On subsequent invocations within the same node (same task, to be precise), returns the resume value provided while the graph was paused.

Raises:
GraphInterrupt: On the first invocation within the node, halts execution and surfaces the provided value to the client.
"""
from langgraph._internal._constants import (
CONFIG_KEY_CHECKPOINT_NS,
CONFIG_KEY_SCRATCHPAD,
CONFIG_KEY_SEND,
RESUME,
)
from langgraph._internal._constants import CONFIG_KEY_CHECKPOINT_NS
from langgraph.config import get_config
from langgraph.errors import GraphInterrupt

conf = get_config()["configurable"]
# track interrupt index
scratchpad = conf[CONFIG_KEY_SCRATCHPAD]
idx = scratchpad.interrupt_counter()
# find previous resume values
if scratchpad.resume:
if idx < len(scratchpad.resume):
conf[CONFIG_KEY_SEND]([(RESUME, scratchpad.resume)])
return scratchpad.resume[idx]
# find current resume value
v = scratchpad.get_null_resume(True)
if v is not None:
assert len(scratchpad.resume) == idx, (scratchpad.resume, idx)
scratchpad.resume.append(v)
conf[CONFIG_KEY_SEND]([(RESUME, scratchpad.resume)])
return v
has_resume, resume_value = _get_resume_value(conf)
if has_resume:
return resume_value
# no resume value found
if value is MISSING and deferred is MISSING:
raise TypeError("interrupt() requires either value or deferred.")
if value is not MISSING and deferred is not MISSING:
raise TypeError("interrupt() accepts either value or deferred, not both.")
if deferred is not MISSING:
deferred_callable = cast(Callable[[], Any], deferred)
interrupt_value = deferred_callable()
else:
interrupt_value = value
if inspect.isawaitable(interrupt_value):
if hasattr(interrupt_value, "close"):
interrupt_value.close()
raise TypeError("interrupt() received an awaitable; use ainterrupt() instead.")
raise GraphInterrupt(
(
Interrupt.from_ns(
value=interrupt_value,
ns=conf[CONFIG_KEY_CHECKPOINT_NS],
),
)
)


@overload
async def ainterrupt(value: InterruptValueT) -> InterruptValueT: ...


@overload
async def ainterrupt(
*,
deferred: Callable[[], Awaitable[InterruptValueT]] | Callable[[], InterruptValueT],
) -> InterruptValueT: ...


async def ainterrupt(
value: Any = MISSING, *, deferred: Callable[[], Any] | object = MISSING
) -> Any:
"""Async version of `interrupt`, supporting lazy and awaitable values.

Use this when computing the interrupt value requires async work (e.g. DB or API
calls), or when you want the value computed lazily, only if the graph actually
interrupts. Pass `deferred=` to postpone the computation until no resume value
is found for the current interrupt index.
This function must be awaited.

Args:
value: The value to surface to the client when the graph is interrupted.
deferred: Callable that returns the interrupt value (sync or awaitable).

Returns:
Any: On subsequent invocations within the same node (same task),
returns the resume value.

Raises:
GraphInterrupt: On the first invocation within the node, halts execution
and surfaces the value to the client.
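
Example:
    A minimal sketch of deferring async work (``fetch_context`` stands in
    for any coroutine function defined elsewhere):

    ```python
    async def node(state: State):
        # Runs only if no resume value exists for this interrupt index.
        answer = await ainterrupt(deferred=fetch_context)
        return {"answer": answer}
    ```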
"""
from langgraph._internal._constants import CONFIG_KEY_CHECKPOINT_NS
**Collaborator:** Imports should be outside the function at the top of the file unless there's a compelling reason not to. I know that at least 1 place before we broke this rule.

**Author:** when I do that, I'm getting an error (circular dependency / circular import):

```
ImportError while loading conftest 'langgraph/libs/langgraph/tests/conftest.py'.
tests/conftest.py:15: in <module>
    from langgraph.types import Durability
langgraph/types.py:32: in <module>
    from langgraph.config import get_config
langgraph/config.py:10: in <module>
    from langgraph.types import StreamWriter
E   ImportError: cannot import name 'StreamWriter' from partially initialized module 'langgraph.types' (most likely due to a circular import) (langgraph/libs/langgraph/langgraph/types.py)
```

**Author:** Kept

**Collaborator:** got it, thanks!
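
For reference, the function-local import pattern that resolves this kind of cycle, in general form (a minimal sketch with hypothetical module names, not the actual layout):

```python
# module_a.py
def use_helper() -> str:
    # Deferred import: resolved at call time, after both modules have
    # finished executing their top-level code, breaking the import cycle.
    from module_b import helper
    return helper()

# module_b.py
import module_a  # safe: module_a's top level does not import module_b

def helper() -> str:
    return "ok"
```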

from langgraph.config import get_config
from langgraph.errors import GraphInterrupt

conf = get_config()["configurable"]
has_resume, resume_value = _get_resume_value(conf)
if has_resume:
return resume_value
if value is MISSING and deferred is MISSING:
raise TypeError("ainterrupt() requires either value or deferred.")
if value is not MISSING and deferred is not MISSING:
raise TypeError("ainterrupt() accepts either value or deferred, not both.")
if deferred is not MISSING:
deferred_callable = cast(Callable[[], Any], deferred)
interrupt_value = deferred_callable()
else:
interrupt_value = value
if inspect.isawaitable(interrupt_value):
interrupt_value = await interrupt_value
raise GraphInterrupt(
(
Interrupt.from_ns(
value=interrupt_value,
ns=conf[CONFIG_KEY_CHECKPOINT_NS],
),
)
96 changes: 96 additions & 0 deletions libs/langgraph/tests/test_pregel.py
@@ -1285,6 +1285,102 @@ def graph(input: list[int]) -> list[str]:
assert mapper_calls == 2


def test_interrupt_callable_lazy(
sync_checkpointer: BaseCheckpointSaver, durability: Durability
) -> None:
calls = 0

def lazy_value() -> str:
nonlocal calls
calls += 1
return "question"

@entrypoint(checkpointer=sync_checkpointer)
def graph(input: str) -> str:
answer = interrupt(deferred=lazy_value)
return input + answer

thread = {"configurable": {"thread_id": "1"}}
events = [*graph.stream("hi", thread, durability=durability)]
assert events[-1] == {"__interrupt__": (Interrupt(value="question", id=AnyStr()),)}
assert calls == 1

assert (
graph.invoke(Command(resume="answer"), thread, durability=durability)
== "hianswer"
)
assert calls == 1


def test_interrupt_rejects_async_callable(
sync_checkpointer: BaseCheckpointSaver, durability: Durability
) -> None:
async def lazy_async() -> str:
return "question"

@entrypoint(checkpointer=sync_checkpointer)
def graph(input: str) -> str:
interrupt(deferred=lazy_async)
return input

thread = {"configurable": {"thread_id": "lazy-async-1"}}
with pytest.raises(TypeError, match="use ainterrupt"):
graph.invoke("hi", thread, durability=durability)


def test_lazy_interrupt_callable_raises(
sync_checkpointer: BaseCheckpointSaver, durability: Durability
) -> None:
def lazy_value() -> str:
raise ValueError("boom")

@entrypoint(checkpointer=sync_checkpointer)
def graph(input: str) -> str:
interrupt(deferred=lazy_value)
return input

thread = {"configurable": {"thread_id": "lazy-raise-1"}}
with pytest.raises(ValueError, match="boom"):
graph.invoke("hi", thread, durability=durability)


def test_multiple_lazy_interrupts_in_node(
sync_checkpointer: BaseCheckpointSaver, durability: Durability
) -> None:
calls: list[str] = []

def lazy_first() -> str:
calls.append("first")
return "question-one"

def lazy_second() -> str:
calls.append("second")
return "question-two"

@entrypoint(checkpointer=sync_checkpointer)
def graph(input: str) -> str:
first = interrupt(deferred=lazy_first)
second = interrupt(deferred=lazy_second)
return f"{input}:{first}:{second}"

thread = {"configurable": {"thread_id": "lazy-multi-1"}}
assert [*graph.stream("hi", thread, durability=durability)] == [
{"__interrupt__": (Interrupt(value="question-one", id=AnyStr()),)}
]
assert calls == ["first"]

assert [
*graph.stream(Command(resume="answer-one"), thread, durability=durability)
] == [{"__interrupt__": (Interrupt(value="question-two", id=AnyStr()),)}]
assert calls == ["first", "second"]

assert (
graph.invoke(Command(resume="answer-two"), thread, durability=durability)
== "hi:answer-one:answer-two"
)
assert calls == ["first", "second"]


def test_imp_nested(
sync_checkpointer: BaseCheckpointSaver, durability: Durability
) -> None: