feat(langgraph): add async_interrupt #6729
pawel-twardziak wants to merge 6 commits into langchain-ai:main
Conversation
Add `async_interrupt` and `DeferredValue` for lazy/async interrupt values

Introduce a `DeferredValue` wrapper and `async_interrupt()` to support lazily computed and awaitable interrupt values. The interrupt value is only evaluated when no resume value exists, avoiding unnecessary work on subsequent invocations.

- Add `DeferredValue` dataclass wrapping a sync or async callable
- Add `async_interrupt()` for async nodes with awaitable deferred values
- Refactor `interrupt()` to support `DeferredValue` and reject awaitables
- Extract shared `_get_resume_value()` helper
- Add sync and async tests for lazy, multi-interrupt, and error cases
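For orientation, here is a minimal sketch of how the pieces described above might fit together in an async node. It assumes `DeferredValue` and `async_interrupt` are exported from `langgraph.types` alongside `interrupt`, and that `DeferredValue` takes the callable as its single positional argument; `approval_node` and `expensive_payload` are illustrative names, not code from this PR:

```python
from langgraph.types import DeferredValue, async_interrupt


async def approval_node(state: dict) -> dict:
    async def expensive_payload() -> dict:
        # Heavy work we only want to do once, on the first pass.
        return {"question": f"Approve {len(state['messages'])} pending tool calls?"}

    # The callable is awaited only when no resume value exists; after a
    # resume, the stored value is returned and expensive_payload never runs.
    decision = await async_interrupt(DeferredValue(expensive_payload))
    return {"approved": decision}
```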
Thanks for putting this up! Agreed, the functionality would be nice to have. Left a few comments. I've got a deadline tonight but will try to revisit the review tomorrow.
```python
    GraphInterrupt: On the first invocation within the node, halts execution
    and surfaces the value to the client.
    """
    from langgraph._internal._constants import CONFIG_KEY_CHECKPOINT_NS
```
Imports should be outside the function, at the top of the file, unless there's a compelling reason not to. I know there's at least one place where we've broken this rule before.
When I do that, I get an error (circular dependency / circular import):
```
ImportError while loading conftest 'langgraph/libs/langgraph/tests/conftest.py'.
tests/conftest.py:15: in <module>
    from langgraph.types import Durability
langgraph/types.py:32: in <module>
    from langgraph.config import get_config
langgraph/config.py:10: in <module>
    from langgraph.types import StreamWriter
E   ImportError: cannot import name 'StreamWriter' from partially initialized module 'langgraph.types' (most likely due to a circular import) (langgraph/libs/langgraph/langgraph/types.py)
```
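For context, the two standard ways to break a cycle like this are a `TYPE_CHECKING` guard for names only needed by the type checker, and a function-local import for names needed at runtime. A generic sketch with toy modules `a`/`b` (not a claim about how this PR should resolve it):

```python
# a.py
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Type-only imports never execute at runtime, so they cannot
    # participate in an import cycle.
    from b import Thing


def use_thing(thing: Thing) -> None:
    # Names needed at runtime can be imported inside the function:
    # by the time the body runs, both modules are fully initialized.
    from b import make_thing

    make_thing(thing)
```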
libs/langgraph/langgraph/types.py (Outdated)
```python
def interrupt(value: InterruptValueT) -> InterruptValueT: ...

def interrupt(value: Any | DeferredValue) -> Any:
```
Hm - I generally prefer allowing stdlib imports rather than forcing people to remember and import another object type (in this case `DeferredValue`).
What I had in mind was something like:
```python
@overload
def interrupt(value: T) -> T: ...


@overload
def interrupt(*, deferred: Callable[[], T]) -> T: ...


def interrupt(value: Any = MISSING, *, deferred: Callable[[], Any] = MISSING) -> Any:
```

cc @sydney-runkle icyc
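If that shape lands, call sites would only need a plain callable; a hedged usage sketch (`build_expensive_question` is an illustrative name, not part of the proposal):

```python
# Eager form - unchanged behavior, the value is computed up front:
answer = interrupt({"question": "Proceed?"})

# Lazy form - the callable runs only when there is no resume value yet,
# so replays after resume skip the expensive work:
answer = interrupt(deferred=lambda: build_expensive_question())
```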
Yes, you're definitely right - I was overengineering. Changing to `Callable` ;)
Ok, thanks for such a quick review @hinthornw - I'll apply your comments within a few hours :)
Bumping @hinthornw @sydney-runkle :)
Another way to do this would be to do the callback in a
In this scenario, when does
Aaaa, I see this:

```python
# if there are pending writes from a previous loop, apply them
if self.skip_done_tasks and self.checkpoint_pending_writes:
    self._match_writes(self.tasks)
```

Awesome! So that's how
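For anyone following along: on resume the whole node re-executes from the top, and `interrupt()` short-circuits to the stored resume value via those pending writes; anything computed before the call still re-runs, which is exactly the cost the deferred form avoids. A sketch of that flow (`build_payload`, `graph`, and `config` are illustrative):

```python
from langgraph.types import Command, interrupt


def approval_node(state: dict) -> dict:
    payload = build_payload(state)  # re-executed on every resume today
    decision = interrupt(payload)   # on replay, returns the stored resume value
    return {"approved": decision}


# The first invoke halts at the interrupt; resuming replays the node from
# the top, with the pending write supplying interrupt()'s return value:
graph.invoke(Command(resume=True), config)
```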
Hey @hinthornw, my original problem is within a subgraph.

```python
@task
async def generate_interrupt_message(
    tool_descriptor: Callable[[ToolCallRequest], Awaitable[str]],
    request: ToolCallRequest,
) -> Dict[str, Dict[str, Any]]:
    description = await tool_descriptor(request)
    return {
        "action_request": {
            "name": request.tool_call["name"],
            "description": description,
            "args": request.tool_call["args"],
        }
    }


class HITLMiddleware(AgentMiddleware):
    """Custom Human-In-The-Loop Middleware for Agent interactions."""

    def __init__(
        self,
        tools_names: Dict[str, Dict[str, Any]],
        tool_descriptor: Callable[[ToolCallRequest], Awaitable[str]],
    ) -> None:
        super().__init__()
        self.tool_descriptor = tool_descriptor  # heavy logic function
        self.tools_names = tools_names

    async def handle_interrupt(
        self,
        request: ToolCallRequest,
    ) -> InterruptAnswer:
        tool_desc = await generate_interrupt_message(self.tool_descriptor, request)
        human_approval = interrupt(tool_desc)
        return cast(InterruptAnswer, human_approval)

    async def awrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
    ) -> ToolMessage | Command:
        if request.tool_call["name"] not in self.tools_names:
            return await handler(request)
        human_approval = await self.handle_interrupt(request)
        if human_approval["decisions"][0]["type"] == "approve":
            return await handler(request)
        content = human_approval["decisions"][0].get("message") or (
            f"User rejected the tool call for `{request.tool_call['name']}` with id {request.tool_call['id']}"
        )
        update_messages_list = []
        if "openai" in get_settings().model_name.split()[0].lower():
            update_messages_list.append(
                ToolMessage(
                    tool_call_id=request.tool_call["id"],
                    content=content,
                    status="error",
                )
            )
        update_messages_list.append(HumanMessage(content=content))
        return Command(
            update={"messages": update_messages_list},
        )
```

If you want the full code, I can write an example.
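For comparison, under the API proposed in this PR the heavy `tool_descriptor` call would only run on the first pass; a hedged rewrite of `handle_interrupt` above (the exact call shape - bare callable vs. `DeferredValue` - is whatever this PR settles on):

```python
async def handle_interrupt(
    self,
    request: ToolCallRequest,
) -> InterruptAnswer:
    # The deferred awaitable is evaluated only when no resume value exists,
    # so generate_interrupt_message is skipped when the node replays.
    human_approval = await async_interrupt(
        DeferredValue(lambda: generate_interrupt_message(self.tool_descriptor, request))
    )
    return cast(InterruptAnswer, human_approval)
```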
Inspiration: https://forum.langchain.com/t/feature-request-support-lazy-evaluation-in-interrupt-via-callables/2837