Skip to content

Commit 3d08c22

Browse files
majdyz and claude authored
feat(platform): add Human In The Loop block with review workflow (#11380)
## Summary This PR implements a comprehensive Human In The Loop (HITL) block that allows agents to pause execution and wait for human approval/modification of data before continuing. https://github.com/user-attachments/assets/c027d731-17d3-494c-85ca-97c3bf33329c ## Key Features - Added WAITING_FOR_REVIEW status to AgentExecutionStatus enum - Created PendingHumanReview database table for storing review requests - Implemented HumanInTheLoopBlock that extracts input data and creates review entries - Added API endpoints at /api/executions/review for fetching and reviewing pending data - Updated execution manager to properly handle waiting status and resume after approval ## Frontend Components - PendingReviewCard for individual review handling - PendingReviewsList for multiple reviews - FloatingReviewsPanel for graph builder integration - Integrated review UI into 3 locations: legacy library, new library, and graph builder ## Technical Implementation - Added proper type safety throughout with SafeJson handling - Optimized database queries using count functions instead of full data fetching - Fixed imports to be top-level instead of local - All formatters and linters pass ## Test plan - [ ] Test Human In The Loop block creation in graph builder - [ ] Test block execution pauses and creates pending review - [ ] Test review UI appears in all 3 locations - [ ] Test data modification and approval workflow - [ ] Test rejection workflow - [ ] Test execution resumes after approval 🤖 Generated with [Claude Code](https://claude.ai/code) <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added Human-In-The-Loop review workflows to pause executions for human validation. * Users can approve or reject pending tasks, optionally editing submitted data and adding a message. * New "Waiting for Review" execution status with UI indicators across run lists, badges, and activity views. 
* Review management UI: pending review cards, list view, and a floating reviews panel for quick access. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Claude <[email protected]>
1 parent ff5dd7a commit 3d08c22

File tree

42 files changed

+3137
-140
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+3137
-140
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import logging
2+
from typing import Any, Literal
3+
4+
from prisma.enums import ReviewStatus
5+
6+
from backend.data.block import (
7+
Block,
8+
BlockCategory,
9+
BlockOutput,
10+
BlockSchemaInput,
11+
BlockSchemaOutput,
12+
)
13+
from backend.data.execution import ExecutionStatus
14+
from backend.data.human_review import ReviewResult
15+
from backend.data.model import SchemaField
16+
from backend.executor.manager import async_update_node_execution_status
17+
from backend.util.clients import get_database_manager_async_client
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
class HumanInTheLoopBlock(Block):
    """
    This block pauses execution and waits for human approval or modification of the data.

    When executed, it creates a pending review entry and sets the node execution status
    to REVIEW. The execution will remain paused until a human user either:
    - Approves the data (with or without modifications)
    - Rejects the data

    This is useful for workflows that require human validation or intervention before
    proceeding to the next steps.
    """

    class Input(BlockSchemaInput):
        # Payload handed to the reviewer; forwarded as `input_data` to the data layer.
        data: Any = SchemaField(description="The data to be reviewed by a human user")
        # Human-readable label; forwarded as `message` to the data layer.
        name: str = SchemaField(
            description="A descriptive name for what this data represents",
        )
        editable: bool = SchemaField(
            description="Whether the human reviewer can edit the data",
            default=True,
            advanced=True,
        )

    class Output(BlockSchemaOutput):
        # Only emitted by `run` when the review was approved.
        reviewed_data: Any = SchemaField(
            description="The data after human review (may be modified)"
        )
        status: Literal["approved", "rejected"] = SchemaField(
            description="Status of the review: 'approved' or 'rejected'"
        )
        # Only emitted by `run` when the reviewer left a non-empty message.
        review_message: str = SchemaField(
            description="Any message provided by the reviewer", default=""
        )

    def __init__(self):
        """Register the block's id, schemas and self-test fixtures."""
        super().__init__(
            id="8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d",
            description="Pause execution and wait for human approval or modification of data",
            categories={BlockCategory.BASIC},
            input_schema=HumanInTheLoopBlock.Input,
            output_schema=HumanInTheLoopBlock.Output,
            test_input={
                "data": {"name": "John Doe", "age": 30},
                "name": "User profile data",
                "editable": True,
            },
            # Must mirror what `run` yields for the mocked review below, in the
            # same order: "status" first, then "reviewed_data". There is no
            # "review_message" entry because `run` only yields it when the
            # message is non-empty, and the mocked message is "".
            test_output=[
                ("status", "approved"),
                ("reviewed_data", {"name": "John Doe", "age": 30}),
            ],
            # NOTE(review): `run` reaches these functions through `db_client`
            # and the module-level `async_update_node_execution_status`, not
            # through attributes of this block - confirm the test harness
            # actually intercepts them under these names.
            test_mock={
                "get_or_create_human_review": lambda *_args, **_kwargs: ReviewResult(
                    data={"name": "John Doe", "age": 30},
                    status=ReviewStatus.APPROVED,
                    message="",
                    processed=False,
                    node_exec_id="test-node-exec-id",
                ),
                "update_node_execution_status": lambda *_args, **_kwargs: None,
            },
        )

    async def run(
        self,
        input_data: Input,
        *,
        user_id: str,
        node_exec_id: str,
        graph_exec_id: str,
        graph_id: str,
        graph_version: int,
        **kwargs,
    ) -> BlockOutput:
        """
        Execute the Human In The Loop block.

        This method uses one function to handle the complete workflow - checking existing reviews
        and creating pending ones as needed.

        Args:
            input_data: The data to review, its descriptive name and the editable flag.
            user_id: Forwarded to the review lookup/creation call.
            node_exec_id: Node execution this review belongs to; also the key used
                for the status update and for marking the review as processed.
            graph_exec_id: Forwarded to the review lookup/creation call.
            graph_id: Forwarded to the review lookup/creation call.
            graph_version: Forwarded to the review lookup/creation call.
            **kwargs: Accepted and ignored.

        Yields:
            ("status", "approved"), then ("reviewed_data", data), then optionally
            ("review_message", message) when the review was approved;
            ("status", "rejected"), then optionally ("review_message", message)
            when it was rejected. Nothing is yielded while the review is pending.

        Raises:
            Exception: Any error from the data layer or from the status update is
                logged and re-raised unchanged.
        """
        try:
            logger.debug(f"HITL block executing for node {node_exec_id}")

            # Use the data layer to handle the complete workflow
            db_client = get_database_manager_async_client()
            result = await db_client.get_or_create_human_review(
                user_id=user_id,
                node_exec_id=node_exec_id,
                graph_exec_id=graph_exec_id,
                graph_id=graph_id,
                graph_version=graph_version,
                input_data=input_data.data,
                message=input_data.name,
                editable=input_data.editable,
            )
        except Exception as e:
            logger.error(f"Error in HITL block for node {node_exec_id}: {str(e)}")
            raise

        # Check if we're waiting for human input
        if result is None:
            logger.info(
                f"HITL block pausing execution for node {node_exec_id} - awaiting human review"
            )
            try:
                # Set node status to REVIEW so execution manager can't mark it as COMPLETED
                # The VALID_STATUS_TRANSITIONS will then prevent any unwanted status changes
                # Use the proper wrapper function to ensure websocket events are published
                await async_update_node_execution_status(
                    db_client=db_client,
                    exec_id=node_exec_id,
                    status=ExecutionStatus.REVIEW,
                )
            except Exception as e:
                logger.error(
                    f"Failed to update node status for HITL block {node_exec_id}: {str(e)}"
                )
                raise
            # Execution pauses here until API routes process the review:
            # end the generator without yielding any output.
            return

        # Review is complete (approved or rejected) - check if unprocessed
        if not result.processed:
            # Mark as processed before yielding
            await db_client.update_review_processed_status(
                node_exec_id=node_exec_id, processed=True
            )

        if result.status == ReviewStatus.APPROVED:
            yield "status", "approved"
            yield "reviewed_data", result.data
            if result.message:
                yield "review_message", result.message

        elif result.status == ReviewStatus.REJECTED:
            # A rejected review deliberately emits no "reviewed_data".
            yield "status", "rejected"
            if result.message:
                yield "review_message", result.message

autogpt_platform/backend/backend/data/credit_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ async def test_block_credit_usage(server: SpinTestServer):
7373
NodeExecutionEntry(
7474
user_id=DEFAULT_USER_ID,
7575
graph_id="test_graph",
76+
graph_version=1,
7677
node_id="test_node",
7778
graph_exec_id="test_graph_exec",
7879
node_exec_id="test_node_exec",
@@ -94,6 +95,7 @@ async def test_block_credit_usage(server: SpinTestServer):
9495
NodeExecutionEntry(
9596
user_id=DEFAULT_USER_ID,
9697
graph_id="test_graph",
98+
graph_version=1,
9799
node_id="test_node",
98100
graph_exec_id="test_graph_exec",
99101
node_exec_id="test_node_exec",

autogpt_platform/backend/backend/data/execution.py

Lines changed: 87 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
AgentNodeExecutionKeyValueDataCreateInput,
3535
AgentNodeExecutionUpdateInput,
3636
AgentNodeExecutionWhereInput,
37+
AgentNodeExecutionWhereUniqueInput,
3738
)
3839
from pydantic import BaseModel, ConfigDict, JsonValue, ValidationError
3940
from pydantic.fields import Field
@@ -96,11 +97,14 @@ def error_rate(self) -> float:
9697
VALID_STATUS_TRANSITIONS = {
9798
ExecutionStatus.QUEUED: [
9899
ExecutionStatus.INCOMPLETE,
100+
ExecutionStatus.TERMINATED, # For resuming halted execution
101+
ExecutionStatus.REVIEW, # For resuming after review
99102
],
100103
ExecutionStatus.RUNNING: [
101104
ExecutionStatus.INCOMPLETE,
102105
ExecutionStatus.QUEUED,
103106
ExecutionStatus.TERMINATED, # For resuming halted execution
107+
ExecutionStatus.REVIEW, # For resuming after review
104108
],
105109
ExecutionStatus.COMPLETED: [
106110
ExecutionStatus.RUNNING,
@@ -109,11 +113,16 @@ def error_rate(self) -> float:
109113
ExecutionStatus.INCOMPLETE,
110114
ExecutionStatus.QUEUED,
111115
ExecutionStatus.RUNNING,
116+
ExecutionStatus.REVIEW,
112117
],
113118
ExecutionStatus.TERMINATED: [
114119
ExecutionStatus.INCOMPLETE,
115120
ExecutionStatus.QUEUED,
116121
ExecutionStatus.RUNNING,
122+
ExecutionStatus.REVIEW,
123+
],
124+
ExecutionStatus.REVIEW: [
125+
ExecutionStatus.RUNNING,
117126
],
118127
}
119128

@@ -446,6 +455,7 @@ def to_node_execution_entry(
446455
user_id=self.user_id,
447456
graph_exec_id=self.graph_exec_id,
448457
graph_id=self.graph_id,
458+
graph_version=self.graph_version,
449459
node_exec_id=self.node_exec_id,
450460
node_id=self.node_id,
451461
block_id=self.block_id,
@@ -728,7 +738,7 @@ async def upsert_execution_input(
728738
input_name: str,
729739
input_data: JsonValue,
730740
node_exec_id: str | None = None,
731-
) -> tuple[str, BlockInput]:
741+
) -> tuple[NodeExecutionResult, BlockInput]:
732742
"""
733743
Insert AgentNodeExecutionInputOutput record for as one of AgentNodeExecution.Input.
734744
If there is no AgentNodeExecution that has no `input_name` as input, create new one.
@@ -761,7 +771,7 @@ async def upsert_execution_input(
761771
existing_execution = await AgentNodeExecution.prisma().find_first(
762772
where=existing_exec_query_filter,
763773
order={"addedTime": "asc"},
764-
include={"Input": True},
774+
include={"Input": True, "GraphExecution": True},
765775
)
766776
json_input_data = SafeJson(input_data)
767777

@@ -773,7 +783,7 @@ async def upsert_execution_input(
773783
referencedByInputExecId=existing_execution.id,
774784
)
775785
)
776-
return existing_execution.id, {
786+
return NodeExecutionResult.from_db(existing_execution), {
777787
**{
778788
input_data.name: type_utils.convert(input_data.data, JsonValue)
779789
for input_data in existing_execution.Input or []
@@ -788,9 +798,10 @@ async def upsert_execution_input(
788798
agentGraphExecutionId=graph_exec_id,
789799
executionStatus=ExecutionStatus.INCOMPLETE,
790800
Input={"create": {"name": input_name, "data": json_input_data}},
791-
)
801+
),
802+
include={"GraphExecution": True},
792803
)
793-
return result.id, {input_name: input_data}
804+
return NodeExecutionResult.from_db(result), {input_name: input_data}
794805

795806
else:
796807
raise ValueError(
@@ -886,9 +897,25 @@ async def update_node_execution_status_batch(
886897
node_exec_ids: list[str],
887898
status: ExecutionStatus,
888899
stats: dict[str, Any] | None = None,
889-
):
890-
await AgentNodeExecution.prisma().update_many(
891-
where={"id": {"in": node_exec_ids}},
900+
) -> int:
901+
# Validate status transitions - allowed_from should never be empty for valid statuses
902+
allowed_from = VALID_STATUS_TRANSITIONS.get(status, [])
903+
if not allowed_from:
904+
raise ValueError(
905+
f"Invalid status transition: {status} has no valid source statuses"
906+
)
907+
908+
# For batch updates, we filter to only update nodes with valid current statuses
909+
where_clause = cast(
910+
AgentNodeExecutionWhereInput,
911+
{
912+
"id": {"in": node_exec_ids},
913+
"executionStatus": {"in": [s.value for s in allowed_from]},
914+
},
915+
)
916+
917+
return await AgentNodeExecution.prisma().update_many(
918+
where=where_clause,
892919
data=_get_update_status_data(status, None, stats),
893920
)
894921

@@ -902,15 +929,32 @@ async def update_node_execution_status(
902929
if status == ExecutionStatus.QUEUED and execution_data is None:
903930
raise ValueError("Execution data must be provided when queuing an execution.")
904931

905-
res = await AgentNodeExecution.prisma().update(
906-
where={"id": node_exec_id},
932+
# Validate status transitions - allowed_from should never be empty for valid statuses
933+
allowed_from = VALID_STATUS_TRANSITIONS.get(status, [])
934+
if not allowed_from:
935+
raise ValueError(
936+
f"Invalid status transition: {status} has no valid source statuses"
937+
)
938+
939+
if res := await AgentNodeExecution.prisma().update(
940+
where=cast(
941+
AgentNodeExecutionWhereUniqueInput,
942+
{
943+
"id": node_exec_id,
944+
"executionStatus": {"in": [s.value for s in allowed_from]},
945+
},
946+
),
907947
data=_get_update_status_data(status, execution_data, stats),
908948
include=EXECUTION_RESULT_INCLUDE,
909-
)
910-
if not res:
911-
raise ValueError(f"Execution {node_exec_id} not found.")
949+
):
950+
return NodeExecutionResult.from_db(res)
951+
952+
if res := await AgentNodeExecution.prisma().find_unique(
953+
where={"id": node_exec_id}, include=EXECUTION_RESULT_INCLUDE
954+
):
955+
return NodeExecutionResult.from_db(res)
912956

913-
return NodeExecutionResult.from_db(res)
957+
raise ValueError(f"Execution {node_exec_id} not found.")
914958

915959

916960
def _get_update_status_data(
@@ -964,17 +1008,17 @@ async def get_node_execution(node_exec_id: str) -> NodeExecutionResult | None:
9641008
return NodeExecutionResult.from_db(execution)
9651009

9661010

967-
async def get_node_executions(
1011+
def _build_node_execution_where_clause(
9681012
graph_exec_id: str | None = None,
9691013
node_id: str | None = None,
9701014
block_ids: list[str] | None = None,
9711015
statuses: list[ExecutionStatus] | None = None,
972-
limit: int | None = None,
9731016
created_time_gte: datetime | None = None,
9741017
created_time_lte: datetime | None = None,
975-
include_exec_data: bool = True,
976-
) -> list[NodeExecutionResult]:
977-
"""⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints."""
1018+
) -> AgentNodeExecutionWhereInput:
1019+
"""
1020+
Build where clause for node execution queries.
1021+
"""
9781022
where_clause: AgentNodeExecutionWhereInput = {}
9791023
if graph_exec_id:
9801024
where_clause["agentGraphExecutionId"] = graph_exec_id
@@ -991,6 +1035,29 @@ async def get_node_executions(
9911035
"lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc),
9921036
}
9931037

1038+
return where_clause
1039+
1040+
1041+
async def get_node_executions(
1042+
graph_exec_id: str | None = None,
1043+
node_id: str | None = None,
1044+
block_ids: list[str] | None = None,
1045+
statuses: list[ExecutionStatus] | None = None,
1046+
limit: int | None = None,
1047+
created_time_gte: datetime | None = None,
1048+
created_time_lte: datetime | None = None,
1049+
include_exec_data: bool = True,
1050+
) -> list[NodeExecutionResult]:
1051+
"""⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints."""
1052+
where_clause = _build_node_execution_where_clause(
1053+
graph_exec_id=graph_exec_id,
1054+
node_id=node_id,
1055+
block_ids=block_ids,
1056+
statuses=statuses,
1057+
created_time_gte=created_time_gte,
1058+
created_time_lte=created_time_lte,
1059+
)
1060+
9941061
executions = await AgentNodeExecution.prisma().find_many(
9951062
where=where_clause,
9961063
include=(
@@ -1052,6 +1119,7 @@ class NodeExecutionEntry(BaseModel):
10521119
user_id: str
10531120
graph_exec_id: str
10541121
graph_id: str
1122+
graph_version: int
10551123
node_exec_id: str
10561124
node_id: str
10571125
block_id: str

0 commit comments

Comments
 (0)