diff --git a/autogpt_platform/backend/backend/blocks/human_in_the_loop.py b/autogpt_platform/backend/backend/blocks/human_in_the_loop.py
new file mode 100644
index 000000000000..1dd5dbac9deb
--- /dev/null
+++ b/autogpt_platform/backend/backend/blocks/human_in_the_loop.py
@@ -0,0 +1,160 @@
+import logging
+from typing import Any, Literal
+
+from prisma.enums import ReviewStatus
+
+from backend.data.block import (
+ Block,
+ BlockCategory,
+ BlockOutput,
+ BlockSchemaInput,
+ BlockSchemaOutput,
+)
+from backend.data.execution import ExecutionStatus
+from backend.data.human_review import ReviewResult
+from backend.data.model import SchemaField
+from backend.executor.manager import async_update_node_execution_status
+from backend.util.clients import get_database_manager_async_client
+
+logger = logging.getLogger(__name__)
+
+
+class HumanInTheLoopBlock(Block):
+ """
+ This block pauses execution and waits for human approval or modification of the data.
+
+ When executed, it creates a pending review entry and sets the node execution status
+ to REVIEW. The execution will remain paused until a human user either:
+ - Approves the data (with or without modifications)
+ - Rejects the data
+
+ This is useful for workflows that require human validation or intervention before
+ proceeding to the next steps.
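+
+    Typical outcomes (a sketch of the yielded outputs): an approved review yields
+    status "approved" plus "reviewed_data" (the original or edited payload); a
+    rejected review yields status "rejected"; either may also yield "review_message"
+    if the reviewer left a note. While the review is pending, the node stays in
+    REVIEW status.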
+ """
+
+ class Input(BlockSchemaInput):
+ data: Any = SchemaField(description="The data to be reviewed by a human user")
+ name: str = SchemaField(
+ description="A descriptive name for what this data represents",
+ )
+ editable: bool = SchemaField(
+ description="Whether the human reviewer can edit the data",
+ default=True,
+ advanced=True,
+ )
+
+ class Output(BlockSchemaOutput):
+ reviewed_data: Any = SchemaField(
+ description="The data after human review (may be modified)"
+ )
+ status: Literal["approved", "rejected"] = SchemaField(
+ description="Status of the review: 'approved' or 'rejected'"
+ )
+ review_message: str = SchemaField(
+ description="Any message provided by the reviewer", default=""
+ )
+
+ def __init__(self):
+ super().__init__(
+ id="8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d",
+ description="Pause execution and wait for human approval or modification of data",
+ categories={BlockCategory.BASIC},
+ input_schema=HumanInTheLoopBlock.Input,
+ output_schema=HumanInTheLoopBlock.Output,
+ test_input={
+ "data": {"name": "John Doe", "age": 30},
+ "name": "User profile data",
+ "editable": True,
+ },
+ test_output=[
+ ("reviewed_data", {"name": "John Doe", "age": 30}),
+ ("status", "approved"),
+ ("review_message", ""),
+ ],
+ test_mock={
+ "get_or_create_human_review": lambda *_args, **_kwargs: ReviewResult(
+ data={"name": "John Doe", "age": 30},
+ status=ReviewStatus.APPROVED,
+ message="",
+ processed=False,
+ node_exec_id="test-node-exec-id",
+ ),
+ "update_node_execution_status": lambda *_args, **_kwargs: None,
+ },
+ )
+
+ async def run(
+ self,
+ input_data: Input,
+ *,
+ user_id: str,
+ node_exec_id: str,
+ graph_exec_id: str,
+ graph_id: str,
+ graph_version: int,
+ **kwargs,
+ ) -> BlockOutput:
+ """
+ Execute the Human In The Loop block.
+
+        A single data-layer call handles the whole workflow: it returns the existing
+        review for this node execution if there is one, or creates a pending one.
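+
+        Nothing is yielded while the review is pending (the node is parked in REVIEW
+        status); once resolved, the block yields "status", "reviewed_data" when
+        approved, and "review_message" when the reviewer left one.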
+ """
+ try:
+ logger.debug(f"HITL block executing for node {node_exec_id}")
+
+ # Use the data layer to handle the complete workflow
+ db_client = get_database_manager_async_client()
+ result = await db_client.get_or_create_human_review(
+ user_id=user_id,
+ node_exec_id=node_exec_id,
+ graph_exec_id=graph_exec_id,
+ graph_id=graph_id,
+ graph_version=graph_version,
+ input_data=input_data.data,
+ message=input_data.name,
+ editable=input_data.editable,
+ )
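+            # result is None while the review is still WAITING (or already consumed);
+            # once approved or rejected, a ReviewResult with the reviewed payload is returned.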
+ except Exception as e:
+ logger.error(f"Error in HITL block for node {node_exec_id}: {str(e)}")
+ raise
+
+ # Check if we're waiting for human input
+ if result is None:
+ logger.info(
+ f"HITL block pausing execution for node {node_exec_id} - awaiting human review"
+ )
+ try:
+ # Set node status to REVIEW so execution manager can't mark it as COMPLETED
+ # The VALID_STATUS_TRANSITIONS will then prevent any unwanted status changes
+ # Use the proper wrapper function to ensure websocket events are published
+ await async_update_node_execution_status(
+ db_client=db_client,
+ exec_id=node_exec_id,
+ status=ExecutionStatus.REVIEW,
+ )
+ # Execution pauses here until API routes process the review
+ return
+ except Exception as e:
+ logger.error(
+ f"Failed to update node status for HITL block {node_exec_id}: {str(e)}"
+ )
+ raise
+
+ # Review is complete (approved or rejected) - check if unprocessed
+ if not result.processed:
+ # Mark as processed before yielding
+ await db_client.update_review_processed_status(
+ node_exec_id=node_exec_id, processed=True
+ )
+
+ if result.status == ReviewStatus.APPROVED:
+ yield "status", "approved"
+ yield "reviewed_data", result.data
+ if result.message:
+ yield "review_message", result.message
+
+ elif result.status == ReviewStatus.REJECTED:
+ yield "status", "rejected"
+ if result.message:
+ yield "review_message", result.message
diff --git a/autogpt_platform/backend/backend/data/credit_test.py b/autogpt_platform/backend/backend/data/credit_test.py
index 8e9487f74a4a..6f604975cfd4 100644
--- a/autogpt_platform/backend/backend/data/credit_test.py
+++ b/autogpt_platform/backend/backend/data/credit_test.py
@@ -73,6 +73,7 @@ async def test_block_credit_usage(server: SpinTestServer):
NodeExecutionEntry(
user_id=DEFAULT_USER_ID,
graph_id="test_graph",
+ graph_version=1,
node_id="test_node",
graph_exec_id="test_graph_exec",
node_exec_id="test_node_exec",
@@ -94,6 +95,7 @@ async def test_block_credit_usage(server: SpinTestServer):
NodeExecutionEntry(
user_id=DEFAULT_USER_ID,
graph_id="test_graph",
+ graph_version=1,
node_id="test_node",
graph_exec_id="test_graph_exec",
node_exec_id="test_node_exec",
diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py
index a8253f313636..b78633cf5895 100644
--- a/autogpt_platform/backend/backend/data/execution.py
+++ b/autogpt_platform/backend/backend/data/execution.py
@@ -34,6 +34,7 @@
AgentNodeExecutionKeyValueDataCreateInput,
AgentNodeExecutionUpdateInput,
AgentNodeExecutionWhereInput,
+ AgentNodeExecutionWhereUniqueInput,
)
from pydantic import BaseModel, ConfigDict, JsonValue, ValidationError
from pydantic.fields import Field
@@ -96,11 +97,14 @@ def error_rate(self) -> float:
VALID_STATUS_TRANSITIONS = {
ExecutionStatus.QUEUED: [
ExecutionStatus.INCOMPLETE,
+ ExecutionStatus.TERMINATED, # For resuming halted execution
+ ExecutionStatus.REVIEW, # For resuming after review
],
ExecutionStatus.RUNNING: [
ExecutionStatus.INCOMPLETE,
ExecutionStatus.QUEUED,
ExecutionStatus.TERMINATED, # For resuming halted execution
+ ExecutionStatus.REVIEW, # For resuming after review
],
ExecutionStatus.COMPLETED: [
ExecutionStatus.RUNNING,
@@ -109,11 +113,16 @@ def error_rate(self) -> float:
ExecutionStatus.INCOMPLETE,
ExecutionStatus.QUEUED,
ExecutionStatus.RUNNING,
+ ExecutionStatus.REVIEW,
],
ExecutionStatus.TERMINATED: [
ExecutionStatus.INCOMPLETE,
ExecutionStatus.QUEUED,
ExecutionStatus.RUNNING,
+ ExecutionStatus.REVIEW,
+ ],
+ ExecutionStatus.REVIEW: [
+ ExecutionStatus.RUNNING,
],
}
@@ -446,6 +455,7 @@ def to_node_execution_entry(
user_id=self.user_id,
graph_exec_id=self.graph_exec_id,
graph_id=self.graph_id,
+ graph_version=self.graph_version,
node_exec_id=self.node_exec_id,
node_id=self.node_id,
block_id=self.block_id,
@@ -728,7 +738,7 @@ async def upsert_execution_input(
input_name: str,
input_data: JsonValue,
node_exec_id: str | None = None,
-) -> tuple[str, BlockInput]:
+) -> tuple[NodeExecutionResult, BlockInput]:
"""
Insert AgentNodeExecutionInputOutput record for as one of AgentNodeExecution.Input.
If there is no AgentNodeExecution that has no `input_name` as input, create new one.
@@ -761,7 +771,7 @@ async def upsert_execution_input(
existing_execution = await AgentNodeExecution.prisma().find_first(
where=existing_exec_query_filter,
order={"addedTime": "asc"},
- include={"Input": True},
+ include={"Input": True, "GraphExecution": True},
)
json_input_data = SafeJson(input_data)
@@ -773,7 +783,7 @@ async def upsert_execution_input(
referencedByInputExecId=existing_execution.id,
)
)
- return existing_execution.id, {
+ return NodeExecutionResult.from_db(existing_execution), {
**{
input_data.name: type_utils.convert(input_data.data, JsonValue)
for input_data in existing_execution.Input or []
@@ -788,9 +798,10 @@ async def upsert_execution_input(
agentGraphExecutionId=graph_exec_id,
executionStatus=ExecutionStatus.INCOMPLETE,
Input={"create": {"name": input_name, "data": json_input_data}},
- )
+ ),
+ include={"GraphExecution": True},
)
- return result.id, {input_name: input_data}
+ return NodeExecutionResult.from_db(result), {input_name: input_data}
else:
raise ValueError(
@@ -886,9 +897,25 @@ async def update_node_execution_status_batch(
node_exec_ids: list[str],
status: ExecutionStatus,
stats: dict[str, Any] | None = None,
-):
- await AgentNodeExecution.prisma().update_many(
- where={"id": {"in": node_exec_ids}},
+) -> int:
+ # Validate status transitions - allowed_from should never be empty for valid statuses
+ allowed_from = VALID_STATUS_TRANSITIONS.get(status, [])
+ if not allowed_from:
+ raise ValueError(
+ f"Invalid status transition: {status} has no valid source statuses"
+ )
+
+ # For batch updates, we filter to only update nodes with valid current statuses
+ where_clause = cast(
+ AgentNodeExecutionWhereInput,
+ {
+ "id": {"in": node_exec_ids},
+ "executionStatus": {"in": [s.value for s in allowed_from]},
+ },
+ )
+
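+    # Returns the number of rows actually updated; rows whose current status is not a
+    # valid source for the transition are silently skipped by the filter above.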
+ return await AgentNodeExecution.prisma().update_many(
+ where=where_clause,
data=_get_update_status_data(status, None, stats),
)
@@ -902,15 +929,32 @@ async def update_node_execution_status(
if status == ExecutionStatus.QUEUED and execution_data is None:
raise ValueError("Execution data must be provided when queuing an execution.")
- res = await AgentNodeExecution.prisma().update(
- where={"id": node_exec_id},
+ # Validate status transitions - allowed_from should never be empty for valid statuses
+ allowed_from = VALID_STATUS_TRANSITIONS.get(status, [])
+ if not allowed_from:
+ raise ValueError(
+ f"Invalid status transition: {status} has no valid source statuses"
+ )
+
+ if res := await AgentNodeExecution.prisma().update(
+ where=cast(
+ AgentNodeExecutionWhereUniqueInput,
+ {
+ "id": node_exec_id,
+ "executionStatus": {"in": [s.value for s in allowed_from]},
+ },
+ ),
data=_get_update_status_data(status, execution_data, stats),
include=EXECUTION_RESULT_INCLUDE,
- )
- if not res:
- raise ValueError(f"Execution {node_exec_id} not found.")
+ ):
+ return NodeExecutionResult.from_db(res)
+
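+    # The guarded update matched nothing: either the row does not exist or its current
+    # status is not a valid source for the requested transition. Return the current row
+    # unchanged if it exists so callers still get the latest state.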
+ if res := await AgentNodeExecution.prisma().find_unique(
+ where={"id": node_exec_id}, include=EXECUTION_RESULT_INCLUDE
+ ):
+ return NodeExecutionResult.from_db(res)
- return NodeExecutionResult.from_db(res)
+ raise ValueError(f"Execution {node_exec_id} not found.")
def _get_update_status_data(
@@ -964,17 +1008,17 @@ async def get_node_execution(node_exec_id: str) -> NodeExecutionResult | None:
return NodeExecutionResult.from_db(execution)
-async def get_node_executions(
+def _build_node_execution_where_clause(
graph_exec_id: str | None = None,
node_id: str | None = None,
block_ids: list[str] | None = None,
statuses: list[ExecutionStatus] | None = None,
- limit: int | None = None,
created_time_gte: datetime | None = None,
created_time_lte: datetime | None = None,
- include_exec_data: bool = True,
-) -> list[NodeExecutionResult]:
- """⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints."""
+) -> AgentNodeExecutionWhereInput:
+ """
+ Build where clause for node execution queries.
+ """
where_clause: AgentNodeExecutionWhereInput = {}
if graph_exec_id:
where_clause["agentGraphExecutionId"] = graph_exec_id
@@ -991,6 +1035,29 @@ async def get_node_executions(
"lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc),
}
+ return where_clause
+
+
+async def get_node_executions(
+ graph_exec_id: str | None = None,
+ node_id: str | None = None,
+ block_ids: list[str] | None = None,
+ statuses: list[ExecutionStatus] | None = None,
+ limit: int | None = None,
+ created_time_gte: datetime | None = None,
+ created_time_lte: datetime | None = None,
+ include_exec_data: bool = True,
+) -> list[NodeExecutionResult]:
+ """⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints."""
+ where_clause = _build_node_execution_where_clause(
+ graph_exec_id=graph_exec_id,
+ node_id=node_id,
+ block_ids=block_ids,
+ statuses=statuses,
+ created_time_gte=created_time_gte,
+ created_time_lte=created_time_lte,
+ )
+
executions = await AgentNodeExecution.prisma().find_many(
where=where_clause,
include=(
@@ -1052,6 +1119,7 @@ class NodeExecutionEntry(BaseModel):
user_id: str
graph_exec_id: str
graph_id: str
+ graph_version: int
node_exec_id: str
node_id: str
block_id: str
diff --git a/autogpt_platform/backend/backend/data/human_review.py b/autogpt_platform/backend/backend/data/human_review.py
new file mode 100644
index 000000000000..2b0b2dbfb725
--- /dev/null
+++ b/autogpt_platform/backend/backend/data/human_review.py
@@ -0,0 +1,294 @@
+"""
+Data layer for Human In The Loop (HITL) review operations.
+Handles all database operations for pending human reviews.
+"""
+
+import asyncio
+import logging
+from datetime import datetime, timezone
+from typing import Optional
+
+from prisma.enums import ReviewStatus
+from prisma.models import PendingHumanReview
+from prisma.types import PendingHumanReviewUpdateInput
+from pydantic import BaseModel
+
+from backend.server.v2.executions.review.model import (
+ PendingHumanReviewModel,
+ SafeJsonData,
+)
+from backend.util.json import SafeJson
+
+logger = logging.getLogger(__name__)
+
+
+class ReviewResult(BaseModel):
+ """Result of a review operation."""
+
+ data: Optional[SafeJsonData] = None
+ status: ReviewStatus
+ message: str = ""
+ processed: bool
+ node_exec_id: str
+
+
+async def get_pending_review_by_node_exec_id(
+ node_exec_id: str, user_id: str
+) -> Optional["PendingHumanReviewModel"]:
+ """
+ Get a pending review by node execution ID with user ownership validation.
+
+ Args:
+ node_exec_id: The node execution ID to check
+ user_id: The user ID to validate ownership
+
+ Returns:
+ The existing review if found and owned by the user, None otherwise
+ """
+ review = await PendingHumanReview.prisma().find_first(
+ where={
+ "nodeExecId": node_exec_id,
+ "userId": user_id,
+ }
+ )
+
+ if review:
+ return PendingHumanReviewModel.from_db(review)
+
+ return None
+
+
+async def get_or_create_human_review(
+ user_id: str,
+ node_exec_id: str,
+ graph_exec_id: str,
+ graph_id: str,
+ graph_version: int,
+ input_data: SafeJsonData,
+ message: str,
+ editable: bool,
+) -> Optional[ReviewResult]:
+ """
+ Get existing review or create a new pending review entry.
+
+    Uses an upsert with an empty update payload: an existing review is returned
+    unchanged, otherwise a new WAITING review is created, in a single operation.
+
+ Args:
+ user_id: ID of the user who owns this review
+ node_exec_id: ID of the node execution
+ graph_exec_id: ID of the graph execution
+ graph_id: ID of the graph template
+ graph_version: Version of the graph template
+ input_data: The data to be reviewed
+ message: Instructions for the reviewer
+ editable: Whether the data can be edited
+
+ Returns:
+ ReviewResult if the review is complete, None if waiting for human input
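+        (None is also returned when the review is already marked processed, so a
+        resumed block does not emit the same outputs twice)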
+ """
+ try:
+ logger.debug(f"Getting or creating review for node {node_exec_id}")
+
+ # Upsert - get existing or create new review
+ review = await PendingHumanReview.prisma().upsert(
+ where={"nodeExecId": node_exec_id},
+ data={
+ "create": {
+ "userId": user_id,
+ "nodeExecId": node_exec_id,
+ "graphExecId": graph_exec_id,
+ "graphId": graph_id,
+ "graphVersion": graph_version,
+ "payload": SafeJson(input_data),
+ "instructions": message,
+ "editable": editable,
+ "status": ReviewStatus.WAITING,
+ },
+ "update": {}, # Do nothing on update - keep existing review as is
+ },
+ )
+
+ logger.info(
+ f"Review {'created' if review.createdAt == review.updatedAt else 'retrieved'} for node {node_exec_id} with status {review.status}"
+ )
+ except Exception as e:
+ logger.error(
+ f"Database error in get_or_create_human_review for node {node_exec_id}: {str(e)}"
+ )
+ raise
+
+ # Early return if already processed
+ if review.processed:
+ return None
+
+ if review.status == ReviewStatus.APPROVED:
+ # Return the approved review result
+ return ReviewResult(
+ data=review.payload,
+ status=ReviewStatus.APPROVED,
+ message=review.reviewMessage or "",
+ processed=review.processed,
+ node_exec_id=review.nodeExecId,
+ )
+ elif review.status == ReviewStatus.REJECTED:
+ # Return the rejected review result
+ return ReviewResult(
+ data=None,
+ status=ReviewStatus.REJECTED,
+ message=review.reviewMessage or "",
+ processed=review.processed,
+ node_exec_id=review.nodeExecId,
+ )
+ else:
+ # Review is pending - return None to continue waiting
+ return None
+
+
+async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:
+ """
+ Check if a graph execution has any pending reviews.
+
+ Args:
+ graph_exec_id: The graph execution ID to check
+
+ Returns:
+ True if there are reviews waiting for human input, False otherwise
+ """
+ # Check if there are any reviews waiting for human input
+ count = await PendingHumanReview.prisma().count(
+ where={"graphExecId": graph_exec_id, "status": ReviewStatus.WAITING}
+ )
+ return count > 0
+
+
+async def get_pending_reviews_for_user(
+ user_id: str, page: int = 1, page_size: int = 25
+) -> list["PendingHumanReviewModel"]:
+ """
+ Get all pending reviews for a user with pagination.
+
+ Args:
+ user_id: User ID to get reviews for
+ page: Page number (1-indexed)
+ page_size: Number of reviews per page
+
+ Returns:
+ List of pending review models
+ """
+ # Calculate offset for pagination
+ offset = (page - 1) * page_size
+
+ reviews = await PendingHumanReview.prisma().find_many(
+ where={"userId": user_id, "status": ReviewStatus.WAITING},
+ order={"createdAt": "desc"},
+ skip=offset,
+ take=page_size,
+ )
+
+ return [PendingHumanReviewModel.from_db(review) for review in reviews]
+
+
+async def get_pending_reviews_for_execution(
+ graph_exec_id: str, user_id: str
+) -> list["PendingHumanReviewModel"]:
+ """
+ Get all pending reviews for a specific graph execution.
+
+ Args:
+ graph_exec_id: Graph execution ID
+ user_id: User ID for security validation
+
+ Returns:
+ List of pending review models
+ """
+ reviews = await PendingHumanReview.prisma().find_many(
+ where={
+ "userId": user_id,
+ "graphExecId": graph_exec_id,
+ "status": ReviewStatus.WAITING,
+ },
+ order={"createdAt": "asc"},
+ )
+
+ return [PendingHumanReviewModel.from_db(review) for review in reviews]
+
+
+async def process_all_reviews_for_execution(
+ user_id: str,
+ review_decisions: dict[str, tuple[ReviewStatus, SafeJsonData | None, str | None]],
+) -> dict[str, PendingHumanReviewModel]:
+ """Process all pending reviews for an execution with approve/reject decisions.
+
+ Args:
+ user_id: User ID for ownership validation
+ review_decisions: Map of node_exec_id -> (status, reviewed_data, message)
+
+ Returns:
+ Dict of node_exec_id -> updated review model
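+
+    Example (hypothetical IDs):
+        review_decisions = {
+            "node-exec-1": (ReviewStatus.APPROVED, {"name": "edited"}, "looks good"),
+            "node-exec-2": (ReviewStatus.REJECTED, None, "wrong data"),
+        }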
+ """
+ if not review_decisions:
+ return {}
+
+ node_exec_ids = list(review_decisions.keys())
+
+ # Get all reviews for validation
+ reviews = await PendingHumanReview.prisma().find_many(
+ where={
+ "nodeExecId": {"in": node_exec_ids},
+ "userId": user_id,
+ "status": ReviewStatus.WAITING,
+ },
+ )
+
+ # Validate all reviews can be processed
+ if len(reviews) != len(node_exec_ids):
+ missing_ids = set(node_exec_ids) - {review.nodeExecId for review in reviews}
+ raise ValueError(
+ f"Reviews not found, access denied, or not in WAITING status: {', '.join(missing_ids)}"
+ )
+
+ # Create parallel update tasks
+ update_tasks = []
+
+ for review in reviews:
+ new_status, reviewed_data, message = review_decisions[review.nodeExecId]
+ has_data_changes = reviewed_data is not None and reviewed_data != review.payload
+
+ # Check edit permissions for actual data modifications
+ if has_data_changes and not review.editable:
+ raise ValueError(f"Review {review.nodeExecId} is not editable")
+
+ update_data: PendingHumanReviewUpdateInput = {
+ "status": new_status,
+ "reviewMessage": message,
+ "wasEdited": has_data_changes,
+ "reviewedAt": datetime.now(timezone.utc),
+ }
+
+ if has_data_changes:
+ update_data["payload"] = SafeJson(reviewed_data)
+
+ task = PendingHumanReview.prisma().update(
+ where={"nodeExecId": review.nodeExecId},
+ data=update_data,
+ )
+ update_tasks.append(task)
+
+ # Execute all updates in parallel and get updated reviews
+ updated_reviews = await asyncio.gather(*update_tasks)
+
+ # Note: Execution resumption is now handled at the API layer after ALL reviews
+ # for an execution are processed (both approved and rejected)
+
+ # Return as dict for easy access
+ return {
+ review.nodeExecId: PendingHumanReviewModel.from_db(review)
+ for review in updated_reviews
+ }
+
+
+async def update_review_processed_status(node_exec_id: str, processed: bool) -> None:
+ """Update the processed status of a review."""
+ await PendingHumanReview.prisma().update(
+ where={"nodeExecId": node_exec_id}, data={"processed": processed}
+ )
diff --git a/autogpt_platform/backend/backend/data/human_review_test.py b/autogpt_platform/backend/backend/data/human_review_test.py
new file mode 100644
index 000000000000..fe6c9057c14a
--- /dev/null
+++ b/autogpt_platform/backend/backend/data/human_review_test.py
@@ -0,0 +1,376 @@
+import datetime
+from unittest.mock import AsyncMock, Mock
+
+import pytest
+import pytest_mock
+from prisma.enums import ReviewStatus
+
+from backend.data.human_review import (
+ get_or_create_human_review,
+ get_pending_review_by_node_exec_id,
+ get_pending_reviews_for_execution,
+ get_pending_reviews_for_user,
+ has_pending_reviews_for_graph_exec,
+ process_all_reviews_for_execution,
+)
+
+
+@pytest.fixture
+def sample_db_review():
+ """Create a sample database review object"""
+ mock_review = Mock()
+ mock_review.nodeExecId = "test_node_123"
+ mock_review.userId = "test_user"
+ mock_review.graphExecId = "test_graph_exec_456"
+ mock_review.graphId = "test_graph_789"
+ mock_review.graphVersion = 1
+ mock_review.payload = {"data": "test payload"}
+ mock_review.instructions = "Please review"
+ mock_review.editable = True
+ mock_review.status = ReviewStatus.WAITING
+ mock_review.reviewMessage = None
+ mock_review.wasEdited = False
+ mock_review.processed = False
+ mock_review.createdAt = datetime.datetime.now(datetime.timezone.utc)
+ mock_review.updatedAt = None
+ mock_review.reviewedAt = None
+ return mock_review
+
+
+@pytest.mark.asyncio
+async def test_get_pending_review_by_node_exec_id_found(
+ mocker: pytest_mock.MockFixture,
+ sample_db_review,
+):
+ """Test finding an existing pending review"""
+ mock_find_first = mocker.patch(
+ "backend.data.human_review.PendingHumanReview.prisma"
+ )
+ mock_find_first.return_value.find_first = AsyncMock(return_value=sample_db_review)
+
+ result = await get_pending_review_by_node_exec_id("test_node_123", "test_user")
+
+ assert result is not None
+ assert result.node_exec_id == "test_node_123"
+ assert result.user_id == "test_user"
+ assert result.status == ReviewStatus.WAITING
+
+
+@pytest.mark.asyncio
+async def test_get_pending_review_by_node_exec_id_not_found(
+ mocker: pytest_mock.MockFixture,
+):
+ """Test when review is not found"""
+ mock_find_first = mocker.patch(
+ "backend.data.human_review.PendingHumanReview.prisma"
+ )
+ mock_find_first.return_value.find_first = AsyncMock(return_value=None)
+
+ result = await get_pending_review_by_node_exec_id("nonexistent", "test_user")
+
+ assert result is None
+
+
+@pytest.mark.asyncio
+async def test_get_or_create_human_review_new(
+ mocker: pytest_mock.MockFixture,
+ sample_db_review,
+):
+ """Test creating a new human review"""
+ # Mock the upsert to return a new review (created_at == updated_at)
+ sample_db_review.status = ReviewStatus.WAITING
+ sample_db_review.processed = False
+
+ mock_upsert = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+ mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
+
+ result = await get_or_create_human_review(
+ user_id="test_user",
+ node_exec_id="test_node_123",
+ graph_exec_id="test_graph_exec_456",
+ graph_id="test_graph_789",
+ graph_version=1,
+ input_data={"data": "test payload"},
+ message="Please review",
+ editable=True,
+ )
+
+ # Should return None for pending reviews (waiting for human input)
+ assert result is None
+
+
+@pytest.mark.asyncio
+async def test_get_or_create_human_review_approved(
+ mocker: pytest_mock.MockFixture,
+ sample_db_review,
+):
+ """Test retrieving an already approved review"""
+ # Set up review as already approved
+ sample_db_review.status = ReviewStatus.APPROVED
+ sample_db_review.processed = False
+ sample_db_review.reviewMessage = "Looks good"
+
+ mock_upsert = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+ mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
+
+ result = await get_or_create_human_review(
+ user_id="test_user",
+ node_exec_id="test_node_123",
+ graph_exec_id="test_graph_exec_456",
+ graph_id="test_graph_789",
+ graph_version=1,
+ input_data={"data": "test payload"},
+ message="Please review",
+ editable=True,
+ )
+
+ # Should return the approved result
+ assert result is not None
+ assert result.status == ReviewStatus.APPROVED
+ assert result.data == {"data": "test payload"}
+ assert result.message == "Looks good"
+
+
+@pytest.mark.asyncio
+async def test_has_pending_reviews_for_graph_exec_true(
+ mocker: pytest_mock.MockFixture,
+):
+ """Test when there are pending reviews"""
+ mock_count = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+ mock_count.return_value.count = AsyncMock(return_value=2)
+
+ result = await has_pending_reviews_for_graph_exec("test_graph_exec")
+
+ assert result is True
+
+
+@pytest.mark.asyncio
+async def test_has_pending_reviews_for_graph_exec_false(
+ mocker: pytest_mock.MockFixture,
+):
+ """Test when there are no pending reviews"""
+ mock_count = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+ mock_count.return_value.count = AsyncMock(return_value=0)
+
+ result = await has_pending_reviews_for_graph_exec("test_graph_exec")
+
+ assert result is False
+
+
+@pytest.mark.asyncio
+async def test_get_pending_reviews_for_user(
+ mocker: pytest_mock.MockFixture,
+ sample_db_review,
+):
+ """Test getting pending reviews for a user with pagination"""
+ mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+ mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
+
+ result = await get_pending_reviews_for_user("test_user", page=2, page_size=10)
+
+ assert len(result) == 1
+ assert result[0].node_exec_id == "test_node_123"
+
+ # Verify pagination parameters
+ call_args = mock_find_many.return_value.find_many.call_args
+ assert call_args.kwargs["skip"] == 10 # (page-1) * page_size = (2-1) * 10
+ assert call_args.kwargs["take"] == 10
+
+
+@pytest.mark.asyncio
+async def test_get_pending_reviews_for_execution(
+ mocker: pytest_mock.MockFixture,
+ sample_db_review,
+):
+ """Test getting pending reviews for specific execution"""
+ mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+ mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
+
+ result = await get_pending_reviews_for_execution("test_graph_exec_456", "test_user")
+
+ assert len(result) == 1
+ assert result[0].graph_exec_id == "test_graph_exec_456"
+
+ # Verify it filters by execution and user
+ call_args = mock_find_many.return_value.find_many.call_args
+ where_clause = call_args.kwargs["where"]
+ assert where_clause["userId"] == "test_user"
+ assert where_clause["graphExecId"] == "test_graph_exec_456"
+ assert where_clause["status"] == ReviewStatus.WAITING
+
+
+@pytest.mark.asyncio
+async def test_process_all_reviews_for_execution_success(
+ mocker: pytest_mock.MockFixture,
+ sample_db_review,
+):
+ """Test successful processing of reviews for an execution"""
+ # Mock finding reviews
+ mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+ mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
+
+ # Mock updating reviews
+ updated_review = Mock()
+ updated_review.nodeExecId = "test_node_123"
+ updated_review.userId = "test_user"
+ updated_review.graphExecId = "test_graph_exec_456"
+ updated_review.graphId = "test_graph_789"
+ updated_review.graphVersion = 1
+ updated_review.payload = {"data": "modified"}
+ updated_review.instructions = "Please review"
+ updated_review.editable = True
+ updated_review.status = ReviewStatus.APPROVED
+ updated_review.reviewMessage = "Approved"
+ updated_review.wasEdited = True
+ updated_review.processed = False
+ updated_review.createdAt = datetime.datetime.now(datetime.timezone.utc)
+ updated_review.updatedAt = datetime.datetime.now(datetime.timezone.utc)
+ updated_review.reviewedAt = datetime.datetime.now(datetime.timezone.utc)
+    mock_find_many.return_value.update = AsyncMock(return_value=updated_review)
+
+ # Mock gather to simulate parallel updates
+ mocker.patch(
+ "backend.data.human_review.asyncio.gather",
+ new=AsyncMock(return_value=[updated_review]),
+ )
+
+ result = await process_all_reviews_for_execution(
+ user_id="test_user",
+ review_decisions={
+ "test_node_123": (ReviewStatus.APPROVED, {"data": "modified"}, "Approved")
+ },
+ )
+
+ assert len(result) == 1
+ assert "test_node_123" in result
+ assert result["test_node_123"].status == ReviewStatus.APPROVED
+
+
+@pytest.mark.asyncio
+async def test_process_all_reviews_for_execution_validation_errors(
+ mocker: pytest_mock.MockFixture,
+):
+ """Test validation errors in process_all_reviews_for_execution"""
+ # Mock finding fewer reviews than requested (some not found)
+ mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+ mock_find_many.return_value.find_many = AsyncMock(
+ return_value=[]
+ ) # No reviews found
+
+ with pytest.raises(ValueError, match="Reviews not found"):
+ await process_all_reviews_for_execution(
+ user_id="test_user",
+ review_decisions={
+ "nonexistent_node": (ReviewStatus.APPROVED, {"data": "test"}, "message")
+ },
+ )
+
+
+@pytest.mark.asyncio
+async def test_process_all_reviews_edit_permission_error(
+ mocker: pytest_mock.MockFixture,
+ sample_db_review,
+):
+ """Test editing non-editable review"""
+ # Set review as non-editable
+ sample_db_review.editable = False
+
+ # Mock finding reviews
+ mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+ mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
+
+ with pytest.raises(ValueError, match="not editable"):
+ await process_all_reviews_for_execution(
+ user_id="test_user",
+ review_decisions={
+ "test_node_123": (
+ ReviewStatus.APPROVED,
+ {"data": "modified"},
+ "message",
+ )
+ },
+ )
+
+
+@pytest.mark.asyncio
+async def test_process_all_reviews_mixed_approval_rejection(
+ mocker: pytest_mock.MockFixture,
+ sample_db_review,
+):
+ """Test processing mixed approval and rejection decisions"""
+ # Create second review for rejection
+ second_review = Mock()
+ second_review.nodeExecId = "test_node_456"
+ second_review.userId = "test_user"
+ second_review.graphExecId = "test_graph_exec_456"
+ second_review.graphId = "test_graph_789"
+ second_review.graphVersion = 1
+ second_review.payload = {"data": "original"}
+ second_review.instructions = "Second review"
+ second_review.editable = True
+ second_review.status = ReviewStatus.WAITING
+ second_review.reviewMessage = None
+ second_review.wasEdited = False
+ second_review.processed = False
+ second_review.createdAt = datetime.datetime.now(datetime.timezone.utc)
+ second_review.updatedAt = None
+ second_review.reviewedAt = None
+
+ # Mock finding reviews
+ mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+ mock_find_many.return_value.find_many = AsyncMock(
+ return_value=[sample_db_review, second_review]
+ )
+
+ # Mock updating reviews
+ approved_review = Mock()
+ approved_review.nodeExecId = "test_node_123"
+ approved_review.userId = "test_user"
+ approved_review.graphExecId = "test_graph_exec_456"
+ approved_review.graphId = "test_graph_789"
+ approved_review.graphVersion = 1
+ approved_review.payload = {"data": "modified"}
+ approved_review.instructions = "Please review"
+ approved_review.editable = True
+ approved_review.status = ReviewStatus.APPROVED
+ approved_review.reviewMessage = "Approved"
+ approved_review.wasEdited = True
+ approved_review.processed = False
+ approved_review.createdAt = datetime.datetime.now(datetime.timezone.utc)
+ approved_review.updatedAt = datetime.datetime.now(datetime.timezone.utc)
+ approved_review.reviewedAt = datetime.datetime.now(datetime.timezone.utc)
+
+ rejected_review = Mock()
+ rejected_review.nodeExecId = "test_node_456"
+ rejected_review.userId = "test_user"
+ rejected_review.graphExecId = "test_graph_exec_456"
+ rejected_review.graphId = "test_graph_789"
+ rejected_review.graphVersion = 1
+ rejected_review.payload = {"data": "original"}
+ rejected_review.instructions = "Please review"
+ rejected_review.editable = True
+ rejected_review.status = ReviewStatus.REJECTED
+ rejected_review.reviewMessage = "Rejected"
+ rejected_review.wasEdited = False
+ rejected_review.processed = False
+ rejected_review.createdAt = datetime.datetime.now(datetime.timezone.utc)
+ rejected_review.updatedAt = datetime.datetime.now(datetime.timezone.utc)
+ rejected_review.reviewedAt = datetime.datetime.now(datetime.timezone.utc)
+
+ mocker.patch(
+ "backend.data.human_review.asyncio.gather",
+ new=AsyncMock(return_value=[approved_review, rejected_review]),
+ )
+
+ result = await process_all_reviews_for_execution(
+ user_id="test_user",
+ review_decisions={
+ "test_node_123": (ReviewStatus.APPROVED, {"data": "modified"}, "Approved"),
+ "test_node_456": (ReviewStatus.REJECTED, None, "Rejected"),
+ },
+ )
+
+ assert len(result) == 2
+ assert "test_node_123" in result
+ assert "test_node_456" in result
diff --git a/autogpt_platform/backend/backend/executor/database.py b/autogpt_platform/backend/backend/executor/database.py
index df581e0de408..27e8b0104398 100644
--- a/autogpt_platform/backend/backend/executor/database.py
+++ b/autogpt_platform/backend/backend/executor/database.py
@@ -31,6 +31,11 @@
get_node,
validate_graph_execution_permissions,
)
+from backend.data.human_review import (
+ get_or_create_human_review,
+ has_pending_reviews_for_graph_exec,
+ update_review_processed_status,
+)
from backend.data.notifications import (
clear_all_user_notification_batches,
create_or_add_to_user_notification_batch,
@@ -161,6 +166,11 @@ def _(
get_user_email_verification = _(get_user_email_verification)
get_user_notification_preference = _(get_user_notification_preference)
+ # Human In The Loop
+ get_or_create_human_review = _(get_or_create_human_review)
+ has_pending_reviews_for_graph_exec = _(has_pending_reviews_for_graph_exec)
+ update_review_processed_status = _(update_review_processed_status)
+
# Notifications - async
clear_all_user_notification_batches = _(clear_all_user_notification_batches)
create_or_add_to_user_notification_batch = _(
@@ -215,6 +225,9 @@ def get_service_type(cls):
# Block error monitoring
get_block_error_stats = _(d.get_block_error_stats)
+ # Human In The Loop
+ has_pending_reviews_for_graph_exec = _(d.has_pending_reviews_for_graph_exec)
+
# User Emails
get_user_email_by_id = _(d.get_user_email_by_id)
@@ -256,6 +269,10 @@ def get_service_type(cls):
get_execution_kv_data = d.get_execution_kv_data
set_execution_kv_data = d.set_execution_kv_data
+ # Human In The Loop
+ get_or_create_human_review = d.get_or_create_human_review
+ update_review_processed_status = d.update_review_processed_status
+
# User Comms
get_active_user_ids_in_timerange = d.get_active_user_ids_in_timerange
get_user_email_by_id = d.get_user_email_by_id
diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py
index f04102a9505d..06ad06e6dcab 100644
--- a/autogpt_platform/backend/backend/executor/manager.py
+++ b/autogpt_platform/backend/backend/executor/manager.py
@@ -164,6 +164,7 @@ async def execute_node(
user_id = data.user_id
graph_exec_id = data.graph_exec_id
graph_id = data.graph_id
+ graph_version = data.graph_version
node_exec_id = data.node_exec_id
node_id = data.node_id
node_block = node.block
@@ -204,6 +205,7 @@ async def execute_node(
# Inject extra execution arguments for the blocks via kwargs
extra_exec_kwargs: dict = {
"graph_id": graph_id,
+ "graph_version": graph_version,
"node_id": node_id,
"graph_exec_id": graph_exec_id,
"node_exec_id": node_exec_id,
@@ -284,6 +286,7 @@ async def _enqueue_next_nodes(
user_id: str,
graph_exec_id: str,
graph_id: str,
+ graph_version: int,
log_metadata: LogMetadata,
nodes_input_masks: Optional[NodesInputMasks],
user_context: UserContext,
@@ -301,6 +304,7 @@ async def add_enqueued_execution(
user_id=user_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
+ graph_version=graph_version,
node_exec_id=node_exec_id,
node_id=node_id,
block_id=block_id,
@@ -334,17 +338,14 @@ async def _register_next_executions(node_link: Link) -> list[NodeExecutionEntry]
# Or the same input to be consumed multiple times.
async with synchronized(f"upsert_input-{next_node_id}-{graph_exec_id}"):
# Add output data to the earliest incomplete execution, or create a new one.
- next_node_exec_id, next_node_input = await db_client.upsert_execution_input(
+ next_node_exec, next_node_input = await db_client.upsert_execution_input(
node_id=next_node_id,
graph_exec_id=graph_exec_id,
input_name=next_input_name,
input_data=next_data,
)
- await async_update_node_execution_status(
- db_client=db_client,
- exec_id=next_node_exec_id,
- status=ExecutionStatus.INCOMPLETE,
- )
+ next_node_exec_id = next_node_exec.node_exec_id
+ await send_async_execution_update(next_node_exec)
# Complete missing static input pins data using the last execution input.
static_link_names = {
@@ -660,6 +661,16 @@ def on_graph_execution(
log_metadata.info(
f"⚙️ Graph execution #{graph_exec.graph_exec_id} is already running, continuing where it left off."
)
+ elif exec_meta.status == ExecutionStatus.REVIEW:
+ exec_meta.status = ExecutionStatus.RUNNING
+ log_metadata.info(
+ f"⚙️ Graph execution #{graph_exec.graph_exec_id} was waiting for review, resuming execution."
+ )
+ update_graph_execution_state(
+ db_client=db_client,
+ graph_exec_id=graph_exec.graph_exec_id,
+ status=ExecutionStatus.RUNNING,
+ )
elif exec_meta.status == ExecutionStatus.FAILED:
exec_meta.status = ExecutionStatus.RUNNING
log_metadata.info(
@@ -697,19 +708,21 @@ def on_graph_execution(
raise status
exec_meta.status = status
- # Activity status handling
- activity_response = asyncio.run_coroutine_threadsafe(
- generate_activity_status_for_execution(
- graph_exec_id=graph_exec.graph_exec_id,
- graph_id=graph_exec.graph_id,
- graph_version=graph_exec.graph_version,
- execution_stats=exec_stats,
- db_client=get_db_async_client(),
- user_id=graph_exec.user_id,
- execution_status=status,
- ),
- self.node_execution_loop,
- ).result(timeout=60.0)
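+        # Only generate an activity summary for terminal outcomes; runs ending in
+        # REVIEW (or other non-final states) skip it since they may be resumed later.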
+ if status in [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED]:
+ activity_response = asyncio.run_coroutine_threadsafe(
+ generate_activity_status_for_execution(
+ graph_exec_id=graph_exec.graph_exec_id,
+ graph_id=graph_exec.graph_id,
+ graph_version=graph_exec.graph_version,
+ execution_stats=exec_stats,
+ db_client=get_db_async_client(),
+ user_id=graph_exec.user_id,
+ execution_status=status,
+ ),
+ self.node_execution_loop,
+ ).result(timeout=60.0)
+ else:
+ activity_response = None
if activity_response is not None:
exec_stats.activity_status = activity_response["activity_status"]
exec_stats.correctness_score = activity_response["correctness_score"]
@@ -845,6 +858,7 @@ def _on_graph_execution(
ExecutionStatus.RUNNING,
ExecutionStatus.QUEUED,
ExecutionStatus.TERMINATED,
+ ExecutionStatus.REVIEW,
],
):
node_entry = node_exec.to_node_execution_entry(graph_exec.user_context)
@@ -853,6 +867,7 @@ def _on_graph_execution(
# ------------------------------------------------------------
# Main dispatch / polling loop -----------------------------
# ------------------------------------------------------------
+
while not execution_queue.empty():
if cancel.is_set():
break
@@ -1006,7 +1021,12 @@ def _on_graph_execution(
elif error is not None:
execution_status = ExecutionStatus.FAILED
else:
- execution_status = ExecutionStatus.COMPLETED
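+            # If any HITL reviews are still WAITING for this run, finish in REVIEW
+            # instead of COMPLETED so the execution can be resumed after review.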
+ if db_client.has_pending_reviews_for_graph_exec(
+ graph_exec.graph_exec_id
+ ):
+ execution_status = ExecutionStatus.REVIEW
+ else:
+ execution_status = ExecutionStatus.COMPLETED
if error:
execution_stats.error = str(error) or type(error).__name__
@@ -1142,6 +1162,7 @@ async def _process_node_output(
user_id=graph_exec.user_id,
graph_exec_id=graph_exec.graph_exec_id,
graph_id=graph_exec.graph_id,
+ graph_version=graph_exec.graph_version,
log_metadata=log_metadata,
nodes_input_masks=nodes_input_masks,
user_context=graph_exec.user_context,
diff --git a/autogpt_platform/backend/backend/executor/utils.py b/autogpt_platform/backend/backend/executor/utils.py
index b11ea45cf51b..f8c6da8546e7 100644
--- a/autogpt_platform/backend/backend/executor/utils.py
+++ b/autogpt_platform/backend/backend/executor/utils.py
@@ -30,6 +30,7 @@
GraphExecutionWithNodes,
NodesInputMasks,
UserContext,
+ get_graph_execution,
)
from backend.data.graph import GraphModel, Node
from backend.data.model import CredentialsMetaInput
@@ -764,6 +765,7 @@ async def add_graph_execution(
nodes_input_masks: Optional[NodesInputMasks] = None,
parent_graph_exec_id: Optional[str] = None,
is_sub_graph: bool = False,
+ graph_exec_id: Optional[str] = None,
) -> GraphExecutionWithNodes:
"""
Adds a graph execution to the queue and returns the execution entry.
@@ -779,32 +781,48 @@ async def add_graph_execution(
nodes_input_masks: Node inputs to use in the execution.
parent_graph_exec_id: The ID of the parent graph execution (for nested executions).
is_sub_graph: Whether this is a sub-graph execution.
+ graph_exec_id: If provided, resume this existing execution instead of creating a new one.
Returns:
GraphExecutionEntry: The entry for the graph execution.
Raises:
ValueError: If the graph is not found or if there are validation errors.
+ NotFoundError: If graph_exec_id is provided but execution is not found.
"""
if prisma.is_connected():
edb = execution_db
else:
edb = get_database_manager_async_client()
- graph, starting_nodes_input, compiled_nodes_input_masks = (
- await validate_and_construct_node_execution_input(
- graph_id=graph_id,
+ # Get or create the graph execution
+ if graph_exec_id:
+ # Resume existing execution
+ graph_exec = await get_graph_execution(
user_id=user_id,
- graph_inputs=inputs or {},
- graph_version=graph_version,
- graph_credentials_inputs=graph_credentials_inputs,
- nodes_input_masks=nodes_input_masks,
- is_sub_graph=is_sub_graph,
+ execution_id=graph_exec_id,
+ include_node_executions=True,
+ )
+
+ if not graph_exec:
+ raise NotFoundError(f"Graph execution #{graph_exec_id} not found.")
+
+ # Use existing execution's compiled input masks
+ compiled_nodes_input_masks = graph_exec.nodes_input_masks or {}
+
+ logger.info(f"Resuming graph execution #{graph_exec.id} for graph #{graph_id}")
+ else:
+ # Create new execution
+ graph, starting_nodes_input, compiled_nodes_input_masks = (
+ await validate_and_construct_node_execution_input(
+ graph_id=graph_id,
+ user_id=user_id,
+ graph_inputs=inputs or {},
+ graph_version=graph_version,
+ graph_credentials_inputs=graph_credentials_inputs,
+ nodes_input_masks=nodes_input_masks,
+ is_sub_graph=is_sub_graph,
+ )
)
- )
- graph_exec = None
- try:
- # Sanity check: running add_graph_execution with the properties of
- # the graph_exec created here should create the same execution again.
graph_exec = await edb.create_graph_execution(
user_id=user_id,
graph_id=graph_id,
@@ -817,16 +835,20 @@ async def add_graph_execution(
parent_graph_exec_id=parent_graph_exec_id,
)
+ logger.info(
+ f"Created graph execution #{graph_exec.id} for graph "
+ f"#{graph_id} with {len(starting_nodes_input)} starting nodes"
+ )
+
+ # Common path: publish to queue and update status
+ try:
graph_exec_entry = graph_exec.to_graph_execution_entry(
user_context=await get_user_context(user_id),
compiled_nodes_input_masks=compiled_nodes_input_masks,
parent_graph_exec_id=parent_graph_exec_id,
)
- logger.info(
- f"Created graph execution #{graph_exec.id} for graph "
- f"#{graph_id} with {len(starting_nodes_input)} starting nodes. "
- f"Now publishing to execution queue."
- )
+
+ logger.info(f"Publishing execution {graph_exec.id} to execution queue")
exec_queue = await get_async_execution_queue()
await exec_queue.publish_message(
diff --git a/autogpt_platform/backend/backend/server/rest_api.py b/autogpt_platform/backend/backend/server/rest_api.py
index 15e7485d5d14..556903571c57 100644
--- a/autogpt_platform/backend/backend/server/rest_api.py
+++ b/autogpt_platform/backend/backend/server/rest_api.py
@@ -29,6 +29,7 @@
import backend.server.v2.builder
import backend.server.v2.builder.routes
import backend.server.v2.chat.routes as chat_routes
+import backend.server.v2.executions.review.routes
import backend.server.v2.library.db
import backend.server.v2.library.model
import backend.server.v2.library.routes
@@ -274,6 +275,11 @@ async def validation_error_handler(
tags=["v2", "admin"],
prefix="/api/executions",
)
+app.include_router(
+ backend.server.v2.executions.review.routes.router,
+ tags=["v2", "executions", "review"],
+ prefix="/api/review",
+)
app.include_router(
backend.server.v2.library.routes.router, tags=["v2"], prefix="/api/library"
)
diff --git a/autogpt_platform/backend/backend/server/v2/executions/review/model.py b/autogpt_platform/backend/backend/server/v2/executions/review/model.py
new file mode 100644
index 000000000000..74f72fe1ffcc
--- /dev/null
+++ b/autogpt_platform/backend/backend/server/v2/executions/review/model.py
@@ -0,0 +1,204 @@
+import json
+from datetime import datetime
+from typing import TYPE_CHECKING, Any, Dict, List, Union
+
+from prisma.enums import ReviewStatus
+from pydantic import BaseModel, Field, field_validator, model_validator
+
+if TYPE_CHECKING:
+ from prisma.models import PendingHumanReview
+
+# SafeJson-compatible type alias for review data
+SafeJsonData = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
+
+
+class PendingHumanReviewModel(BaseModel):
+ """Response model for pending human review data.
+
+ Represents a human review request that is awaiting user action.
+ Contains all necessary information for a user to review and approve
+ or reject data from a Human-in-the-Loop block execution.
+
+    Attributes:
+        node_exec_id: ID of the node execution that created this review (primary key)
+        user_id: ID of the user who must perform the review
+        graph_exec_id: ID of the graph execution containing the node
+        graph_id: ID of the graph template being executed
+        graph_version: Version number of the graph template
+        payload: The actual data payload awaiting review
+        instructions: Instructions or message for the reviewer
+        editable: Whether the reviewer can edit the data
+        status: Current review status (WAITING, APPROVED, or REJECTED)
+        review_message: Optional message from the reviewer
+        was_edited: Whether the data was modified during review
+        processed: Whether the review result has been consumed by the execution engine
+        created_at: Timestamp when the review was created
+        updated_at: Timestamp when the review was last modified
+        reviewed_at: Timestamp when the review was completed (if applicable)
+ """
+
+ node_exec_id: str = Field(description="Node execution ID (primary key)")
+ user_id: str = Field(description="User ID associated with the review")
+ graph_exec_id: str = Field(description="Graph execution ID")
+ graph_id: str = Field(description="Graph ID")
+ graph_version: int = Field(description="Graph version")
+ payload: SafeJsonData = Field(description="The actual data payload awaiting review")
+ instructions: str | None = Field(
+ description="Instructions or message for the reviewer", default=None
+ )
+ editable: bool = Field(description="Whether the reviewer can edit the data")
+ status: ReviewStatus = Field(description="Review status")
+ review_message: str | None = Field(
+ description="Optional message from the reviewer", default=None
+ )
+ was_edited: bool | None = Field(
+ description="Whether the data was modified during review", default=None
+ )
+ processed: bool = Field(
+ description="Whether the review result has been processed by the execution engine",
+ default=False,
+ )
+ created_at: datetime = Field(description="When the review was created")
+ updated_at: datetime | None = Field(
+ description="When the review was last updated", default=None
+ )
+ reviewed_at: datetime | None = Field(
+ description="When the review was completed", default=None
+ )
+
+ @classmethod
+ def from_db(cls, review: "PendingHumanReview") -> "PendingHumanReviewModel":
+ """
+ Convert a database model to a response model.
+
+        Uses the flat database structure with separate columns for the payload,
+        instructions, and editable flag; fields are mapped one-to-one onto the model.
+ """
+ return cls(
+ node_exec_id=review.nodeExecId,
+ user_id=review.userId,
+ graph_exec_id=review.graphExecId,
+ graph_id=review.graphId,
+ graph_version=review.graphVersion,
+ payload=review.payload,
+ instructions=review.instructions,
+ editable=review.editable,
+ status=review.status,
+ review_message=review.reviewMessage,
+ was_edited=review.wasEdited,
+ processed=review.processed,
+ created_at=review.createdAt,
+ updated_at=review.updatedAt,
+ reviewed_at=review.reviewedAt,
+ )
+
+
+class ReviewItem(BaseModel):
+ """Single review item for processing."""
+
+ node_exec_id: str = Field(description="Node execution ID to review")
+ approved: bool = Field(
+ description="Whether this review is approved (True) or rejected (False)"
+ )
+ message: str | None = Field(
+ None, description="Optional review message", max_length=2000
+ )
+ reviewed_data: SafeJsonData | None = Field(
+ None, description="Optional edited data (ignored if approved=False)"
+ )
+
+ @field_validator("reviewed_data")
+ @classmethod
+ def validate_reviewed_data(cls, v):
+ """Validate that reviewed_data is safe and properly structured."""
+ if v is None:
+ return v
+
+ # Validate SafeJson compatibility
+ def validate_safejson_type(obj):
+ """Ensure object only contains SafeJson compatible types."""
+ if obj is None:
+ return True
+ elif isinstance(obj, (str, int, float, bool)):
+ return True
+ elif isinstance(obj, dict):
+ return all(
+ isinstance(k, str) and validate_safejson_type(v)
+ for k, v in obj.items()
+ )
+ elif isinstance(obj, list):
+ return all(validate_safejson_type(item) for item in obj)
+ else:
+ return False
+
+ if not validate_safejson_type(v):
+ raise ValueError("reviewed_data contains non-SafeJson compatible types")
+
+        # Validate data size to prevent DoS attacks
+        try:
+            json_str = json.dumps(v)
+        except (TypeError, ValueError) as e:
+            raise ValueError(f"reviewed_data must be JSON serializable: {str(e)}")
+        if len(json_str) > 1000000:  # 1MB limit
+            raise ValueError("reviewed_data is too large (max 1MB)")
+
+ # Ensure no dangerous nested structures (prevent infinite recursion)
+ def check_depth(obj, max_depth=10, current_depth=0):
+ """Recursively check object nesting depth to prevent stack overflow attacks."""
+ if current_depth > max_depth:
+ raise ValueError("reviewed_data has excessive nesting depth")
+
+ if isinstance(obj, dict):
+ for value in obj.values():
+ check_depth(value, max_depth, current_depth + 1)
+ elif isinstance(obj, list):
+ for item in obj:
+ check_depth(item, max_depth, current_depth + 1)
+
+ check_depth(v)
+ return v
+
+ @field_validator("message")
+ @classmethod
+ def validate_message(cls, v):
+ """Validate and sanitize review message."""
+ if v is not None and len(v.strip()) == 0:
+ return None
+ return v
+
+
+class ReviewRequest(BaseModel):
+ """Request model for processing ALL pending reviews for an execution.
+
+ This request must include ALL pending reviews for a graph execution.
+ Each review will be either approved (with optional data modifications)
+ or rejected (data ignored). The execution will resume only after ALL reviews are processed.
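+
+    Example body (hypothetical IDs):
+        {
+            "reviews": [
+                {"node_exec_id": "node-1", "approved": true, "reviewed_data": {"name": "edited"}},
+                {"node_exec_id": "node-2", "approved": false, "message": "wrong data"}
+            ]
+        }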
+ """
+
+ reviews: List[ReviewItem] = Field(
+ description="All reviews with their approval status, data, and messages"
+ )
+
+ @model_validator(mode="after")
+ def validate_review_completeness(self):
+ """Validate that we have at least one review to process and no duplicates."""
+ if not self.reviews:
+ raise ValueError("At least one review must be provided")
+
+ # Ensure no duplicate node_exec_ids
+ node_ids = [review.node_exec_id for review in self.reviews]
+ if len(node_ids) != len(set(node_ids)):
+ duplicates = [nid for nid in set(node_ids) if node_ids.count(nid) > 1]
+ raise ValueError(f"Duplicate review IDs found: {', '.join(duplicates)}")
+
+ return self
+
+
+class ReviewResponse(BaseModel):
+ """Response from review endpoint."""
+
+ approved_count: int = Field(description="Number of reviews successfully approved")
+ rejected_count: int = Field(description="Number of reviews successfully rejected")
+ failed_count: int = Field(description="Number of reviews that failed processing")
+ error: str | None = Field(None, description="Error message if operation failed")
diff --git a/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py b/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py
new file mode 100644
index 000000000000..3bc0dff923a9
--- /dev/null
+++ b/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py
@@ -0,0 +1,459 @@
+import datetime
+
+import fastapi
+import fastapi.testclient
+import pytest
+import pytest_mock
+from prisma.enums import ReviewStatus
+from pytest_snapshot.plugin import Snapshot
+
+from backend.server.v2.executions.review.model import PendingHumanReviewModel
+from backend.server.v2.executions.review.routes import router
+
+# Using a fixed timestamp for reproducible tests
+FIXED_NOW = datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
+
+app = fastapi.FastAPI()
+app.include_router(router, prefix="/api/review")
+
+client = fastapi.testclient.TestClient(app)
+
+
+@pytest.fixture(autouse=True)
+def setup_app_auth(mock_jwt_user):
+ """Setup auth overrides for all tests in this module"""
+ from autogpt_libs.auth.jwt_utils import get_jwt_payload
+
+ app.dependency_overrides[get_jwt_payload] = mock_jwt_user["get_jwt_payload"]
+ yield
+ app.dependency_overrides.clear()
+
+
+@pytest.fixture
+def sample_pending_review() -> PendingHumanReviewModel:
+ """Create a sample pending review for testing"""
+ return PendingHumanReviewModel(
+ node_exec_id="test_node_123",
+ user_id="test_user",
+ graph_exec_id="test_graph_exec_456",
+ graph_id="test_graph_789",
+ graph_version=1,
+ payload={"data": "test payload", "value": 42},
+ instructions="Please review this data",
+ editable=True,
+ status=ReviewStatus.WAITING,
+ review_message=None,
+ was_edited=None,
+ processed=False,
+ created_at=FIXED_NOW,
+ updated_at=None,
+ reviewed_at=None,
+ )
+
+
+def test_get_pending_reviews_empty(
+ mocker: pytest_mock.MockFixture,
+ snapshot: Snapshot,
+) -> None:
+ """Test getting pending reviews when none exist"""
+ mock_get_reviews = mocker.patch(
+ "backend.server.v2.executions.review.routes.get_pending_reviews_for_user"
+ )
+ mock_get_reviews.return_value = []
+
+ response = client.get("/api/review/pending")
+
+ assert response.status_code == 200
+ assert response.json() == []
+ mock_get_reviews.assert_called_once_with("test_user", 1, 25)
+
+
+def test_get_pending_reviews_with_data(
+ mocker: pytest_mock.MockFixture,
+ sample_pending_review: PendingHumanReviewModel,
+ snapshot: Snapshot,
+) -> None:
+ """Test getting pending reviews with data"""
+ mock_get_reviews = mocker.patch(
+ "backend.server.v2.executions.review.routes.get_pending_reviews_for_user"
+ )
+ mock_get_reviews.return_value = [sample_pending_review]
+
+ response = client.get("/api/review/pending?page=2&page_size=10")
+
+ assert response.status_code == 200
+ data = response.json()
+ assert len(data) == 1
+ assert data[0]["node_exec_id"] == "test_node_123"
+ assert data[0]["status"] == "WAITING"
+ mock_get_reviews.assert_called_once_with("test_user", 2, 10)
+
+
+def test_get_pending_reviews_for_execution_success(
+ mocker: pytest_mock.MockFixture,
+ sample_pending_review: PendingHumanReviewModel,
+ snapshot: Snapshot,
+) -> None:
+ """Test getting pending reviews for specific execution"""
+ mock_get_graph_execution = mocker.patch(
+ "backend.server.v2.executions.review.routes.get_graph_execution_meta"
+ )
+ mock_get_graph_execution.return_value = {
+ "id": "test_graph_exec_456",
+ "user_id": "test_user",
+ }
+
+ mock_get_reviews = mocker.patch(
+ "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
+ )
+ mock_get_reviews.return_value = [sample_pending_review]
+
+ response = client.get("/api/review/execution/test_graph_exec_456")
+
+ assert response.status_code == 200
+ data = response.json()
+ assert len(data) == 1
+ assert data[0]["graph_exec_id"] == "test_graph_exec_456"
+
+
+def test_get_pending_reviews_for_execution_access_denied(
+ mocker: pytest_mock.MockFixture,
+) -> None:
+ """Test access denied when user doesn't own the execution"""
+ mock_get_graph_execution = mocker.patch(
+ "backend.server.v2.executions.review.routes.get_graph_execution_meta"
+ )
+ mock_get_graph_execution.return_value = None
+
+ response = client.get("/api/review/execution/test_graph_exec_456")
+
+ assert response.status_code == 403
+ assert "Access denied" in response.json()["detail"]
+
+
+def test_process_review_action_approve_success(
+ mocker: pytest_mock.MockFixture,
+ sample_pending_review: PendingHumanReviewModel,
+) -> None:
+ """Test successful review approval"""
+ # Mock the validation functions
+ mock_get_pending_review = mocker.patch(
+ "backend.data.human_review.get_pending_review_by_node_exec_id"
+ )
+ mock_get_pending_review.return_value = sample_pending_review
+
+ mock_get_reviews_for_execution = mocker.patch(
+ "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
+ )
+ mock_get_reviews_for_execution.return_value = [sample_pending_review]
+
+ mock_process_all_reviews = mocker.patch(
+ "backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
+ )
+ mock_process_all_reviews.return_value = {"test_node_123": sample_pending_review}
+
+ mock_has_pending = mocker.patch(
+ "backend.data.human_review.has_pending_reviews_for_graph_exec"
+ )
+ mock_has_pending.return_value = False
+
+ mocker.patch("backend.executor.utils.add_graph_execution")
+
+ request_data = {
+ "approved_reviews": [
+ {
+ "node_exec_id": "test_node_123",
+ "message": "Looks good",
+ "reviewed_data": {"data": "modified payload", "value": 50},
+ }
+ ],
+ "rejected_review_ids": [],
+ }
+
+ response = client.post("/api/review/action", json=request_data)
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["approved_count"] == 1
+ assert data["rejected_count"] == 0
+ assert data["failed_count"] == 0
+ assert data["error"] is None
+
+
+def test_process_review_action_reject_success(
+ mocker: pytest_mock.MockFixture,
+ sample_pending_review: PendingHumanReviewModel,
+) -> None:
+ """Test successful review rejection"""
+ # Mock the validation functions
+ mock_get_pending_review = mocker.patch(
+ "backend.data.human_review.get_pending_review_by_node_exec_id"
+ )
+ mock_get_pending_review.return_value = sample_pending_review
+
+ mock_get_reviews_for_execution = mocker.patch(
+ "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
+ )
+ mock_get_reviews_for_execution.return_value = [sample_pending_review]
+
+ mock_process_all_reviews = mocker.patch(
+ "backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
+ )
+ rejected_review = PendingHumanReviewModel(
+ node_exec_id="test_node_123",
+ user_id="test_user",
+ graph_exec_id="test_graph_exec_456",
+ graph_id="test_graph_789",
+ graph_version=1,
+ payload={"data": "test payload"},
+ instructions="Please review",
+ editable=True,
+ status=ReviewStatus.REJECTED,
+ review_message="Rejected by user",
+ was_edited=False,
+ processed=False,
+ created_at=FIXED_NOW,
+ updated_at=None,
+ reviewed_at=FIXED_NOW,
+ )
+ mock_process_all_reviews.return_value = {"test_node_123": rejected_review}
+
+ mock_has_pending = mocker.patch(
+ "backend.data.human_review.has_pending_reviews_for_graph_exec"
+ )
+ mock_has_pending.return_value = False
+
+ request_data = {"approved_reviews": [], "rejected_review_ids": ["test_node_123"]}
+
+ response = client.post("/api/review/action", json=request_data)
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["approved_count"] == 0
+ assert data["rejected_count"] == 1
+ assert data["failed_count"] == 0
+ assert data["error"] is None
+
+
+def test_process_review_action_mixed_success(
+ mocker: pytest_mock.MockFixture,
+ sample_pending_review: PendingHumanReviewModel,
+) -> None:
+ """Test mixed approve/reject operations"""
+ # Create a second review
+ second_review = PendingHumanReviewModel(
+ node_exec_id="test_node_456",
+ user_id="test_user",
+ graph_exec_id="test_graph_exec_456",
+ graph_id="test_graph_789",
+ graph_version=1,
+ payload={"data": "second payload"},
+ instructions="Second review",
+ editable=False,
+ status=ReviewStatus.WAITING,
+ review_message=None,
+ was_edited=None,
+ processed=False,
+ created_at=FIXED_NOW,
+ updated_at=None,
+ reviewed_at=None,
+ )
+
+ # Mock the validation functions
+ mock_get_pending_review = mocker.patch(
+ "backend.data.human_review.get_pending_review_by_node_exec_id"
+ )
+ mock_get_pending_review.side_effect = lambda node_id, user_id: (
+ sample_pending_review if node_id == "test_node_123" else second_review
+ )
+
+ mock_get_reviews_for_execution = mocker.patch(
+ "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
+ )
+ mock_get_reviews_for_execution.return_value = [sample_pending_review, second_review]
+
+ mock_process_all_reviews = mocker.patch(
+ "backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
+ )
+ # Create approved version of first review
+ approved_review = PendingHumanReviewModel(
+ node_exec_id="test_node_123",
+ user_id="test_user",
+ graph_exec_id="test_graph_exec_456",
+ graph_id="test_graph_789",
+ graph_version=1,
+ payload={"data": "modified"},
+ instructions="Please review",
+ editable=True,
+ status=ReviewStatus.APPROVED,
+ review_message="Approved",
+ was_edited=True,
+ processed=False,
+ created_at=FIXED_NOW,
+ updated_at=None,
+ reviewed_at=FIXED_NOW,
+ )
+ # Create rejected version of second review
+ rejected_review = PendingHumanReviewModel(
+ node_exec_id="test_node_456",
+ user_id="test_user",
+ graph_exec_id="test_graph_exec_456",
+ graph_id="test_graph_789",
+ graph_version=1,
+ payload={"data": "second payload"},
+ instructions="Second review",
+ editable=False,
+ status=ReviewStatus.REJECTED,
+ review_message="Rejected by user",
+ was_edited=False,
+ processed=False,
+ created_at=FIXED_NOW,
+ updated_at=None,
+ reviewed_at=FIXED_NOW,
+ )
+ mock_process_all_reviews.return_value = {
+ "test_node_123": approved_review,
+ "test_node_456": rejected_review,
+ }
+
+ mock_has_pending = mocker.patch(
+ "backend.data.human_review.has_pending_reviews_for_graph_exec"
+ )
+ mock_has_pending.return_value = False
+
+ request_data = {
+ "approved_reviews": [
+ {
+ "node_exec_id": "test_node_123",
+ "message": "Approved",
+ "reviewed_data": {"data": "modified"},
+ }
+ ],
+ "rejected_review_ids": ["test_node_456"],
+ }
+
+ response = client.post("/api/review/action", json=request_data)
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["approved_count"] == 1
+ assert data["rejected_count"] == 1
+ assert data["failed_count"] == 0
+ assert data["error"] is None
+
+
+def test_process_review_action_empty_request(
+ mocker: pytest_mock.MockFixture,
+) -> None:
+ """Test error when no reviews provided"""
+ request_data = {"approved_reviews": [], "rejected_review_ids": []}
+
+ response = client.post("/api/review/action", json=request_data)
+
+ assert response.status_code == 400
+ assert "At least one review must be provided" in response.json()["detail"]
+
+
+def test_process_review_action_review_not_found(
+ mocker: pytest_mock.MockFixture,
+) -> None:
+ """Test error when review is not found"""
+ mock_get_pending_review = mocker.patch(
+ "backend.data.human_review.get_pending_review_by_node_exec_id"
+ )
+ mock_get_pending_review.return_value = None
+
+ request_data = {
+ "approved_reviews": [
+ {
+ "node_exec_id": "nonexistent_node",
+ "message": "Test",
+ }
+ ],
+ "rejected_review_ids": [],
+ }
+
+ response = client.post("/api/review/action", json=request_data)
+
+ assert response.status_code == 403
+ assert "not found or access denied" in response.json()["detail"]
+
+
+def test_process_review_action_partial_failure(
+ mocker: pytest_mock.MockFixture,
+ sample_pending_review: PendingHumanReviewModel,
+) -> None:
+ """Test handling of partial failures in review processing"""
+ # Mock successful validation
+ mock_get_pending_review = mocker.patch(
+ "backend.data.human_review.get_pending_review_by_node_exec_id"
+ )
+ mock_get_pending_review.return_value = sample_pending_review
+
+ mock_get_reviews_for_execution = mocker.patch(
+ "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
+ )
+ mock_get_reviews_for_execution.return_value = [sample_pending_review]
+
+ # Mock partial failure in processing
+ mock_process_all_reviews = mocker.patch(
+ "backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
+ )
+ mock_process_all_reviews.side_effect = ValueError("Some reviews failed validation")
+
+ request_data = {
+ "approved_reviews": [
+ {
+ "node_exec_id": "test_node_123",
+ "message": "Test",
+ }
+ ],
+ "rejected_review_ids": [],
+ }
+
+ response = client.post("/api/review/action", json=request_data)
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["approved_count"] == 0
+ assert data["rejected_count"] == 0
+ assert data["failed_count"] == 1
+ assert "Failed to process reviews" in data["error"]
+
+
+def test_process_review_action_complete_failure(
+ mocker: pytest_mock.MockFixture,
+ sample_pending_review: PendingHumanReviewModel,
+) -> None:
+ """Test complete failure scenario"""
+ # Mock successful validation
+ mock_get_pending_review = mocker.patch(
+ "backend.data.human_review.get_pending_review_by_node_exec_id"
+ )
+ mock_get_pending_review.return_value = sample_pending_review
+
+ mock_get_reviews_for_execution = mocker.patch(
+ "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
+ )
+ mock_get_reviews_for_execution.return_value = [sample_pending_review]
+
+ # Mock complete failure in processing
+ mock_process_all_reviews = mocker.patch(
+ "backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
+ )
+ mock_process_all_reviews.side_effect = Exception("Database error")
+
+ request_data = {
+ "approved_reviews": [
+ {
+ "node_exec_id": "test_node_123",
+ "message": "Test",
+ }
+ ],
+ "rejected_review_ids": [],
+ }
+
+ response = client.post("/api/review/action", json=request_data)
+
+ assert response.status_code == 500
+ assert "error" in response.json()["detail"].lower()
diff --git a/autogpt_platform/backend/backend/server/v2/executions/review/routes.py b/autogpt_platform/backend/backend/server/v2/executions/review/routes.py
new file mode 100644
index 000000000000..a8afe036356e
--- /dev/null
+++ b/autogpt_platform/backend/backend/server/v2/executions/review/routes.py
@@ -0,0 +1,194 @@
+import logging
+from typing import List
+
+import autogpt_libs.auth as autogpt_auth_lib
+from fastapi import APIRouter, HTTPException, Query, Security, status
+from prisma.enums import ReviewStatus
+
+from backend.data.execution import get_graph_execution_meta
+from backend.data.human_review import (
+ get_pending_reviews_for_execution,
+ get_pending_reviews_for_user,
+ has_pending_reviews_for_graph_exec,
+ process_all_reviews_for_execution,
+)
+from backend.executor.utils import add_graph_execution
+from backend.server.v2.executions.review.model import (
+ PendingHumanReviewModel,
+ ReviewRequest,
+ ReviewResponse,
+)
+
+logger = logging.getLogger(__name__)
+
+
+router = APIRouter(
+ tags=["executions", "review", "private"],
+ dependencies=[Security(autogpt_auth_lib.requires_user)],
+)
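+# Note: the parent application mounts this router under a path prefix
+# (the route tests mount it at "/api/review").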
+
+
+@router.get(
+ "/pending",
+ summary="Get Pending Reviews",
+ response_model=List[PendingHumanReviewModel],
+ responses={
+ 200: {"description": "List of pending reviews"},
+ 500: {"description": "Server error", "content": {"application/json": {}}},
+ },
+)
+async def list_pending_reviews(
+ user_id: str = Security(autogpt_auth_lib.get_user_id),
+ page: int = Query(1, ge=1, description="Page number (1-indexed)"),
+ page_size: int = Query(25, ge=1, le=100, description="Number of reviews per page"),
+) -> List[PendingHumanReviewModel]:
+ """Get all pending reviews for the current user.
+
+ Retrieves all reviews with status "WAITING" that belong to the authenticated user.
+ Results are ordered by creation time (newest first).
+
+ Args:
+ user_id: Authenticated user ID from security dependency
+ page: Page number to return (1-indexed)
+ page_size: Number of reviews per page (1-100)
+
+ Returns:
+ List of pending review objects with status converted to typed literals
+
+ Raises:
+ HTTPException: If authentication fails or database error occurs
+
+ Note:
+ Reviews with invalid status values are logged as warnings but excluded
+ from results rather than failing the entire request.
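+
+ Example:
+ GET /pending?page=2&page_size=10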
+ """
+
+ return await get_pending_reviews_for_user(user_id, page, page_size)
+
+
+@router.get(
+ "/execution/{graph_exec_id}",
+ summary="Get Pending Reviews for Execution",
+ response_model=List[PendingHumanReviewModel],
+ responses={
+ 200: {"description": "List of pending reviews for the execution"},
+ 400: {"description": "Invalid graph execution ID"},
+ 403: {"description": "Access denied to graph execution"},
+ 500: {"description": "Server error", "content": {"application/json": {}}},
+ },
+)
+async def list_pending_reviews_for_execution(
+ graph_exec_id: str,
+ user_id: str = Security(autogpt_auth_lib.get_user_id),
+) -> List[PendingHumanReviewModel]:
+ """Get all pending reviews for a specific graph execution.
+
+ Retrieves all reviews with status "WAITING" for the specified graph execution
+ that belong to the authenticated user. Results are ordered by creation time
+ (oldest first) to preserve review order within the execution.
+
+ Args:
+ graph_exec_id: ID of the graph execution to get reviews for
+ user_id: Authenticated user ID from security dependency
+
+ Returns:
+ List of pending review objects for the specified execution
+
+ Raises:
+ HTTPException:
+ - 403: If user doesn't own the graph execution
+ - 500: If authentication fails or database error occurs
+
+ Note:
+ Only returns reviews owned by the authenticated user for security.
+ Reviews with invalid status are excluded with warning logs.
+ """
+
+ # Verify user owns the graph execution before returning reviews
+ graph_exec = await get_graph_execution_meta(
+ user_id=user_id, execution_id=graph_exec_id
+ )
+ if not graph_exec:
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="Access denied to graph execution",
+ )
+
+ return await get_pending_reviews_for_execution(graph_exec_id, user_id)
+
+
+@router.post("/action", response_model=ReviewResponse)
+async def process_review_action(
+ request: ReviewRequest,
+ user_id: str = Security(autogpt_auth_lib.get_user_id),
+) -> ReviewResponse:
+ """Process reviews with approve or reject actions."""
+
+ # Collect all node exec IDs from the request
+ all_request_node_ids = {review.node_exec_id for review in request.reviews}
+
+ if not all_request_node_ids:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="At least one review must be provided",
+ )
+
+ # Build review decisions map
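+ # Each entry maps node_exec_id -> (ReviewStatus, reviewed data or None, reviewer message)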
+ review_decisions = {}
+ for review in request.reviews:
+ if review.approved:
+ review_decisions[review.node_exec_id] = (
+ ReviewStatus.APPROVED,
+ review.reviewed_data,
+ review.message,
+ )
+ else:
+ review_decisions[review.node_exec_id] = (
+ ReviewStatus.REJECTED,
+ None,
+ review.message,
+ )
+
+ # Process all reviews, reporting failures via the response model (see review_routes_test.py)
+ try:
+ updated_reviews = await process_all_reviews_for_execution(
+ user_id=user_id,
+ review_decisions=review_decisions,
+ )
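+ except ValueError as e:
+ # Validation-style failures: return counts and an error message instead of a 500,
+ # matching the partial-failure case in review_routes_test.py
+ logger.warning(f"Failed to process reviews: {e}")
+ return ReviewResponse(
+ approved_count=0,
+ rejected_count=0,
+ failed_count=len(all_request_node_ids),
+ error=f"Failed to process reviews: {e}",
+ )
+ except Exception as e:
+ # Unexpected errors surface as a 500 with a descriptive detail message
+ logger.exception(f"Unexpected error while processing reviews: {e}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Unexpected error while processing reviews: {e}",
+ ) from e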
+
+ # Count results
+ approved_count = sum(
+ 1
+ for review in updated_reviews.values()
+ if review.status == ReviewStatus.APPROVED
+ )
+ rejected_count = sum(
+ 1
+ for review in updated_reviews.values()
+ if review.status == ReviewStatus.REJECTED
+ )
+
+ # Resume execution if we processed some reviews
+ if updated_reviews:
+ # Get graph execution ID from any processed review
+ first_review = next(iter(updated_reviews.values()))
+ graph_exec_id = first_review.graph_exec_id
+
+ # Check if any pending reviews remain for this execution
+ still_has_pending = await has_pending_reviews_for_graph_exec(graph_exec_id)
+
+ if not still_has_pending:
+ # Resume execution
+ try:
+ await add_graph_execution(
+ graph_id=first_review.graph_id,
+ user_id=user_id,
+ graph_exec_id=graph_exec_id,
+ )
+ logger.info(f"Resumed execution {graph_exec_id}")
+ except Exception as e:
+ logger.error(f"Failed to resume execution {graph_exec_id}: {str(e)}")
+
+ return ReviewResponse(
+ approved_count=approved_count,
+ rejected_count=rejected_count,
+ failed_count=0,
+ error=None,
+ )
diff --git a/autogpt_platform/backend/backend/server/v2/library/routes/agents.py b/autogpt_platform/backend/backend/server/v2/library/routes/agents.py
index 1bdf255ce50e..eeea9d8fb65f 100644
--- a/autogpt_platform/backend/backend/server/v2/library/routes/agents.py
+++ b/autogpt_platform/backend/backend/server/v2/library/routes/agents.py
@@ -22,7 +22,9 @@
@router.get(
"",
summary="List Library Agents",
+ response_model=library_model.LibraryAgentResponse,
responses={
+ 200: {"description": "List of library agents"},
500: {"description": "Server error", "content": {"application/json": {}}},
},
)
@@ -155,7 +157,12 @@ async def get_library_agent_by_graph_id(
@router.get(
"/marketplace/{store_listing_version_id}",
summary="Get Agent By Store ID",
- tags=["store, library"],
+ tags=["store", "library"],
+ response_model=library_model.LibraryAgent | None,
+ responses={
+ 200: {"description": "Library agent found"},
+ 404: {"description": "Agent not found"},
+ },
)
async def get_library_agent_by_store_listing_version_id(
store_listing_version_id: str,
diff --git a/autogpt_platform/backend/backend/util/test.py b/autogpt_platform/backend/backend/util/test.py
index 13b3365446d3..0a2015254be5 100644
--- a/autogpt_platform/backend/backend/util/test.py
+++ b/autogpt_platform/backend/backend/util/test.py
@@ -140,6 +140,7 @@ async def async_mock(
"graph_exec_id": str(uuid.uuid4()),
"node_exec_id": str(uuid.uuid4()),
"user_id": str(uuid.uuid4()),
+ "graph_version": 1, # Default version for tests
"user_context": UserContext(timezone="UTC"), # Default for tests
}
input_model = cast(type[BlockSchema], block.input_schema)
diff --git a/autogpt_platform/backend/migrations/20251117102522_add_human_in_the_loop_table/migration.sql b/autogpt_platform/backend/migrations/20251117102522_add_human_in_the_loop_table/migration.sql
new file mode 100644
index 000000000000..5a2cc2f722b6
--- /dev/null
+++ b/autogpt_platform/backend/migrations/20251117102522_add_human_in_the_loop_table/migration.sql
@@ -0,0 +1,44 @@
+-- CreateEnum
+CREATE TYPE "ReviewStatus" AS ENUM ('WAITING', 'APPROVED', 'REJECTED');
+
+-- AlterEnum
+ALTER TYPE "AgentExecutionStatus" ADD VALUE 'REVIEW';
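+-- Executions awaiting human review are marked with the new REVIEW status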
+
+-- CreateTable
+CREATE TABLE "PendingHumanReview" (
+ "nodeExecId" TEXT NOT NULL,
+ "userId" TEXT NOT NULL,
+ "graphExecId" TEXT NOT NULL,
+ "graphId" TEXT NOT NULL,
+ "graphVersion" INTEGER NOT NULL,
+ "payload" JSONB NOT NULL,
+ "instructions" TEXT,
+ "editable" BOOLEAN NOT NULL DEFAULT true,
+ "status" "ReviewStatus" NOT NULL DEFAULT 'WAITING',
+ "reviewMessage" TEXT,
+ "wasEdited" BOOLEAN,
+ "processed" BOOLEAN NOT NULL DEFAULT false,
+ "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "updatedAt" TIMESTAMP(3),
+ "reviewedAt" TIMESTAMP(3),
+
+ CONSTRAINT "PendingHumanReview_pkey" PRIMARY KEY ("nodeExecId")
+);
+
+-- CreateIndex
+CREATE INDEX "PendingHumanReview_userId_status_idx" ON "PendingHumanReview"("userId", "status");
+
+-- CreateIndex
+CREATE INDEX "PendingHumanReview_graphExecId_status_idx" ON "PendingHumanReview"("graphExecId", "status");
+
+-- CreateIndex
+CREATE UNIQUE INDEX "PendingHumanReview_nodeExecId_key" ON "PendingHumanReview"("nodeExecId");
+
+-- AddForeignKey
+ALTER TABLE "PendingHumanReview" ADD CONSTRAINT "PendingHumanReview_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- AddForeignKey
+ALTER TABLE "PendingHumanReview" ADD CONSTRAINT "PendingHumanReview_nodeExecId_fkey" FOREIGN KEY ("nodeExecId") REFERENCES "AgentNodeExecution"("id") ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- AddForeignKey
+ALTER TABLE "PendingHumanReview" ADD CONSTRAINT "PendingHumanReview_graphExecId_fkey" FOREIGN KEY ("graphExecId") REFERENCES "AgentGraphExecution"("id") ON DELETE CASCADE ON UPDATE CASCADE;
diff --git a/autogpt_platform/backend/schema.prisma b/autogpt_platform/backend/schema.prisma
index e8755d99ab73..ca015a3cb930 100644
--- a/autogpt_platform/backend/schema.prisma
+++ b/autogpt_platform/backend/schema.prisma
@@ -59,6 +59,7 @@ model User {
APIKeys APIKey[]
IntegrationWebhooks IntegrationWebhook[]
NotificationBatches UserNotificationBatch[]
+ PendingHumanReviews PendingHumanReview[]
}
enum OnboardingStep {
@@ -351,6 +352,7 @@ enum AgentExecutionStatus {
COMPLETED
TERMINATED
FAILED
+ REVIEW
}
// This model describes the execution of an AgentGraph.
@@ -393,6 +395,8 @@ model AgentGraphExecution {
shareToken String? @unique
sharedAt DateTime?
+ PendingHumanReviews PendingHumanReview[]
+
@@index([agentGraphId, agentGraphVersion])
@@index([userId, isDeleted, createdAt])
@@index([createdAt])
@@ -423,6 +427,8 @@ model AgentNodeExecution {
stats Json?
+ PendingHumanReview PendingHumanReview?
+
@@index([agentGraphExecutionId, agentNodeId, executionStatus])
@@index([agentNodeId, executionStatus])
@@index([addedTime, queuedTime])
@@ -464,6 +470,39 @@ model AgentNodeExecutionKeyValueData {
@@id([userId, key])
}
+enum ReviewStatus {
+ WAITING
+ APPROVED
+ REJECTED
+}
+
+// Pending human reviews for Human-in-the-loop blocks
+model PendingHumanReview {
+ nodeExecId String @id
+ userId String
+ graphExecId String
+ graphId String
+ graphVersion Int
+ payload Json // The actual payload data to be reviewed
+ instructions String? // Instructions/message for the reviewer
+ editable Boolean @default(true) // Whether the reviewer can edit the data
+ status ReviewStatus @default(WAITING)
+ reviewMessage String? // Optional message from the reviewer
+ wasEdited Boolean? // Whether the data was modified during review
+ processed Boolean @default(false) // Whether the review result has been processed by the execution engine
+ createdAt DateTime @default(now())
+ updatedAt DateTime? @updatedAt
+ reviewedAt DateTime?
+
+ User User @relation(fields: [userId], references: [id], onDelete: Cascade)
+ NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade)
+ GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade)
+
+ @@unique([nodeExecId]) // One pending review per node execution
+ @@index([userId, status])
+ @@index([graphExecId, status])
+}
+
// Webhook that is registered with a provider and propagates to one or more nodes
model IntegrationWebhook {
id String @id @default(uuid())
diff --git a/autogpt_platform/frontend/src/app/(platform)/build/components/BuilderActions/components/RunGraph/RunGraph.tsx b/autogpt_platform/frontend/src/app/(platform)/build/components/BuilderActions/components/RunGraph/RunGraph.tsx
index 3a0c7aab4a3d..f4c1a7331f9b 100644
--- a/autogpt_platform/frontend/src/app/(platform)/build/components/BuilderActions/components/RunGraph/RunGraph.tsx
+++ b/autogpt_platform/frontend/src/app/(platform)/build/components/BuilderActions/components/RunGraph/RunGraph.tsx
@@ -18,6 +18,7 @@ export const RunGraph = ({ flowID }: { flowID: string | null }) => {
openRunInputDialog,
setOpenRunInputDialog,
isExecutingGraph,
+ isTerminatingGraph,
isSaving,
} = useRunGraph();
const isGraphRunning = useGraphStore(
@@ -34,8 +35,8 @@ export const RunGraph = ({ flowID }: { flowID: string | null }) => {
"border-red-500 bg-gradient-to-br from-red-400 to-red-500 shadow-[inset_0_2px_0_0_rgba(255,255,255,0.5),0_2px_4px_0_rgba(0,0,0,0.2)]",
)}
onClick={isGraphRunning ? handleStopGraph : handleRunGraph}
- disabled={!flowID || isExecutingGraph}
- isLoading={isExecutingGraph || isSaving}
+ disabled={!flowID || isExecutingGraph || isTerminatingGraph}
+ isLoading={isExecutingGraph || isTerminatingGraph || isSaving}
>
{!isGraphRunning ? (