From b8cb2444cccbb468920a937acbb7b07a494ac8fc Mon Sep 17 00:00:00 2001
From: Zamil Majdy
Date: Fri, 14 Nov 2025 12:39:20 +0700
Subject: [PATCH 01/47] feat(platform): add Human In The Loop block with
 review workflow
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit implements a comprehensive Human In The Loop (HITL) block that
allows agents to pause execution and wait for human approval/modification of
data before continuing.

Key features:
- Added WAITING_FOR_REVIEW status to AgentExecutionStatus enum
- Created PendingHumanReview database table for storing review requests
- Implemented HumanInTheLoopBlock that extracts input data and creates review entries
- Added API endpoints at /api/executions/review for fetching and reviewing pending data
- Updated execution manager to properly handle waiting status and resume after approval
- Created comprehensive frontend UI components:
  - PendingReviewCard for individual review handling
  - PendingReviewsList for multiple reviews
  - FloatingReviewsPanel for graph builder integration
- Integrated review UI into 3 locations: legacy library, new library, and graph builder
- Added proper type safety throughout with SafeJson handling
- Optimized database queries using count functions instead of full data fetching

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude
---
 .../backend/blocks/human_in_the_loop.py       | 145 +++++++++++
 .../backend/backend/data/execution.py         |  36 +++
 .../backend/backend/executor/database.py      |   4 +
 .../backend/backend/executor/manager.py       |  36 ++-
 .../backend/backend/server/rest_api.py        |   6 +
 .../backend/server/v2/executions/__init__.py  |   0
 .../server/v2/executions/review/__init__.py   |   0
 .../server/v2/executions/review/model.py      |  41 +++
 .../server/v2/executions/review/routes.py     | 193 ++++++++++++++
 autogpt_platform/backend/schema.prisma        |  30 +++
 .../build/components/FlowEditor/Flow/Flow.tsx |   6 +
 .../components/NodeExecutionBadge.tsx         |   3 +-
 .../FlowEditor/nodes/CustomNode/helpers.ts    |   1 +
 .../components/legacy-builder/Flow/Flow.tsx   |   5 +
 .../RunsSidebar/components/RunListItem.tsx    |   5 +
 .../SelectedRunView/SelectedRunView.tsx       |  29 +++
 .../components/RunStatusBadge.tsx             |   9 +-
 .../components/agent-run-status-chip.tsx      |   1 +
 .../frontend/src/app/api/openapi.json         | 244 +++++++++++++++++-
 .../FloatingReviewsPanel.tsx                  |  93 +++++++
 .../PendingReviewCard/PendingReviewCard.tsx   | 169 ++++++++++++
 .../PendingReviewsList/PendingReviewsList.tsx |  50 ++++
 .../frontend/src/hooks/useAgentGraph.tsx      |  11 +-
 .../frontend/src/hooks/usePendingReviews.ts   |  32 +++
 .../src/lib/autogpt-server-api/types.ts       |   6 +-
 25 files changed, 1143 insertions(+), 12 deletions(-)
 create mode 100644 autogpt_platform/backend/backend/blocks/human_in_the_loop.py
 create mode 100644 autogpt_platform/backend/backend/server/v2/executions/__init__.py
 create mode 100644 autogpt_platform/backend/backend/server/v2/executions/review/__init__.py
 create mode 100644 autogpt_platform/backend/backend/server/v2/executions/review/model.py
 create mode 100644 autogpt_platform/backend/backend/server/v2/executions/review/routes.py
 create mode 100644 autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx
 create mode 100644 autogpt_platform/frontend/src/components/organisms/PendingReviewCard/PendingReviewCard.tsx
 create mode 100644 autogpt_platform/frontend/src/components/organisms/PendingReviewsList/PendingReviewsList.tsx
 create mode 100644 autogpt_platform/frontend/src/hooks/usePendingReviews.ts
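For reviewers, a rough sketch of the round-trip these changes enable, seen from an API client. This is not part of the patch; the base URL and bearer token are placeholders, and the endpoint paths and request/response shapes follow the routes.py and model.py added below:

```python
# Hypothetical walkthrough of the review flow introduced by this commit.
# BASE_URL and the JWT are assumptions; error handling is omitted.
import httpx

BASE_URL = "http://localhost:8006"          # placeholder platform API URL
HEADERS = {"Authorization": "Bearer <JWT>"}  # placeholder user token

# 1. A HumanInTheLoopBlock run leaves its node in WAITING_FOR_REVIEW and
#    stores a PendingHumanReview row. List the current user's pending reviews:
pending = httpx.get(
    f"{BASE_URL}/api/executions/review/pending", headers=HEADERS
).json()

if pending:
    review = pending[0]
    # review["data"] holds the structure written by the block:
    # {"data": <payload>, "message": <instructions>, "editable": <bool>}
    payload = review["data"]["data"]

    # 2. Approve (optionally with a modified payload) or reject.
    #    On approval the backend re-queues the graph execution; when the block
    #    re-runs it finds the APPROVED row and yields the reviewed data.
    httpx.post(
        f"{BASE_URL}/api/executions/review/{review['id']}/action",
        headers=HEADERS,
        json={
            "action": "approve",        # or "reject"
            "reviewed_data": payload,   # only used for "approve"
            "message": "Looks good",
        },
    )
```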
diff --git a/autogpt_platform/backend/backend/blocks/human_in_the_loop.py b/autogpt_platform/backend/backend/blocks/human_in_the_loop.py
new file mode 100644
index 000000000000..8d45adf30fa9
--- /dev/null
+++ b/autogpt_platform/backend/backend/blocks/human_in_the_loop.py
@@ -0,0 +1,145 @@
+from typing import Any
+
+from prisma.models import PendingHumanReview
+
+from backend.data.block import (
+    Block,
+    BlockCategory,
+    BlockOutput,
+    BlockSchemaInput,
+    BlockSchemaOutput,
+)
+from backend.data.model import SchemaField
+from backend.util.json import SafeJson
+from backend.util.type import convert
+
+
+class HumanInTheLoopBlock(Block):
+    """
+    This block pauses execution and waits for human approval or modification of the data.
+
+    When executed, it creates a pending review entry and sets the node execution status
+    to WAITING_FOR_REVIEW. The execution will remain paused until a human user either:
+    - Approves the data (with or without modifications)
+    - Rejects the data
+
+    This is useful for workflows that require human validation or intervention before
+    proceeding to the next steps.
+    """
+
+    class Input(BlockSchemaInput):
+        data: Any = SchemaField(description="The data to be reviewed by a human user")
+        message: str = SchemaField(
+            description="Instructions or message for the human reviewer",
+            default="Please review and approve or modify the following data:",
+        )
+        editable: bool = SchemaField(
+            description="Whether the human reviewer can edit the data",
+            default=True,
+            advanced=True,
+        )
+
+    class Output(BlockSchemaOutput):
+        reviewed_data: Any = SchemaField(
+            description="The data after human review (may be modified)"
+        )
+        status: str = SchemaField(
+            description="Status of the review: 'approved' or 'rejected'"
+        )
+        review_message: str = SchemaField(
+            description="Any message provided by the reviewer", default=""
+        )
+
+    def __init__(self):
+        super().__init__(
+            id="8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d",
+            description="Pause execution and wait for human approval or modification of data",
+            categories={BlockCategory.BASIC},
+            input_schema=HumanInTheLoopBlock.Input,
+            output_schema=HumanInTheLoopBlock.Output,
+            test_input={
+                "data": {"name": "John Doe", "age": 30},
+                "message": "Please verify this user data",
+                "editable": True,
+            },
+            test_output=[
+                ("reviewed_data", {"name": "John Doe", "age": 30}),
+                ("status", "approved"),
+                ("review_message", ""),
+            ],
+        )
+
+    async def run(
+        self,
+        input_data: Input,
+        *,
+        user_id: str,
+        node_exec_id: str,
+        graph_exec_id: str,
+        graph_id: str,
+        graph_version: int,
+        **kwargs
+    ) -> BlockOutput:
+        # Check if there's already an approved review for this node execution
+        existing_review = await PendingHumanReview.prisma().find_unique(
+            where={"nodeExecId": node_exec_id}
+        )
+
+        if existing_review and existing_review.status == "APPROVED":
+            # Return the approved data (which may have been modified by the reviewer)
+            # The data field now contains the approved/modified data from the review
+            if (
+                isinstance(existing_review.data, dict)
+                and "data" in existing_review.data
+            ):
+                # Extract the actual data from the review data structure
+                approved_data = existing_review.data["data"]
+            else:
+                # Fallback to the stored data directly
+                approved_data = existing_review.data
+
+            approved_data = convert(approved_data, type(input_data.data))
+            yield "reviewed_data", approved_data
+            yield "status", "approved"
+            yield "review_message", existing_review.reviewMessage or ""
+
+            # Clean up the
review record as it's been processed + await PendingHumanReview.prisma().delete(where={"id": existing_review.id}) + return + + elif existing_review and existing_review.status == "REJECTED": + # Return rejection status without data + yield "status", "rejected" + yield "review_message", existing_review.reviewMessage or "" + + # Clean up the review record + await PendingHumanReview.prisma().delete(where={"id": existing_review.id}) + return + + # No existing approved review, create a pending review + review_data = { + "data": input_data.data, + "message": input_data.message, + "editable": input_data.editable, + } + + await PendingHumanReview.prisma().upsert( + where={"nodeExecId": node_exec_id}, + data={ + "create": { + "userId": user_id, + "nodeExecId": node_exec_id, + "graphExecId": graph_exec_id, + "graphId": graph_id, + "graphVersion": graph_version, + "data": SafeJson(review_data), + "status": "WAITING", + }, + "update": {"data": SafeJson(review_data), "status": "WAITING"}, + }, + ) + + # This will effectively pause the execution here + # The execution will be resumed when the review is approved + # The manager will detect the pending review and set the status to WAITING_FOR_REVIEW + return diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index eb36e0e7cdac..64f5d59cb3f7 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -101,6 +101,7 @@ def error_rate(self) -> float: ExecutionStatus.INCOMPLETE, ExecutionStatus.QUEUED, ExecutionStatus.TERMINATED, # For resuming halted execution + ExecutionStatus.WAITING_FOR_REVIEW, # For resuming after review ], ExecutionStatus.COMPLETED: [ ExecutionStatus.RUNNING, @@ -115,6 +116,9 @@ def error_rate(self) -> float: ExecutionStatus.QUEUED, ExecutionStatus.RUNNING, ], + ExecutionStatus.WAITING_FOR_REVIEW: [ + ExecutionStatus.RUNNING, + ], } @@ -1002,6 +1006,38 @@ async def get_node_executions( return res +async def get_node_executions_count( + graph_exec_id: str | None = None, + node_id: str | None = None, + block_ids: list[str] | None = None, + statuses: list[ExecutionStatus] | None = None, + created_time_gte: datetime | None = None, + created_time_lte: datetime | None = None, +) -> int: + """ + Get count of node executions with optional filters. + ⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints. 
+ """ + where_clause: AgentNodeExecutionWhereInput = {} + if graph_exec_id: + where_clause["agentGraphExecutionId"] = graph_exec_id + if node_id: + where_clause["agentNodeId"] = node_id + if block_ids: + where_clause["Node"] = {"is": {"agentBlockId": {"in": block_ids}}} + if statuses: + where_clause["OR"] = [{"executionStatus": status} for status in statuses] + + if created_time_gte or created_time_lte: + where_clause["addedTime"] = { + "gte": created_time_gte or datetime.min.replace(tzinfo=timezone.utc), + "lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc), + } + + count = await AgentNodeExecution.prisma().count(where=where_clause) + return count + + async def get_latest_node_execution( node_id: str, graph_eid: str ) -> NodeExecutionResult | None: diff --git a/autogpt_platform/backend/backend/executor/database.py b/autogpt_platform/backend/backend/executor/database.py index df581e0de408..79733f4a17cc 100644 --- a/autogpt_platform/backend/backend/executor/database.py +++ b/autogpt_platform/backend/backend/executor/database.py @@ -15,6 +15,7 @@ get_latest_node_execution, get_node_execution, get_node_executions, + get_node_executions_count, set_execution_kv_data, update_graph_execution_start_time, update_graph_execution_stats, @@ -129,6 +130,7 @@ def _( create_graph_execution = _(create_graph_execution) get_node_execution = _(get_node_execution) get_node_executions = _(get_node_executions) + get_node_executions_count = _(get_node_executions_count) get_latest_node_execution = _(get_latest_node_execution) update_node_execution_status = _(update_node_execution_status) update_node_execution_status_batch = _(update_node_execution_status_batch) @@ -200,6 +202,7 @@ def get_service_type(cls): get_graph_executions_count = _(d.get_graph_executions_count) get_graph_execution_meta = _(d.get_graph_execution_meta) get_node_executions = _(d.get_node_executions) + get_node_executions_count = _(d.get_node_executions_count) update_node_execution_status = _(d.update_node_execution_status) update_graph_execution_start_time = _(d.update_graph_execution_start_time) update_graph_execution_stats = _(d.update_graph_execution_stats) @@ -245,6 +248,7 @@ def get_service_type(cls): get_node = d.get_node get_node_execution = d.get_node_execution get_node_executions = d.get_node_executions + get_node_executions_count = d.get_node_executions_count get_user_by_id = d.get_user_by_id get_user_integrations = d.get_user_integrations upsert_execution_input = d.upsert_execution_input diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index d4c362e6aac7..ab991c0ecfc7 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -573,7 +573,21 @@ async def persist_output(output_name: str, output_data: Any) -> None: await persist_output(output_name, output_data) log_metadata.info(f"Finished node execution {node_exec.node_exec_id}") - status = ExecutionStatus.COMPLETED + + # Check if this node has pending reviews (Human In The Loop block) + try: + from prisma.models import PendingHumanReview + + pending_review = await PendingHumanReview.prisma().find_first( + where={"nodeExecId": node_exec.node_exec_id, "status": "WAITING"} + ) + if pending_review: + status = ExecutionStatus.WAITING_FOR_REVIEW + else: + status = ExecutionStatus.COMPLETED + except Exception: + # If there's any issue checking for pending reviews, default to COMPLETED + status = ExecutionStatus.COMPLETED except 
BaseException as e: stats.error = e @@ -660,6 +674,16 @@ def on_graph_execution( log_metadata.info( f"⚙️ Graph execution #{graph_exec.graph_exec_id} is already running, continuing where it left off." ) + elif exec_meta.status == ExecutionStatus.WAITING_FOR_REVIEW: + exec_meta.status = ExecutionStatus.RUNNING + log_metadata.info( + f"⚙️ Graph execution #{graph_exec.graph_exec_id} was waiting for review, resuming execution." + ) + update_graph_execution_state( + db_client=db_client, + graph_exec_id=graph_exec.graph_exec_id, + status=ExecutionStatus.RUNNING, + ) elif exec_meta.status == ExecutionStatus.FAILED: exec_meta.status = ExecutionStatus.RUNNING log_metadata.info( @@ -1006,7 +1030,15 @@ def _on_graph_execution( elif error is not None: execution_status = ExecutionStatus.FAILED else: - execution_status = ExecutionStatus.COMPLETED + # Check if there are any nodes waiting for review + waiting_nodes_count = db_client.get_node_executions_count( + graph_exec_id=graph_exec.graph_exec_id, + statuses=[ExecutionStatus.WAITING_FOR_REVIEW], + ) + if waiting_nodes_count > 0: + execution_status = ExecutionStatus.WAITING_FOR_REVIEW + else: + execution_status = ExecutionStatus.COMPLETED if error: execution_stats.error = str(error) or type(error).__name__ diff --git a/autogpt_platform/backend/backend/server/rest_api.py b/autogpt_platform/backend/backend/server/rest_api.py index b572d8406002..58e7b9c6d9a7 100644 --- a/autogpt_platform/backend/backend/server/rest_api.py +++ b/autogpt_platform/backend/backend/server/rest_api.py @@ -29,6 +29,7 @@ import backend.server.v2.builder import backend.server.v2.builder.routes import backend.server.v2.chat.routes as chat_routes +import backend.server.v2.executions.review.routes import backend.server.v2.library.db import backend.server.v2.library.model import backend.server.v2.library.routes @@ -286,6 +287,11 @@ async def validation_error_handler( tags=["v2", "turnstile"], prefix="/api/turnstile", ) +app.include_router( + backend.server.v2.executions.review.routes.router, + tags=["v2", "executions"], + prefix="/api/executions", +) app.include_router( backend.server.routers.postmark.postmark.router, diff --git a/autogpt_platform/backend/backend/server/v2/executions/__init__.py b/autogpt_platform/backend/backend/server/v2/executions/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/autogpt_platform/backend/backend/server/v2/executions/review/__init__.py b/autogpt_platform/backend/backend/server/v2/executions/review/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/autogpt_platform/backend/backend/server/v2/executions/review/model.py b/autogpt_platform/backend/backend/server/v2/executions/review/model.py new file mode 100644 index 000000000000..5564c77c0662 --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/executions/review/model.py @@ -0,0 +1,41 @@ +from datetime import datetime +from typing import Any, Literal + +from pydantic import BaseModel, Field + + +class PendingHumanReviewResponse(BaseModel): + """Response model for pending human review.""" + + id: str = Field(description="Unique ID of the pending review") + user_id: str = Field(description="User ID associated with the review") + node_exec_id: str = Field(description="Node execution ID") + graph_exec_id: str = Field(description="Graph execution ID") + graph_id: str = Field(description="Graph ID") + graph_version: int = Field(description="Graph version") + data: Any = Field(description="Data waiting for review") + status: Literal["WAITING", 
"APPROVED", "REJECTED"] = Field( + description="Review status" + ) + review_message: str | None = Field( + description="Optional message from the reviewer", default=None + ) + created_at: datetime = Field(description="When the review was created") + updated_at: datetime | None = Field( + description="When the review was last updated", default=None + ) + reviewed_at: datetime | None = Field( + description="When the review was completed", default=None + ) + + +class ReviewActionRequest(BaseModel): + """Request model for reviewing data.""" + + action: Literal["approve", "reject"] = Field(description="Action to take") + reviewed_data: Any | None = Field( + description="Modified data (only for approve action)", default=None + ) + message: str | None = Field( + description="Optional message from the reviewer", default=None + ) diff --git a/autogpt_platform/backend/backend/server/v2/executions/review/routes.py b/autogpt_platform/backend/backend/server/v2/executions/review/routes.py new file mode 100644 index 000000000000..5d9324e777cb --- /dev/null +++ b/autogpt_platform/backend/backend/server/v2/executions/review/routes.py @@ -0,0 +1,193 @@ +import logging +from datetime import datetime, timezone +from typing import Any, List, Literal, cast + +import autogpt_libs.auth as autogpt_auth_lib +from fastapi import APIRouter, HTTPException, Security, status + +from backend.server.v2.executions.review.model import ( + PendingHumanReviewResponse, + ReviewActionRequest, +) + +logger = logging.getLogger(__name__) + +router = APIRouter( + prefix="/review", + tags=["execution-review", "private"], + dependencies=[Security(autogpt_auth_lib.requires_user)], +) + + +@router.get( + "/pending", + summary="Get Pending Reviews", + responses={ + 500: {"description": "Server error", "content": {"application/json": {}}}, + }, +) +async def list_pending_reviews( + user_id: str = Security(autogpt_auth_lib.get_user_id), +) -> List[PendingHumanReviewResponse]: + """Get all pending reviews for the current user.""" + from prisma.models import PendingHumanReview + + reviews = await PendingHumanReview.prisma().find_many( + where={"userId": user_id, "status": "WAITING"}, + order={"createdAt": "desc"}, + ) + + return [ + PendingHumanReviewResponse( + id=review.id, + user_id=review.userId, + node_exec_id=review.nodeExecId, + graph_exec_id=review.graphExecId, + graph_id=review.graphId, + graph_version=review.graphVersion, + data=review.data, + status=cast(Literal["WAITING", "APPROVED", "REJECTED"], review.status), + review_message=review.reviewMessage, + created_at=review.createdAt, + updated_at=review.updatedAt, + reviewed_at=review.reviewedAt, + ) + for review in reviews + ] + + +@router.get( + "/execution/{graph_exec_id}", + summary="Get Pending Reviews for Execution", + responses={ + 500: {"description": "Server error", "content": {"application/json": {}}}, + }, +) +async def list_pending_reviews_for_execution( + graph_exec_id: str, + user_id: str = Security(autogpt_auth_lib.get_user_id), +) -> List[PendingHumanReviewResponse]: + """Get all pending reviews for a specific graph execution.""" + from prisma.models import PendingHumanReview + + reviews = await PendingHumanReview.prisma().find_many( + where={"userId": user_id, "graphExecId": graph_exec_id, "status": "WAITING"}, + order={"createdAt": "asc"}, + ) + + return [ + PendingHumanReviewResponse( + id=review.id, + user_id=review.userId, + node_exec_id=review.nodeExecId, + graph_exec_id=review.graphExecId, + graph_id=review.graphId, + graph_version=review.graphVersion, + 
data=review.data, + status=cast(Literal["WAITING", "APPROVED", "REJECTED"], review.status), + review_message=review.reviewMessage, + created_at=review.createdAt, + updated_at=review.updatedAt, + reviewed_at=review.reviewedAt, + ) + for review in reviews + ] + + +@router.post( + "/{review_id}/action", + summary="Review Data", + responses={ + 404: {"description": "Review not found"}, + 500: {"description": "Server error", "content": {"application/json": {}}}, + }, +) +async def review_data( + review_id: str, + request: ReviewActionRequest, + user_id: str = Security(autogpt_auth_lib.get_user_id), +) -> dict[str, Any]: + """Approve or reject pending review data.""" + from prisma.models import PendingHumanReview + + # Find the review and verify ownership + review = await PendingHumanReview.prisma().find_unique(where={"id": review_id}) + + if not review: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Review not found" + ) + + if review.userId != user_id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, detail="Access denied" + ) + + if review.status != "WAITING": + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Review is already {review.status.lower()}", + ) + + # Update the review + now = datetime.now(timezone.utc) + update_status = "APPROVED" if request.action == "approve" else "REJECTED" + + # Handle reviewed_data for approve action + review_data = review.data + if request.action == "approve" and request.reviewed_data is not None: + # Update only the data part while preserving the structure + if isinstance(review.data, dict) and "data" in review.data: + review_data = { + **review.data, + "data": request.reviewed_data, # Update just the data field + } + else: + # Fallback: replace entire data + review_data = request.reviewed_data + + # Simple update without complex type handling + from backend.util.json import SafeJson + + await PendingHumanReview.prisma().update( + where={"id": review_id}, + data={ + "status": update_status, + "data": SafeJson(review_data), # Store the (possibly modified) data + "reviewMessage": request.message, + "reviewedAt": now, + }, + ) + + # If approved, trigger graph execution resume + if request.action == "approve": + await _resume_graph_execution(review.graphExecId) + + return {"status": "success", "action": request.action} + + +async def _resume_graph_execution(graph_exec_id: str) -> None: + """Resume a graph execution by updating its status.""" + try: + from backend.data.execution import ExecutionStatus, update_graph_execution_stats + from backend.util.clients import get_database_manager_async_client + + # Get the graph execution details + db = get_database_manager_async_client() + graph_exec = await db.get_graph_execution_meta( + user_id="", execution_id=graph_exec_id # We'll validate user_id separately + ) + + if not graph_exec: + logger.error(f"Graph execution {graph_exec_id} not found") + return + + # Update the graph execution status to QUEUED so the scheduler picks it up + await update_graph_execution_stats( + graph_exec_id=graph_exec_id, status=ExecutionStatus.QUEUED + ) + + logger.info(f"Resumed graph execution {graph_exec_id}") + + except Exception as e: + logger.error(f"Failed to resume graph execution {graph_exec_id}: {e}") diff --git a/autogpt_platform/backend/schema.prisma b/autogpt_platform/backend/schema.prisma index e8755d99ab73..e91f5ce48856 100644 --- a/autogpt_platform/backend/schema.prisma +++ b/autogpt_platform/backend/schema.prisma @@ -59,6 +59,7 @@ model User { APIKeys 
APIKey[] IntegrationWebhooks IntegrationWebhook[] NotificationBatches UserNotificationBatch[] + PendingHumanReviews PendingHumanReview[] } enum OnboardingStep { @@ -351,6 +352,7 @@ enum AgentExecutionStatus { COMPLETED TERMINATED FAILED + WAITING_FOR_REVIEW } // This model describes the execution of an AgentGraph. @@ -393,6 +395,8 @@ model AgentGraphExecution { shareToken String? @unique sharedAt DateTime? + PendingHumanReviews PendingHumanReview[] + @@index([agentGraphId, agentGraphVersion]) @@index([userId, isDeleted, createdAt]) @@index([createdAt]) @@ -423,6 +427,8 @@ model AgentNodeExecution { stats Json? + PendingHumanReview PendingHumanReview? + @@index([agentGraphExecutionId, agentNodeId, executionStatus]) @@index([agentNodeId, executionStatus]) @@index([addedTime, queuedTime]) @@ -464,6 +470,30 @@ model AgentNodeExecutionKeyValueData { @@id([userId, key]) } +// Pending human reviews for Human-in-the-loop blocks +model PendingHumanReview { + id String @id @default(uuid()) + userId String + nodeExecId String + graphExecId String + graphId String + graphVersion Int + data Json // The data waiting for review + status String @default("WAITING") // WAITING, APPROVED, REJECTED + reviewMessage String? // Optional message from the reviewer + createdAt DateTime @default(now()) + updatedAt DateTime? @updatedAt + reviewedAt DateTime? + + User User @relation(fields: [userId], references: [id], onDelete: Cascade) + NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade) + GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade) + + @@unique([nodeExecId]) // One pending review per node execution + @@index([userId, status]) + @@index([graphExecId, status]) +} + // Webhook that is registered with a provider and propagates to one or more nodes model IntegrationWebhook { id String @id @default(uuid()) diff --git a/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx b/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx index 8d4963dd63cf..beb1acc1fd3c 100644 --- a/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx @@ -13,8 +13,13 @@ import { BuilderActions } from "../../BuilderActions/BuilderActions"; import { RunningBackground } from "./components/RunningBackground"; import { useGraphStore } from "../../../stores/graphStore"; import { useCopyPaste } from "./useCopyPaste"; +import { FloatingReviewsPanel } from "@/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel"; +import { useSearchParams } from "next/navigation"; export const Flow = () => { + const searchParams = useSearchParams(); + const flowExecutionID = searchParams.get("flowExecutionID") || undefined; + const nodes = useNodeStore(useShallow((state) => state.nodes)); const onNodesChange = useNodeStore( useShallow((state) => state.onNodesChange), @@ -69,6 +74,7 @@ export const Flow = () => { {isGraphRunning && } + ); }; diff --git a/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/nodes/CustomNode/components/NodeExecutionBadge.tsx b/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/nodes/CustomNode/components/NodeExecutionBadge.tsx index d47ba4ea7d0f..f2e5fce9a6c2 100644 --- a/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/nodes/CustomNode/components/NodeExecutionBadge.tsx 
+++ b/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/nodes/CustomNode/components/NodeExecutionBadge.tsx @@ -7,6 +7,7 @@ const statusStyles: Record = { INCOMPLETE: "text-slate-700 border-slate-400", QUEUED: "text-blue-700 border-blue-400", RUNNING: "text-amber-700 border-amber-400", + WAITING_FOR_REVIEW: "text-purple-700 border-purple-400", COMPLETED: "text-green-700 border-green-400", TERMINATED: "text-orange-700 border-orange-400", FAILED: "text-red-700 border-red-400", @@ -22,7 +23,7 @@ export const NodeExecutionBadge = ({ - {status} + {status === "WAITING_FOR_REVIEW" ? "Waiting for Review" : status} {status === AgentExecutionStatus.RUNNING && ( )} diff --git a/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/nodes/CustomNode/helpers.ts b/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/nodes/CustomNode/helpers.ts index 554746882849..8583689904ad 100644 --- a/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/nodes/CustomNode/helpers.ts +++ b/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/nodes/CustomNode/helpers.ts @@ -4,6 +4,7 @@ export const nodeStyleBasedOnStatus: Record = { INCOMPLETE: "ring-slate-300 bg-slate-300", QUEUED: " ring-blue-300 bg-blue-300", RUNNING: "ring-amber-300 bg-amber-300", + WAITING_FOR_REVIEW: "ring-purple-300 bg-purple-300", COMPLETED: "ring-green-300 bg-green-300", TERMINATED: "ring-orange-300 bg-orange-300 ", FAILED: "ring-red-300 bg-red-300", diff --git a/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx b/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx index 52f7564f837a..6e0e57cb9074 100644 --- a/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx @@ -64,6 +64,7 @@ import { useCopyPaste } from "../useCopyPaste"; import NewControlPanel from "@/app/(platform)/build/components/NewControlPanel/NewControlPanel"; import { Flag, useGetFlag } from "@/services/feature-flags/use-get-flag"; import { BuildActionBar } from "../BuildActionBar"; +import { FloatingReviewsPanel } from "@/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel"; // This is for the history, this is the minimum distance a block must move before it is logged // It helps to prevent spamming the history with small movements especially when pressing on a input in a block @@ -1008,6 +1009,10 @@ const FlowEditor: React.FC<{ saveAndRun={saveAndRun} /> )} + = { ), + WAITING_FOR_REVIEW: ( + + + + ), COMPLETED: ( diff --git a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/AgentRunsView/components/SelectedRunView/SelectedRunView.tsx b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/AgentRunsView/components/SelectedRunView/SelectedRunView.tsx index 0ab027d99fc7..35d03e87668a 100644 --- a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/AgentRunsView/components/SelectedRunView/SelectedRunView.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/AgentRunsView/components/SelectedRunView/SelectedRunView.tsx @@ -15,6 +15,8 @@ import { Skeleton } from "@/components/__legacy__/ui/skeleton"; import { AgentInputsReadOnly } from "../AgentInputsReadOnly/AgentInputsReadOnly"; import { RunDetailCard } from "../RunDetailCard/RunDetailCard"; import { RunOutputs 
} from "./components/RunOutputs"; +import { PendingReviewsList } from "@/components/organisms/PendingReviewsList/PendingReviewsList"; +import { usePendingReviewsForExecution } from "@/hooks/usePendingReviews"; interface Props { agent: LibraryAgent; @@ -34,6 +36,12 @@ export function SelectedRunView({ runId, ); + const { + pendingReviews, + isLoading: reviewsLoading, + refetch: refetchReviews, + } = usePendingReviewsForExecution(runId); + if (responseError || httpError) { return ( Output Your input + {pendingReviews.length > 0 && ( + + Reviews ({pendingReviews.length}) + + )} @@ -92,6 +105,22 @@ export function SelectedRunView({ /> + + {pendingReviews.length > 0 && ( + + + {reviewsLoading ? ( +
Loading reviews…
+ ) : ( + + )} +
+
+ )} ); diff --git a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/AgentRunsView/components/SelectedRunView/components/RunStatusBadge.tsx b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/AgentRunsView/components/SelectedRunView/components/RunStatusBadge.tsx index f9534541c6f5..ad9ea284543f 100644 --- a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/AgentRunsView/components/SelectedRunView/components/RunStatusBadge.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/AgentRunsView/components/SelectedRunView/components/RunStatusBadge.tsx @@ -36,6 +36,11 @@ const statusIconMap: Record = { bgColor: "bg-yellow-50", textColor: "!text-yellow-700", }, + WAITING_FOR_REVIEW: { + icon: , + bgColor: "bg-blue-50", + textColor: "!text-blue-700", + }, COMPLETED: { icon: ( @@ -72,7 +77,9 @@ export function RunStatusBadge({ status }: Props) { variant="small-medium" className={cn(statusIconMap[status].textColor, "capitalize")} > - {status.toLowerCase()} + {status === "WAITING_FOR_REVIEW" + ? "Waiting for Review" + : status.toLowerCase()} ); diff --git a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/OldAgentLibraryView/components/agent-run-status-chip.tsx b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/OldAgentLibraryView/components/agent-run-status-chip.tsx index 46bd50d26ce5..16ed84268e43 100644 --- a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/OldAgentLibraryView/components/agent-run-status-chip.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/OldAgentLibraryView/components/agent-run-status-chip.tsx @@ -23,6 +23,7 @@ export const agentRunStatusMap: Record< QUEUED: "queued", RUNNING: "running", TERMINATED: "stopped", + WAITING_FOR_REVIEW: "queued", // Map to queued for now // TODO: implement "draft" - https://github.com/Significant-Gravitas/AutoGPT/issues/9168 }; diff --git a/autogpt_platform/frontend/src/app/api/openapi.json b/autogpt_platform/frontend/src/app/api/openapi.json index fa5b225c4692..b00d1e2a5015 100644 --- a/autogpt_platform/frontend/src/app/api/openapi.json +++ b/autogpt_platform/frontend/src/app/api/openapi.json @@ -4753,6 +4753,141 @@ } } }, + "/api/executions/review/pending": { + "get": { + "tags": ["v2", "executions", "execution-review", "private"], + "summary": "Get Pending Reviews", + "description": "Get all pending reviews for the current user.", + "operationId": "getV2Get pending reviews", + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "items": { + "$ref": "#/components/schemas/PendingHumanReviewResponse" + }, + "type": "array", + "title": "Response Getv2Get Pending Reviews" + } + } + } + }, + "500": { + "description": "Server error", + "content": { "application/json": {} } + }, + "401": { + "$ref": "#/components/responses/HTTP401NotAuthenticatedError" + } + }, + "security": [{ "HTTPBearerJWT": [] }] + } + }, + "/api/executions/review/execution/{graph_exec_id}": { + "get": { + "tags": ["v2", "executions", "execution-review", "private"], + "summary": "Get Pending Reviews for Execution", + "description": "Get all pending reviews for a specific graph execution.", + "operationId": "getV2Get pending reviews for execution", + "security": [{ "HTTPBearerJWT": [] }], + "parameters": [ + { + "name": "graph_exec_id", + "in": "path", + "required": true, + "schema": { "type": 
"string", "title": "Graph Exec Id" } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/PendingHumanReviewResponse" + }, + "title": "Response Getv2Get Pending Reviews For Execution" + } + } + } + }, + "500": { + "description": "Server error", + "content": { "application/json": {} } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { "$ref": "#/components/schemas/HTTPValidationError" } + } + } + }, + "401": { + "$ref": "#/components/responses/HTTP401NotAuthenticatedError" + } + } + } + }, + "/api/executions/review/{review_id}/action": { + "post": { + "tags": ["v2", "executions", "execution-review", "private"], + "summary": "Review Data", + "description": "Approve or reject pending review data.", + "operationId": "postV2Review data", + "security": [{ "HTTPBearerJWT": [] }], + "parameters": [ + { + "name": "review_id", + "in": "path", + "required": true, + "schema": { "type": "string", "title": "Review Id" } + } + ], + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { "$ref": "#/components/schemas/ReviewActionRequest" } + } + } + }, + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "type": "object", + "additionalProperties": true, + "title": "Response Postv2Review Data" + } + } + } + }, + "404": { "description": "Review not found" }, + "500": { + "description": "Server error", + "content": { "application/json": {} } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { "$ref": "#/components/schemas/HTTPValidationError" } + } + } + }, + "401": { + "$ref": "#/components/responses/HTTP401NotAuthenticatedError" + } + } + } + }, "/api/email/unsubscribe": { "post": { "tags": ["v1", "email"], @@ -5151,7 +5286,8 @@ "RUNNING", "COMPLETED", "TERMINATED", - "FAILED" + "FAILED", + "WAITING_FOR_REVIEW" ], "title": "AgentExecutionStatus" }, @@ -7307,6 +7443,88 @@ "required": ["total_items", "total_pages", "current_page", "page_size"], "title": "Pagination" }, + "PendingHumanReviewResponse": { + "properties": { + "id": { + "type": "string", + "title": "Id", + "description": "Unique ID of the pending review" + }, + "user_id": { + "type": "string", + "title": "User Id", + "description": "User ID associated with the review" + }, + "node_exec_id": { + "type": "string", + "title": "Node Exec Id", + "description": "Node execution ID" + }, + "graph_exec_id": { + "type": "string", + "title": "Graph Exec Id", + "description": "Graph execution ID" + }, + "graph_id": { + "type": "string", + "title": "Graph Id", + "description": "Graph ID" + }, + "graph_version": { + "type": "integer", + "title": "Graph Version", + "description": "Graph version" + }, + "data": { "title": "Data", "description": "Data waiting for review" }, + "status": { + "type": "string", + "enum": ["WAITING", "APPROVED", "REJECTED"], + "title": "Status", + "description": "Review status" + }, + "review_message": { + "anyOf": [{ "type": "string" }, { "type": "null" }], + "title": "Review Message", + "description": "Optional message from the reviewer" + }, + "created_at": { + "type": "string", + "format": "date-time", + "title": "Created At", + "description": "When the review was created" + }, + "updated_at": { + "anyOf": [ + { "type": "string", "format": "date-time" }, + { "type": "null" } + ], + 
"title": "Updated At", + "description": "When the review was last updated" + }, + "reviewed_at": { + "anyOf": [ + { "type": "string", "format": "date-time" }, + { "type": "null" } + ], + "title": "Reviewed At", + "description": "When the review was completed" + } + }, + "type": "object", + "required": [ + "id", + "user_id", + "node_exec_id", + "graph_exec_id", + "graph_id", + "graph_version", + "data", + "status", + "created_at" + ], + "title": "PendingHumanReviewResponse", + "description": "Response model for pending human review." + }, "PostmarkBounceEnum": { "type": "integer", "enum": [ @@ -7776,6 +7994,30 @@ "required": ["credit_amount"], "title": "RequestTopUp" }, + "ReviewActionRequest": { + "properties": { + "action": { + "type": "string", + "enum": ["approve", "reject"], + "title": "Action", + "description": "Action to take" + }, + "reviewed_data": { + "anyOf": [{}, { "type": "null" }], + "title": "Reviewed Data", + "description": "Modified data (only for approve action)" + }, + "message": { + "anyOf": [{ "type": "string" }, { "type": "null" }], + "title": "Message", + "description": "Optional message from the reviewer" + } + }, + "type": "object", + "required": ["action"], + "title": "ReviewActionRequest", + "description": "Request model for reviewing data." + }, "ReviewSubmissionRequest": { "properties": { "store_listing_version_id": { diff --git a/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx b/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx new file mode 100644 index 000000000000..bfc7b2fdccc5 --- /dev/null +++ b/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx @@ -0,0 +1,93 @@ +import { useState } from "react"; +import { PendingReviewsList } from "@/components/organisms/PendingReviewsList/PendingReviewsList"; +import { usePendingReviewsForExecution } from "@/hooks/usePendingReviews"; +import { Button } from "@/components/atoms/Button/Button"; +import { ClockIcon, XIcon } from "@phosphor-icons/react"; +import { cn } from "@/lib/utils"; +import { Text } from "@/components/atoms/Text/Text"; + +interface FloatingReviewsPanelProps { + executionId?: string; + className?: string; +} + +export function FloatingReviewsPanel({ + executionId, + className, +}: FloatingReviewsPanelProps) { + const [isOpen, setIsOpen] = useState(false); + + const { pendingReviews, isLoading, refetch } = usePendingReviewsForExecution( + executionId || "", + ); + + // Don't show anything if there's no execution ID or no pending reviews + if (!executionId || (!isLoading && pendingReviews.length === 0)) { + return null; + } + + function handleReviewComplete() { + refetch(); + // Close panel if no more reviews + if (pendingReviews.length <= 1) { + setIsOpen(false); + } + } + + return ( +
+ {/* Trigger Button */} + {!isOpen && pendingReviews.length > 0 && ( + + )} + + {/* Reviews Panel */} + {isOpen && ( +
+ {/* Header */} +
+
+ + Pending Reviews +
+ +
+ + {/* Content */} +
+ {isLoading ? ( +
+ + Loading reviews... + +
+ ) : ( + + )} +
+
+ )} +
+ ); +} diff --git a/autogpt_platform/frontend/src/components/organisms/PendingReviewCard/PendingReviewCard.tsx b/autogpt_platform/frontend/src/components/organisms/PendingReviewCard/PendingReviewCard.tsx new file mode 100644 index 000000000000..145bef6a45d0 --- /dev/null +++ b/autogpt_platform/frontend/src/components/organisms/PendingReviewCard/PendingReviewCard.tsx @@ -0,0 +1,169 @@ +import { useState } from "react"; +import { PendingHumanReviewResponse } from "@/app/api/__generated__/models/pendingHumanReviewResponse"; +import { ReviewActionRequest } from "@/app/api/__generated__/models/reviewActionRequest"; +import { Button } from "@/components/atoms/Button/Button"; +import { Text } from "@/components/atoms/Text/Text"; +import { Card } from "@/components/atoms/Card/Card"; +import { Textarea } from "@/components/__legacy__/ui/textarea"; +import { useToast } from "@/components/molecules/Toast/use-toast"; +import { CheckIcon, XIcon } from "@phosphor-icons/react"; +import { usePostV2ReviewData } from "@/app/api/__generated__/endpoints/execution-review/execution-review"; + +interface PendingReviewCardProps { + review: PendingHumanReviewResponse; + onReviewComplete?: () => void; +} + +export function PendingReviewCard({ + review, + onReviewComplete, +}: PendingReviewCardProps) { + const [reviewData, setReviewData] = useState( + typeof review.data === "object" && review.data && "data" in review.data + ? JSON.stringify((review.data as any).data, null, 2) + : JSON.stringify(review.data, null, 2), + ); + const [reviewMessage, setReviewMessage] = useState(""); + const { toast } = useToast(); + + const reviewActionMutation = usePostV2ReviewData({ + mutation: { + onSuccess: () => { + toast({ + title: "Review submitted successfully", + variant: "default", + }); + onReviewComplete?.(); + }, + onError: (error: any) => { + toast({ + title: "Failed to submit review", + description: error.message || "An error occurred", + variant: "destructive", + }); + }, + }, + }); + + function handleApprove() { + let parsedData; + try { + parsedData = JSON.parse(reviewData); + } catch (_error) { + toast({ + title: "Invalid JSON", + description: "Please fix the JSON format before approving", + variant: "destructive", + }); + return; + } + + const requestData: ReviewActionRequest = { + action: "approve", + reviewed_data: parsedData, + message: reviewMessage || undefined, + }; + + reviewActionMutation.mutate({ + reviewId: review.id, + data: requestData, + }); + } + + function handleReject() { + const requestData: ReviewActionRequest = { + action: "reject", + message: reviewMessage || "Rejected by user", + }; + + reviewActionMutation.mutate({ + reviewId: review.id, + data: requestData, + }); + } + + return ( + +
+
+ Pending Review + + {new Date(review.created_at).toLocaleString()} + +
+ {/* Review Message */} + {typeof review.data === "object" && + review.data && + "message" in review.data && + (review.data as any).message && ( +
+ + Instructions: + + {(review.data as any).message} +
+ )} + + {/* Data Editor */} +
+ + Data to Review: + +