diff --git a/autogpt_platform/backend/backend/blocks/agent.py b/autogpt_platform/backend/backend/blocks/agent.py
index 1abb5ade5319..0efc0a336980 100644
--- a/autogpt_platform/backend/backend/blocks/agent.py
+++ b/autogpt_platform/backend/backend/blocks/agent.py
@@ -11,7 +11,7 @@
     BlockType,
     get_block,
 )
-from backend.data.execution import ExecutionStatus, NodesInputMasks
+from backend.data.execution import ExecutionContext, ExecutionStatus, NodesInputMasks
 from backend.data.model import NodeExecutionStats, SchemaField
 from backend.util.json import validate_with_jsonschema
 from backend.util.retry import func_retry
@@ -72,9 +72,9 @@ async def run(
         input_data: Input,
         *,
         graph_exec_id: str,
+        execution_context: ExecutionContext,
         **kwargs,
     ) -> BlockOutput:
-
         from backend.executor import utils as execution_utils
 
         graph_exec = await execution_utils.add_graph_execution(
@@ -83,8 +83,9 @@
             user_id=input_data.user_id,
             inputs=input_data.inputs,
             nodes_input_masks=input_data.nodes_input_masks,
-            parent_graph_exec_id=graph_exec_id,
-            is_sub_graph=True,  # AgentExecutorBlock executions are always sub-graphs
+            execution_context=execution_context.model_copy(
+                update={"parent_execution_id": graph_exec_id},
+            ),
         )
 
         logger = execution_utils.LogMetadata(
diff --git a/autogpt_platform/backend/backend/blocks/human_in_the_loop.py b/autogpt_platform/backend/backend/blocks/human_in_the_loop.py
index 1dd5dbac9deb..42c98b514646 100644
--- a/autogpt_platform/backend/backend/blocks/human_in_the_loop.py
+++ b/autogpt_platform/backend/backend/blocks/human_in_the_loop.py
@@ -9,8 +9,9 @@
     BlockOutput,
     BlockSchemaInput,
     BlockSchemaOutput,
+    BlockType,
 )
-from backend.data.execution import ExecutionStatus
+from backend.data.execution import ExecutionContext, ExecutionStatus
 from backend.data.human_review import ReviewResult
 from backend.data.model import SchemaField
 from backend.executor.manager import async_update_node_execution_status
@@ -61,15 +62,15 @@ def __init__(self):
             categories={BlockCategory.BASIC},
             input_schema=HumanInTheLoopBlock.Input,
             output_schema=HumanInTheLoopBlock.Output,
+            block_type=BlockType.HUMAN_IN_THE_LOOP,
             test_input={
                 "data": {"name": "John Doe", "age": 30},
                 "name": "User profile data",
                 "editable": True,
             },
             test_output=[
-                ("reviewed_data", {"name": "John Doe", "age": 30}),
                 ("status", "approved"),
-                ("review_message", ""),
+                ("reviewed_data", {"name": "John Doe", "age": 30}),
             ],
             test_mock={
                 "get_or_create_human_review": lambda *_args, **_kwargs: ReviewResult(
@@ -80,9 +81,25 @@
                     node_exec_id="test-node-exec-id",
                 ),
                 "update_node_execution_status": lambda *_args, **_kwargs: None,
+                "update_review_processed_status": lambda *_args, **_kwargs: None,
             },
         )
 
+    async def get_or_create_human_review(self, **kwargs):
+        return await get_database_manager_async_client().get_or_create_human_review(
+            **kwargs
+        )
+
+    async def update_node_execution_status(self, **kwargs):
+        return await async_update_node_execution_status(
+            db_client=get_database_manager_async_client(), **kwargs
+        )
+
+    async def update_review_processed_status(self, node_exec_id: str, processed: bool):
+        return await get_database_manager_async_client().update_review_processed_status(
+            node_exec_id, processed
+        )
+
     async def run(
         self,
         input_data: Input,
@@ -92,20 +109,20 @@ async def run(
         graph_exec_id: str,
         graph_id: str,
         graph_version: int,
+        execution_context: ExecutionContext,
         **kwargs,
     ) -> BlockOutput:
-        """
-        Execute the Human In The Loop block.
+        if not execution_context.safe_mode:
+            logger.info(
+                f"HITL block skipping review for node {node_exec_id} - safe mode disabled"
+            )
+            yield "status", "approved"
+            yield "reviewed_data", input_data.data
+            yield "review_message", "Auto-approved (safe mode disabled)"
+            return
 
-        This method uses one function to handle the complete workflow - checking existing reviews
-        and creating pending ones as needed.
-        """
         try:
-            logger.debug(f"HITL block executing for node {node_exec_id}")
-
-            # Use the data layer to handle the complete workflow
-            db_client = get_database_manager_async_client()
-            result = await db_client.get_or_create_human_review(
+            result = await self.get_or_create_human_review(
                 user_id=user_id,
                 node_exec_id=node_exec_id,
                 graph_exec_id=graph_exec_id,
@@ -119,21 +136,15 @@
             logger.error(f"Error in HITL block for node {node_exec_id}: {str(e)}")
             raise
 
-        # Check if we're waiting for human input
        if result is None:
             logger.info(
                 f"HITL block pausing execution for node {node_exec_id} - awaiting human review"
             )
             try:
-                # Set node status to REVIEW so execution manager can't mark it as COMPLETED
-                # The VALID_STATUS_TRANSITIONS will then prevent any unwanted status changes
-                # Use the proper wrapper function to ensure websocket events are published
-                await async_update_node_execution_status(
-                    db_client=db_client,
+                await self.update_node_execution_status(
                     exec_id=node_exec_id,
                     status=ExecutionStatus.REVIEW,
                 )
-                # Execution pauses here until API routes process the review
                 return
             except Exception as e:
                 logger.error(
@@ -141,10 +152,8 @@
                 )
                 raise
 
-        # Review is complete (approved or rejected) - check if unprocessed
         if not result.processed:
-            # Mark as processed before yielding
-            await db_client.update_review_processed_status(
+            await self.update_review_processed_status(
                 node_exec_id=node_exec_id, processed=True
             )
diff --git a/autogpt_platform/backend/backend/blocks/time_blocks.py b/autogpt_platform/backend/backend/blocks/time_blocks.py
index 611dfa8281d2..3a1f4c678e93 100644
--- a/autogpt_platform/backend/backend/blocks/time_blocks.py
+++ b/autogpt_platform/backend/backend/blocks/time_blocks.py
@@ -14,7 +14,7 @@
     BlockSchemaInput,
     BlockSchemaOutput,
 )
-from backend.data.execution import UserContext
+from backend.data.execution import ExecutionContext
 from backend.data.model import SchemaField
 
 # Shared timezone literal type for all time/date blocks
@@ -188,10 +188,9 @@ def __init__(self):
         )
 
     async def run(
-        self, input_data: Input, *, user_context: UserContext, **kwargs
+        self, input_data: Input, *, execution_context: ExecutionContext, **kwargs
     ) -> BlockOutput:
-        # Extract timezone from user_context (always present)
-        effective_timezone = user_context.timezone
+        effective_timezone = execution_context.user_timezone
 
         # Get the appropriate timezone
         tz = _get_timezone(input_data.format_type, effective_timezone)
@@ -298,10 +297,10 @@ def __init__(self):
         ],
     )
 
-    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
-        # Extract timezone from user_context (required keyword argument)
-        user_context: UserContext = kwargs["user_context"]
-        effective_timezone = user_context.timezone
+    async def run(
+        self, input_data: Input, *, execution_context: ExecutionContext, **kwargs
+    ) -> BlockOutput:
+        effective_timezone = execution_context.user_timezone
 
         try:
             offset = int(input_data.offset)
@@ -404,10 +403,10 @@ def __init__(self):
         ],
     )
 
-    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
-        # Extract timezone from user_context (required keyword argument)
-        user_context: UserContext = kwargs["user_context"]
-        effective_timezone = user_context.timezone
+    async def run(
+        self, input_data: Input, *, execution_context: ExecutionContext, **kwargs
+    ) -> BlockOutput:
+        effective_timezone = execution_context.user_timezone
 
         # Get the appropriate timezone
         tz = _get_timezone(input_data.format_type, effective_timezone)
diff --git a/autogpt_platform/backend/backend/data/block.py b/autogpt_platform/backend/backend/data/block.py
index 2f05f444ae70..762e9b37ef53 100644
--- a/autogpt_platform/backend/backend/data/block.py
+++ b/autogpt_platform/backend/backend/data/block.py
@@ -71,6 +71,7 @@ class BlockType(Enum):
     AGENT = "Agent"
     AI = "AI"
     AYRSHARE = "Ayrshare"
+    HUMAN_IN_THE_LOOP = "Human In The Loop"
 
 
 class BlockCategory(Enum):
@@ -796,3 +797,12 @@ def get_io_block_ids() -> Sequence[str]:
         for id, B in get_blocks().items()
         if B().block_type in (BlockType.INPUT, BlockType.OUTPUT)
     ]
+
+
+@cached(ttl_seconds=3600)
+def get_human_in_the_loop_block_ids() -> Sequence[str]:
+    return [
+        id
+        for id, B in get_blocks().items()
+        if B().block_type == BlockType.HUMAN_IN_THE_LOOP
+    ]
diff --git a/autogpt_platform/backend/backend/data/credit_test.py b/autogpt_platform/backend/backend/data/credit_test.py
index 6f604975cfd4..391a373b86a7 100644
--- a/autogpt_platform/backend/backend/data/credit_test.py
+++ b/autogpt_platform/backend/backend/data/credit_test.py
@@ -7,7 +7,7 @@
 from backend.blocks.llm import AITextGeneratorBlock
 from backend.data.block import get_block
 from backend.data.credit import BetaUserCredit, UsageTransactionMetadata
-from backend.data.execution import NodeExecutionEntry, UserContext
+from backend.data.execution import ExecutionContext, NodeExecutionEntry
 from backend.data.user import DEFAULT_USER_ID
 from backend.executor.utils import block_usage_cost
 from backend.integrations.credentials_store import openai_credentials
@@ -86,7 +86,7 @@ async def test_block_credit_usage(server: SpinTestServer):
                     "type": openai_credentials.type,
                 },
             },
-            user_context=UserContext(timezone="UTC"),
+            execution_context=ExecutionContext(user_timezone="UTC"),
         ),
     )
     assert spending_amount_1 > 0
@@ -101,7 +101,7 @@
             node_exec_id="test_node_exec",
             block_id=AITextGeneratorBlock().id,
             inputs={"model": "gpt-4-turbo", "api_key": "owned_api_key"},
-            user_context=UserContext(timezone="UTC"),
+            execution_context=ExecutionContext(user_timezone="UTC"),
         ),
     )
     assert spending_amount_2 == 0
diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py
index b78633cf5895..c7c54ef26810 100644
--- a/autogpt_platform/backend/backend/data/execution.py
+++ b/autogpt_platform/backend/backend/data/execution.py
@@ -71,6 +71,18 @@
 config = Config()
 
 
+class ExecutionContext(BaseModel):
+    """
+    Unified context that carries execution-level data throughout the entire execution flow.
+    This includes information needed by blocks, sub-graphs, and execution management.
+    """
+
+    safe_mode: bool = True
+    user_timezone: str = "UTC"
+    root_execution_id: Optional[str] = None
+    parent_execution_id: Optional[str] = None
+
+
 # -------------------------- Models -------------------------- #
@@ -365,9 +377,8 @@ def from_db(_graph_exec: AgentGraphExecution):
 
     def to_graph_execution_entry(
         self,
-        user_context: "UserContext",
+        execution_context: ExecutionContext,
         compiled_nodes_input_masks: Optional[NodesInputMasks] = None,
-        parent_graph_exec_id: Optional[str] = None,
     ):
         return GraphExecutionEntry(
             user_id=self.user_id,
@@ -375,8 +386,7 @@
             graph_version=self.graph_version or 0,
             graph_exec_id=self.id,
             nodes_input_masks=compiled_nodes_input_masks,
-            user_context=user_context,
-            parent_graph_exec_id=parent_graph_exec_id,
+            execution_context=execution_context,
         )
@@ -449,7 +459,7 @@ def from_db(_node_exec: AgentNodeExecution, user_id: Optional[str] = None):
         )
 
     def to_node_execution_entry(
-        self, user_context: "UserContext"
+        self, execution_context: ExecutionContext
     ) -> "NodeExecutionEntry":
         return NodeExecutionEntry(
             user_id=self.user_id,
@@ -460,7 +470,7 @@
             node_id=self.node_id,
             block_id=self.block_id,
             inputs=self.input_data,
-            user_context=user_context,
+            execution_context=execution_context,
         )
@@ -1099,20 +1109,13 @@ async def get_latest_node_execution(
 
 # ----------------- Execution Infrastructure ----------------- #
 
-class UserContext(BaseModel):
-    """Generic user context for graph execution containing user-specific settings."""
-
-    timezone: str
-
-
 class GraphExecutionEntry(BaseModel):
     user_id: str
     graph_exec_id: str
     graph_id: str
     graph_version: int
     nodes_input_masks: Optional[NodesInputMasks] = None
-    user_context: UserContext
-    parent_graph_exec_id: Optional[str] = None
+    execution_context: ExecutionContext
 
 
 class NodeExecutionEntry(BaseModel):
@@ -1124,7 +1127,7 @@ class NodeExecutionEntry(BaseModel):
     node_id: str
     block_id: str
     inputs: BlockInput
-    user_context: UserContext
+    execution_context: ExecutionContext
 
 
 class ExecutionQueue(Generic[T]):
diff --git a/autogpt_platform/backend/backend/data/graph.py b/autogpt_platform/backend/backend/data/graph.py
index 53993fd7e197..0757a86f4aa2 100644
--- a/autogpt_platform/backend/backend/data/graph.py
+++ b/autogpt_platform/backend/backend/data/graph.py
@@ -61,6 +61,10 @@
 logger = logging.getLogger(__name__)
 
 
+class GraphSettings(BaseModel):
+    human_in_the_loop_safe_mode: bool | None = None
+
+
 class Link(BaseDbModel):
     source_id: str
     sink_id: str
@@ -225,6 +229,15 @@ def output_schema(self) -> dict[str, Any]:
     def has_external_trigger(self) -> bool:
         return self.webhook_input_node is not None
 
+    @computed_field
+    @property
+    def has_human_in_the_loop(self) -> bool:
+        return any(
+            node.block_id
+            for node in self.nodes
+            if node.block.block_type == BlockType.HUMAN_IN_THE_LOOP
+        )
+
     @property
     def webhook_input_node(self) -> Node | None:
         return next(
@@ -1105,6 +1118,28 @@ async def delete_graph(graph_id: str, user_id: str) -> int:
     return entries_count
 
 
+async def get_graph_settings(user_id: str, graph_id: str) -> GraphSettings:
+    lib = await LibraryAgent.prisma().find_first(
+        where={
+            "userId": user_id,
+            "agentGraphId": graph_id,
+            "isDeleted": False,
+            "isArchived": False,
+        },
+        order={"agentGraphVersion": "desc"},
+    )
+    if not lib or not lib.settings:
+        return GraphSettings()
+
+    try:
+        return GraphSettings.model_validate(lib.settings)
+    except Exception:
+        logger.warning(
+            f"Malformed settings for LibraryAgent user={user_id} graph={graph_id}"
+        )
+        return GraphSettings()
+
+
 async def validate_graph_execution_permissions(
     user_id: str, graph_id: str, graph_version: int, is_sub_graph: bool = False
 ) -> None:
diff --git a/autogpt_platform/backend/backend/data/human_review.py b/autogpt_platform/backend/backend/data/human_review.py
index 2b0b2dbfb725..df0b4b21e892 100644
--- a/autogpt_platform/backend/backend/data/human_review.py
+++ b/autogpt_platform/backend/backend/data/human_review.py
@@ -32,32 +32,6 @@ class ReviewResult(BaseModel):
     node_exec_id: str
 
 
-async def get_pending_review_by_node_exec_id(
-    node_exec_id: str, user_id: str
-) -> Optional["PendingHumanReviewModel"]:
-    """
-    Get a pending review by node execution ID with user ownership validation.
-
-    Args:
-        node_exec_id: The node execution ID to check
-        user_id: The user ID to validate ownership
-
-    Returns:
-        The existing review if found and owned by the user, None otherwise
-    """
-    review = await PendingHumanReview.prisma().find_first(
-        where={
-            "nodeExecId": node_exec_id,
-            "userId": user_id,
-        }
-    )
-
-    if review:
-        return PendingHumanReviewModel.from_db(review)
-
-    return None
-
-
 async def get_or_create_human_review(
     user_id: str,
     node_exec_id: str,
@@ -121,27 +95,17 @@ async def get_or_create_human_review(
     if review.processed:
         return None
 
-    if review.status == ReviewStatus.APPROVED:
-        # Return the approved review result
-        return ReviewResult(
-            data=review.payload,
-            status=ReviewStatus.APPROVED,
-            message=review.reviewMessage or "",
-            processed=review.processed,
-            node_exec_id=review.nodeExecId,
-        )
-    elif review.status == ReviewStatus.REJECTED:
-        # Return the rejected review result
+    # If pending, return None to continue waiting, otherwise return the review result
+    if review.status == ReviewStatus.WAITING:
+        return None
+    else:
         return ReviewResult(
-            data=None,
-            status=ReviewStatus.REJECTED,
+            data=review.payload if review.status == ReviewStatus.APPROVED else None,
+            status=review.status,
             message=review.reviewMessage or "",
             processed=review.processed,
             node_exec_id=review.nodeExecId,
         )
-    else:
-        # Review is pending - return None to continue waiting
-        return None
 
 
 async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:
diff --git a/autogpt_platform/backend/backend/data/human_review_test.py b/autogpt_platform/backend/backend/data/human_review_test.py
index fe6c9057c14a..c349fdde46bf 100644
--- a/autogpt_platform/backend/backend/data/human_review_test.py
+++ b/autogpt_platform/backend/backend/data/human_review_test.py
@@ -7,7 +7,6 @@
 from backend.data.human_review import (
     get_or_create_human_review,
-    get_pending_review_by_node_exec_id,
     get_pending_reviews_for_execution,
     get_pending_reviews_for_user,
     has_pending_reviews_for_graph_exec,
@@ -20,7 +19,7 @@ def sample_db_review():
     """Create a sample database review object"""
     mock_review = Mock()
     mock_review.nodeExecId = "test_node_123"
-    mock_review.userId = "test_user"
+    mock_review.userId = "test-user-123"
     mock_review.graphExecId = "test_graph_exec_456"
     mock_review.graphId = "test_graph_789"
     mock_review.graphVersion = 1
@@ -37,40 +36,6 @@ def sample_db_review():
     return mock_review
 
 
-@pytest.mark.asyncio
-async def test_get_pending_review_by_node_exec_id_found(
-    mocker: pytest_mock.MockFixture,
-    sample_db_review,
-):
-    """Test finding an existing pending review"""
-    mock_find_first = mocker.patch(
-        "backend.data.human_review.PendingHumanReview.prisma"
-    )
-    mock_find_first.return_value.find_first = AsyncMock(return_value=sample_db_review)
-
-    result = await get_pending_review_by_node_exec_id("test_node_123", "test_user")
-
-    assert result is not None
-    assert result.node_exec_id == "test_node_123"
-    assert result.user_id == "test_user"
-    assert result.status == ReviewStatus.WAITING
-
-
-@pytest.mark.asyncio
-async def test_get_pending_review_by_node_exec_id_not_found(
-    mocker: pytest_mock.MockFixture,
-):
-    """Test when review is not found"""
-    mock_find_first = mocker.patch(
-        "backend.data.human_review.PendingHumanReview.prisma"
-    )
-    mock_find_first.return_value.find_first = AsyncMock(return_value=None)
-
-    result = await get_pending_review_by_node_exec_id("nonexistent", "test_user")
-
-    assert result is None
-
-
 @pytest.mark.asyncio
 async def test_get_or_create_human_review_new(
     mocker: pytest_mock.MockFixture,
@@ -85,7 +50,7 @@
     mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
 
     result = await get_or_create_human_review(
-        user_id="test_user",
+        user_id="test-user-123",
         node_exec_id="test_node_123",
         graph_exec_id="test_graph_exec_456",
         graph_id="test_graph_789",
@@ -114,7 +79,7 @@
     mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
 
     result = await get_or_create_human_review(
-        user_id="test_user",
+        user_id="test-user-123",
         node_exec_id="test_node_123",
         graph_exec_id="test_graph_exec_456",
         graph_id="test_graph_789",
@@ -186,7 +151,9 @@ async def test_get_pending_reviews_for_execution(
     mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
     mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
 
-    result = await get_pending_reviews_for_execution("test_graph_exec_456", "test_user")
+    result = await get_pending_reviews_for_execution(
+        "test_graph_exec_456", "test-user-123"
+    )
 
     assert len(result) == 1
     assert result[0].graph_exec_id == "test_graph_exec_456"
@@ -194,7 +161,7 @@
     # Verify it filters by execution and user
     call_args = mock_find_many.return_value.find_many.call_args
     where_clause = call_args.kwargs["where"]
-    assert where_clause["userId"] == "test_user"
+    assert where_clause["userId"] == "test-user-123"
     assert where_clause["graphExecId"] == "test_graph_exec_456"
     assert where_clause["status"] == ReviewStatus.WAITING
@@ -206,13 +173,13 @@ async def test_process_all_reviews_for_execution_success(
 ):
     """Test successful processing of reviews for an execution"""
     # Mock finding reviews
-    mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
-    mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
+    mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
+    mock_prisma.return_value.find_many = AsyncMock(return_value=[sample_db_review])
 
     # Mock updating reviews
     updated_review = Mock()
     updated_review.nodeExecId = "test_node_123"
-    updated_review.userId = "test_user"
+    updated_review.userId = "test-user-123"
     updated_review.graphExecId = "test_graph_exec_456"
     updated_review.graphId = "test_graph_789"
     updated_review.graphVersion = 1
@@ -226,8 +193,7 @@
     updated_review.createdAt = datetime.datetime.now(datetime.timezone.utc)
     updated_review.updatedAt = datetime.datetime.now(datetime.timezone.utc)
     updated_review.reviewedAt = datetime.datetime.now(datetime.timezone.utc)
-    mock_update = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
-    mock_update.return_value.update = AsyncMock(return_value=updated_review)
+    mock_prisma.return_value.update = AsyncMock(return_value=updated_review)
 
     # Mock gather to simulate parallel updates
     mocker.patch(
@@ -236,7 +202,7 @@
     )
 
     result = await process_all_reviews_for_execution(
-        user_id="test_user",
+        user_id="test-user-123",
         review_decisions={
             "test_node_123": (ReviewStatus.APPROVED, {"data": "modified"}, "Approved")
         },
@@ -260,7 +226,7 @@ async def test_process_all_reviews_for_execution_validation_errors(
 
     with pytest.raises(ValueError, match="Reviews not found"):
         await process_all_reviews_for_execution(
-            user_id="test_user",
+            user_id="test-user-123",
             review_decisions={
                 "nonexistent_node": (ReviewStatus.APPROVED, {"data": "test"}, "message")
             },
@@ -282,7 +248,7 @@ async def test_process_all_reviews_edit_permission_error(
 
     with pytest.raises(ValueError, match="not editable"):
         await process_all_reviews_for_execution(
-            user_id="test_user",
+            user_id="test-user-123",
             review_decisions={
                 "test_node_123": (
                     ReviewStatus.APPROVED,
@@ -302,7 +268,7 @@ async def test_process_all_reviews_mixed_approval_rejection(
     # Create second review for rejection
     second_review = Mock()
     second_review.nodeExecId = "test_node_456"
-    second_review.userId = "test_user"
+    second_review.userId = "test-user-123"
     second_review.graphExecId = "test_graph_exec_456"
     second_review.graphId = "test_graph_789"
     second_review.graphVersion = 1
@@ -326,7 +292,7 @@
     # Mock updating reviews
     approved_review = Mock()
     approved_review.nodeExecId = "test_node_123"
-    approved_review.userId = "test_user"
+    approved_review.userId = "test-user-123"
     approved_review.graphExecId = "test_graph_exec_456"
     approved_review.graphId = "test_graph_789"
     approved_review.graphVersion = 1
@@ -343,7 +309,7 @@
 
     rejected_review = Mock()
     rejected_review.nodeExecId = "test_node_456"
-    rejected_review.userId = "test_user"
+    rejected_review.userId = "test-user-123"
     rejected_review.graphExecId = "test_graph_exec_456"
     rejected_review.graphId = "test_graph_789"
     rejected_review.graphVersion = 1
@@ -364,7 +330,7 @@
     )
 
     result = await process_all_reviews_for_execution(
-        user_id="test_user",
+        user_id="test-user-123",
         review_decisions={
             "test_node_123": (ReviewStatus.APPROVED, {"data": "modified"}, "Approved"),
             "test_node_456": (ReviewStatus.REJECTED, None, "Rejected"),
diff --git a/autogpt_platform/backend/backend/data/integrations.py b/autogpt_platform/backend/backend/data/integrations.py
index 68bdbe085feb..0f328e81b76a 100644
--- a/autogpt_platform/backend/backend/data/integrations.py
+++ b/autogpt_platform/backend/backend/data/integrations.py
@@ -1,5 +1,5 @@
 import logging
-from typing import AsyncGenerator, Literal, Optional, overload
+from typing import TYPE_CHECKING, AsyncGenerator, Literal, Optional, overload
 
 from prisma.models import AgentNode, AgentPreset, IntegrationWebhook
 from prisma.types import (
@@ -19,10 +19,12 @@
 from backend.integrations.providers import ProviderName
 from backend.integrations.webhooks import get_webhook_manager
 from backend.integrations.webhooks.utils import webhook_ingress_url
-from backend.server.v2.library.model import LibraryAgentPreset
 from backend.util.exceptions import NotFoundError
 from backend.util.json import SafeJson
 
+if TYPE_CHECKING:
+    from backend.server.v2.library.model import LibraryAgentPreset
+
 from .db import BaseDbModel
 from .graph import NodeModel
 
@@ -64,7 +66,7 @@ def from_db(webhook: IntegrationWebhook):
 
 class WebhookWithRelations(Webhook):
     triggered_nodes: list[NodeModel]
-    triggered_presets: list[LibraryAgentPreset]
+    triggered_presets: list["LibraryAgentPreset"]
 
     @staticmethod
     def from_db(webhook: IntegrationWebhook):
@@ -73,6 +75,12 @@
             "AgentNodes and AgentPresets must be included in "
             "IntegrationWebhook query with relations"
         )
+        # LibraryAgentPreset import is moved to TYPE_CHECKING to avoid circular import:
+        # integrations.py → library/model.py → integrations.py (for Webhook)
+        # Runtime import is used in WebhookWithRelations.from_db() method instead
+        # Import at runtime to avoid circular dependency
+        from backend.server.v2.library.model import LibraryAgentPreset
+
         return WebhookWithRelations(
             **Webhook.from_db(webhook).model_dump(),
             triggered_nodes=[NodeModel.from_db(node) for node in webhook.AgentNodes],
diff --git a/autogpt_platform/backend/backend/data/model.py b/autogpt_platform/backend/backend/data/model.py
index 1c32a9f4444b..ca4d330301b7 100644
--- a/autogpt_platform/backend/backend/data/model.py
+++ b/autogpt_platform/backend/backend/data/model.py
@@ -46,6 +46,7 @@
 # Type alias for any provider name (including custom ones)
 AnyProviderName = str  # Will be validated as ProviderName at runtime
+USER_TIMEZONE_NOT_SET = "not-set"
 
 
 class User(BaseModel):
@@ -98,7 +99,7 @@ class User(BaseModel):
 
     # User timezone for scheduling and time display
     timezone: str = Field(
-        default="not-set",
+        default=USER_TIMEZONE_NOT_SET,
         description="User timezone (IANA timezone identifier or 'not-set')",
     )
@@ -155,7 +156,7 @@ def from_db(cls, prisma_user: "PrismaUser") -> "User":
             notify_on_daily_summary=prisma_user.notifyOnDailySummary or True,
             notify_on_weekly_summary=prisma_user.notifyOnWeeklySummary or True,
             notify_on_monthly_summary=prisma_user.notifyOnMonthlySummary or True,
-            timezone=prisma_user.timezone or "not-set",
+            timezone=prisma_user.timezone or USER_TIMEZONE_NOT_SET,
         )
diff --git a/autogpt_platform/backend/backend/executor/database.py b/autogpt_platform/backend/backend/executor/database.py
index 27e8b0104398..c2cb0dd2e7dd 100644
--- a/autogpt_platform/backend/backend/executor/database.py
+++ b/autogpt_platform/backend/backend/executor/database.py
@@ -28,6 +28,7 @@
     get_connected_output_nodes,
     get_graph,
     get_graph_metadata,
+    get_graph_settings,
     get_node,
     validate_graph_execution_permissions,
 )
@@ -150,6 +151,7 @@ def _(
     get_graph = _(get_graph)
     get_connected_output_nodes = _(get_connected_output_nodes)
     get_graph_metadata = _(get_graph_metadata)
+    get_graph_settings = _(get_graph_settings)
 
     # Credits
     spend_credits = _(_spend_credits, name="spend_credits")
@@ -254,6 +256,7 @@ def get_service_type(cls):
     get_latest_node_execution = d.get_latest_node_execution
     get_graph = d.get_graph
     get_graph_metadata = d.get_graph_metadata
+    get_graph_settings = d.get_graph_settings
     get_graph_execution_meta = d.get_graph_execution_meta
     get_node = d.get_node
     get_node_execution = d.get_node_execution
diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py
index 06ad06e6dcab..bfec94176d97 100644
--- a/autogpt_platform/backend/backend/executor/manager.py
+++ b/autogpt_platform/backend/backend/executor/manager.py
@@ -29,6 +29,7 @@
 from backend.data.credit import UsageTransactionMetadata
 from backend.data.dynamic_fields import parse_execution_output
 from backend.data.execution import (
+    ExecutionContext,
     ExecutionQueue,
     ExecutionStatus,
     GraphExecution,
@@ -36,7 +37,6 @@
     NodeExecutionEntry,
     NodeExecutionResult,
     NodesInputMasks,
-    UserContext,
 )
 from backend.data.graph import Link, Node
 from backend.data.model import GraphExecutionStats, NodeExecutionStats
@@ -168,6 +168,7 @@ async def execute_node(
     node_exec_id = data.node_exec_id
     node_id = data.node_id
     node_block = node.block
+    execution_context = data.execution_context
 
     log_metadata = LogMetadata(
         logger=_logger,
@@ -210,11 +211,9 @@
         "graph_exec_id": graph_exec_id,
         "node_exec_id": node_exec_id,
         "user_id": user_id,
+        "execution_context": execution_context,
     }
 
-    # Add user context from NodeExecutionEntry
-    extra_exec_kwargs["user_context"] = data.user_context
-
     # Last-minute fetch credentials + acquire a system-wide read-write lock to prevent
     # changes during execution. ⚠️ This means a set of credentials can only be used by
     # one (running) block at a time; simultaneous execution of blocks using same
@@ -243,8 +242,8 @@
         scope.set_tag("node_id", node_id)
         scope.set_tag("block_name", node_block.name)
         scope.set_tag("block_id", node_block.id)
-        for k, v in (data.user_context or UserContext(timezone="UTC")).model_dump().items():
-            scope.set_tag(f"user_context.{k}", v)
+        for k, v in execution_context.model_dump().items():
+            scope.set_tag(f"execution_context.{k}", v)
 
     try:
         async for output_name, output_data in node_block.execute(
@@ -289,7 +288,7 @@ async def _enqueue_next_nodes(
     graph_version: int,
     log_metadata: LogMetadata,
     nodes_input_masks: Optional[NodesInputMasks],
-    user_context: UserContext,
+    execution_context: ExecutionContext,
 ) -> list[NodeExecutionEntry]:
     async def add_enqueued_execution(
         node_exec_id: str, node_id: str, block_id: str, data: BlockInput
@@ -309,7 +308,7 @@
             node_id=node_id,
             block_id=block_id,
             inputs=data,
-            user_context=user_context,
+            execution_context=execution_context,
         )
 
     async def register_next_executions(node_link: Link) -> list[NodeExecutionEntry]:
@@ -861,7 +860,9 @@ def _on_graph_execution(
                     ExecutionStatus.REVIEW,
                 ],
             ):
-                node_entry = node_exec.to_node_execution_entry(graph_exec.user_context)
+                node_entry = node_exec.to_node_execution_entry(
+                    graph_exec.execution_context
+                )
                 execution_queue.add(node_entry)
 
             # ------------------------------------------------------------
@@ -1165,7 +1166,7 @@ async def _process_node_output(
                 graph_version=graph_exec.graph_version,
                 log_metadata=log_metadata,
                 nodes_input_masks=nodes_input_masks,
-                user_context=graph_exec.user_context,
+                execution_context=graph_exec.execution_context,
             ):
                 execution_queue.add(next_execution)
@@ -1555,36 +1556,32 @@ def _republish_to_back():
         graph_exec_id = graph_exec_entry.graph_exec_id
         user_id = graph_exec_entry.user_id
         graph_id = graph_exec_entry.graph_id
-        parent_graph_exec_id = graph_exec_entry.parent_graph_exec_id
+        root_exec_id = graph_exec_entry.execution_context.root_execution_id
+        parent_exec_id = graph_exec_entry.execution_context.parent_execution_id
 
         logger.info(
             f"[{self.service_name}] Received RUN for graph_exec_id={graph_exec_id}, user_id={user_id}, executor_id={self.executor_id}"
-            + (f", parent={parent_graph_exec_id}" if parent_graph_exec_id else "")
+            + (f", root={root_exec_id}" if root_exec_id else "")
+            + (f", parent={parent_exec_id}" if parent_exec_id else "")
         )
 
-        # Check if parent execution is already terminated (prevents orphaned child executions)
-        if parent_graph_exec_id:
-            try:
-                parent_exec = get_db_client().get_graph_execution_meta(
-                    execution_id=parent_graph_exec_id,
-                    user_id=user_id,
+        # Check if root execution is already terminated (prevents orphaned child executions)
+        if root_exec_id and root_exec_id != graph_exec_id:
+            parent_exec = get_db_client().get_graph_execution_meta(
+                execution_id=root_exec_id,
+                user_id=user_id,
+            )
+            if parent_exec and parent_exec.status == ExecutionStatus.TERMINATED:
+                logger.info(
+                    f"[{self.service_name}] Skipping execution {graph_exec_id} - parent {root_exec_id} is TERMINATED"
                 )
-                if parent_exec and parent_exec.status == ExecutionStatus.TERMINATED:
-                    logger.info(
-                        f"[{self.service_name}] Skipping execution {graph_exec_id} - parent {parent_graph_exec_id} is TERMINATED"
-                    )
-                    # Mark this child as terminated since parent was stopped
-                    get_db_client().update_graph_execution_stats(
-                        graph_exec_id=graph_exec_id,
-                        status=ExecutionStatus.TERMINATED,
-                    )
-                    _ack_message(reject=False, requeue=False)
-                    return
-            except Exception as e:
-                logger.warning(
-                    f"[{self.service_name}] Could not check parent status for {graph_exec_id}: {e}"
+                # Mark this child as terminated since parent was stopped
+                get_db_client().update_graph_execution_stats(
+                    graph_exec_id=graph_exec_id,
+                    status=ExecutionStatus.TERMINATED,
                 )
-                # Continue execution if parent check fails (don't block on errors)
+                _ack_message(reject=False, requeue=False)
+                return
 
         # Check user rate limit before processing
         try:
diff --git a/autogpt_platform/backend/backend/executor/utils.py b/autogpt_platform/backend/backend/executor/utils.py
index f8c6da8546e7..bcd3dcf3b6ee 100644
--- a/autogpt_platform/backend/backend/executor/utils.py
+++ b/autogpt_platform/backend/backend/executor/utils.py
@@ -10,6 +10,7 @@
 from backend.data import execution as execution_db
 from backend.data import graph as graph_db
+from backend.data import user as user_db
 from backend.data.block import (
     Block,
     BlockCostType,
@@ -24,19 +25,17 @@
 # Import dynamic field utilities from centralized location
 from backend.data.dynamic_fields import merge_execution_input
 from backend.data.execution import (
+    ExecutionContext,
     ExecutionStatus,
     GraphExecutionMeta,
     GraphExecutionStats,
     GraphExecutionWithNodes,
     NodesInputMasks,
-    UserContext,
     get_graph_execution,
 )
 from backend.data.graph import GraphModel, Node
-from backend.data.model import CredentialsMetaInput
+from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput
 from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
-from backend.data.user import get_user_by_id
-from backend.util.cache import cached
 from backend.util.clients import (
     get_async_execution_event_bus,
     get_async_execution_queue,
@@ -52,32 +51,6 @@
 from backend.util.settings import Config
 from backend.util.type import convert
 
-
-@cached(maxsize=1000, ttl_seconds=3600)
-async def get_user_context(user_id: str) -> UserContext:
-    """
-    Get UserContext for a user, always returns a valid context with timezone.
-    Defaults to UTC if user has no timezone set.
-    """
-    user_context = UserContext(timezone="UTC")  # Default to UTC
-    try:
-        if prisma.is_connected():
-            user = await get_user_by_id(user_id)
-        else:
-            user = await get_database_manager_async_client().get_user_by_id(user_id)
-
-        if user and user.timezone and user.timezone != "not-set":
-            user_context.timezone = user.timezone
-            logger.debug(f"Retrieved user context: timezone={user.timezone}")
-        else:
-            logger.debug("User has no timezone set, using UTC")
-    except Exception as e:
-        logger.warning(f"Could not fetch user timezone: {e}")
-        # Continue with UTC as default
-
-    return user_context
-
-
 config = Config()
 logger = TruncatedLogger(logging.getLogger(__name__), prefix="[GraphExecutorUtil]")
@@ -495,7 +468,6 @@ async def validate_and_construct_node_execution_input(
         graph_version: The version of the graph to use.
         graph_credentials_inputs: Credentials inputs to use.
         nodes_input_masks: Node inputs to use.
-        is_sub_graph: Whether this is a sub-graph execution.
 
     Returns:
         GraphModel: Full graph object for the given `graph_id`.
@@ -763,8 +735,7 @@ async def add_graph_execution(
     graph_version: Optional[int] = None,
     graph_credentials_inputs: Optional[Mapping[str, CredentialsMetaInput]] = None,
     nodes_input_masks: Optional[NodesInputMasks] = None,
-    parent_graph_exec_id: Optional[str] = None,
-    is_sub_graph: bool = False,
+    execution_context: Optional[ExecutionContext] = None,
     graph_exec_id: Optional[str] = None,
 ) -> GraphExecutionWithNodes:
     """
@@ -780,7 +751,6 @@
             Keys should map to the keys generated by `GraphModel.aggregate_credentials_inputs`.
         nodes_input_masks: Node inputs to use in the execution.
         parent_graph_exec_id: The ID of the parent graph execution (for nested executions).
-        is_sub_graph: Whether this is a sub-graph execution.
         graph_exec_id: If provided, resume this existing execution instead of creating a new one.

     Returns:
         GraphExecutionEntry: The entry for the graph execution.
@@ -790,8 +760,10 @@
     """
     if prisma.is_connected():
         edb = execution_db
+        udb = user_db
+        gdb = graph_db
     else:
-        edb = get_database_manager_async_client()
+        edb = udb = gdb = get_database_manager_async_client()
 
     # Get or create the graph execution
     if graph_exec_id:
@@ -810,6 +782,10 @@
         logger.info(f"Resuming graph execution #{graph_exec.id} for graph #{graph_id}")
 
     else:
+        parent_exec_id = (
+            execution_context.parent_execution_id if execution_context else None
+        )
+
         # Create new execution
         graph, starting_nodes_input, compiled_nodes_input_masks = (
             await validate_and_construct_node_execution_input(
@@ -819,7 +795,7 @@
                 graph_version=graph_version,
                 graph_credentials_inputs=graph_credentials_inputs,
                 nodes_input_masks=nodes_input_masks,
-                is_sub_graph=is_sub_graph,
+                is_sub_graph=parent_exec_id is not None,
             )
         )
 
@@ -832,7 +808,7 @@
             nodes_input_masks=nodes_input_masks,
             starting_nodes_input=starting_nodes_input,
             preset_id=preset_id,
-            parent_graph_exec_id=parent_graph_exec_id,
+            parent_graph_exec_id=parent_exec_id,
         )
 
         logger.info(
@@ -840,14 +816,28 @@
             f"#{graph_id} with {len(starting_nodes_input)} starting nodes"
         )
 
-    # Common path: publish to queue and update status
+    # Generate execution context if it's not provided
+    if execution_context is None:
+        user = await udb.get_user_by_id(user_id)
+        settings = await gdb.get_graph_settings(user_id=user_id, graph_id=graph_id)
+
+        execution_context = ExecutionContext(
+            safe_mode=(
+                settings.human_in_the_loop_safe_mode
+                if settings.human_in_the_loop_safe_mode is not None
+                else True
+            ),
+            user_timezone=(
+                user.timezone if user.timezone != USER_TIMEZONE_NOT_SET else "UTC"
+            ),
+            root_execution_id=graph_exec.id,
+        )
+
     try:
         graph_exec_entry = graph_exec.to_graph_execution_entry(
-            user_context=await get_user_context(user_id),
             compiled_nodes_input_masks=compiled_nodes_input_masks,
-            parent_graph_exec_id=parent_graph_exec_id,
+            execution_context=execution_context,
         )
-
         logger.info(f"Publishing execution {graph_exec.id} to execution queue")
 
         exec_queue = await get_async_execution_queue()
diff --git a/autogpt_platform/backend/backend/executor/utils_test.py b/autogpt_platform/backend/backend/executor/utils_test.py
index b418a069aa7e..8854214e14e7 100644
--- a/autogpt_platform/backend/backend/executor/utils_test.py
+++ b/autogpt_platform/backend/backend/executor/utils_test.py
@@ -348,9 +348,6 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
     mock_graph_exec.node_executions = []  # Add this to avoid AttributeError
     mock_graph_exec.to_graph_execution_entry.return_value = mocker.MagicMock()
 
-    # Mock user context
-    mock_user_context = {"user_id": user_id, "context": "test_context"}
-
     # Mock the queue and event bus
     mock_queue = mocker.AsyncMock()
     mock_event_bus = mocker.MagicMock()
@@ -362,7 +359,8 @@
     )
     mock_edb = mocker.patch("backend.executor.utils.execution_db")
     mock_prisma = mocker.patch("backend.executor.utils.prisma")
-    mock_get_user_context = mocker.patch("backend.executor.utils.get_user_context")
+    mock_udb = mocker.patch("backend.executor.utils.user_db")
+    mock_gdb = mocker.patch("backend.executor.utils.graph_db")
     mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
     mock_get_event_bus = mocker.patch(
         "backend.executor.utils.get_async_execution_event_bus"
@@ -380,7 +378,14 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
         return_value=mock_graph_exec
     )
     mock_edb.update_node_execution_status_batch = mocker.AsyncMock()
-    mock_get_user_context.return_value = mock_user_context
+    # Mock user and settings data
+    mock_user = mocker.MagicMock()
+    mock_user.timezone = "UTC"
+    mock_settings = mocker.MagicMock()
+    mock_settings.human_in_the_loop_safe_mode = True
+
+    mock_udb.get_user_by_id = mocker.AsyncMock(return_value=mock_user)
+    mock_gdb.get_graph_settings = mocker.AsyncMock(return_value=mock_settings)
     mock_get_queue.return_value = mock_queue
     mock_get_event_bus.return_value = mock_event_bus
diff --git a/autogpt_platform/backend/backend/server/routers/v1.py b/autogpt_platform/backend/backend/server/routers/v1.py
index e38dd77ac597..7ad812af8a93 100644
--- a/autogpt_platform/backend/backend/server/routers/v1.py
+++ b/autogpt_platform/backend/backend/server/routers/v1.py
@@ -44,7 +44,7 @@
     get_user_credit_model,
     set_auto_top_up,
 )
-from backend.data.execution import UserContext
+from backend.data.graph import GraphSettings
 from backend.data.model import CredentialsMetaInput
 from backend.data.notifications import NotificationPreference, NotificationPreferenceDTO
 from backend.data.onboarding import (
@@ -387,19 +387,15 @@ async def execute_graph_block(
     if not obj:
         raise HTTPException(status_code=404, detail=f"Block #{block_id} not found.")
 
-    # Get user context for block execution
     user = await get_user_by_id(user_id)
     if not user:
         raise HTTPException(status_code=404, detail="User not found.")
 
-    user_context = UserContext(timezone=user.timezone)
-
     start_time = time.time()
     try:
         output = defaultdict(list)
         async for name, data in obj.execute(
             data,
-            user_context=user_context,
             user_id=user_id,
             # Note: graph_exec_id and graph_id are not available for direct block execution
         ):
@@ -842,9 +838,18 @@ async def update_graph(
 
     if new_graph_version.is_active:
         # Keep the library agent up to date with the new active version
-        await library_db.update_agent_version_in_library(
+        library = await library_db.update_agent_version_in_library(
             user_id, graph.id, graph.version
         )
+        if (
+            new_graph_version.has_human_in_the_loop
+            and library.settings.human_in_the_loop_safe_mode is None
+        ):
+            await library_db.update_library_agent_settings(
+                user_id=user_id,
+                agent_id=library.id,
+                settings=GraphSettings(human_in_the_loop_safe_mode=True),
+            )
 
         # Handle activation of the new graph first to ensure continuity
         new_graph_version = await on_graph_activate(new_graph_version, user_id=user_id)
@@ -901,15 +906,54 @@ async def set_graph_active_version(
     )
 
     # Keep the library agent up to date with the new active version
-    await library_db.update_agent_version_in_library(
+    library = await library_db.update_agent_version_in_library(
         user_id, new_active_graph.id, new_active_graph.version
     )
+    if (
+        new_active_graph.has_human_in_the_loop
+        and library.settings.human_in_the_loop_safe_mode is None
+    ):
+        await library_db.update_library_agent_settings(
+            user_id=user_id,
+            agent_id=library.id,
+            settings=GraphSettings(human_in_the_loop_safe_mode=True),
+        )
 
     if current_active_graph and current_active_graph.version != new_active_version:
         # Handle deactivation of the previously active version
         await on_graph_deactivate(current_active_graph, user_id=user_id)
 
 
+@v1_router.patch(
+    path="/graphs/{graph_id}/settings",
+    summary="Update graph settings",
+    tags=["graphs"],
+    dependencies=[Security(requires_user)],
+)
+async def update_graph_settings(
+    graph_id: str,
+    settings: GraphSettings,
+    user_id: Annotated[str, Security(get_user_id)],
+) -> GraphSettings:
+    """Update graph settings for the user's library agent."""
+    # Get the library agent for this graph
+    library_agent = await library_db.get_library_agent_by_graph_id(
+        graph_id=graph_id, user_id=user_id
+    )
+    if not library_agent:
+        raise HTTPException(404, f"Graph #{graph_id} not found in user's library")
+
+    # Update the library agent settings
+    updated_agent = await library_db.update_library_agent_settings(
+        user_id=user_id,
+        agent_id=library_agent.id,
+        settings=settings,
+    )
+
+    # Return the updated settings
+    return GraphSettings.model_validate(updated_agent.settings)
+
+
 @v1_router.post(
     path="/graphs/{graph_id}/execute/{graph_version}",
     summary="Execute graph agent",
diff --git a/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py b/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py
index 3bc0dff923a9..2e62641ad3ea 100644
--- a/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py
+++ b/autogpt_platform/backend/backend/server/v2/executions/review/review_routes_test.py
@@ -7,6 +7,7 @@
 from prisma.enums import ReviewStatus
 from pytest_snapshot.plugin import Snapshot
 
+from backend.server.rest_api import handle_internal_http_error
 from backend.server.v2.executions.review.model import PendingHumanReviewModel
 from backend.server.v2.executions.review.routes import router
 
@@ -15,6 +16,7 @@
 
 app = fastapi.FastAPI()
 app.include_router(router, prefix="/api/review")
+app.add_exception_handler(ValueError, handle_internal_http_error(400))
 
 client = fastapi.testclient.TestClient(app)
 
@@ -30,11 +32,11 @@ def setup_app_auth(mock_jwt_user):
 
 
 @pytest.fixture
-def sample_pending_review() -> PendingHumanReviewModel:
+def sample_pending_review(test_user_id: str) -> PendingHumanReviewModel:
     """Create a sample pending review for testing"""
     return PendingHumanReviewModel(
         node_exec_id="test_node_123",
-        user_id="test_user",
+        user_id=test_user_id,
         graph_exec_id="test_graph_exec_456",
         graph_id="test_graph_789",
         graph_version=1,
@@ -54,6 +56,7 @@ def sample_pending_review() -> PendingHumanReviewModel:
 def test_get_pending_reviews_empty(
     mocker: pytest_mock.MockFixture,
     snapshot: Snapshot,
+    test_user_id: str,
 ) -> None:
     """Test getting pending reviews when none exist"""
     mock_get_reviews = mocker.patch(
@@ -65,13 +68,14 @@
 
     assert response.status_code == 200
     assert response.json() == []
-    mock_get_reviews.assert_called_once_with("test_user", 1, 25)
+    mock_get_reviews.assert_called_once_with(test_user_id, 1, 25)
 
 
 def test_get_pending_reviews_with_data(
     mocker: pytest_mock.MockFixture,
     sample_pending_review: PendingHumanReviewModel,
     snapshot: Snapshot,
+    test_user_id: str,
 ) -> None:
     """Test getting pending reviews with data"""
     mock_get_reviews = mocker.patch(
@@ -86,13 +90,14 @@
     assert len(data) == 1
     assert data[0]["node_exec_id"] == "test_node_123"
     assert data[0]["status"] == "WAITING"
-    mock_get_reviews.assert_called_once_with("test_user", 2, 10)
+    mock_get_reviews.assert_called_once_with(test_user_id, 2, 10)
 
 
 def test_get_pending_reviews_for_execution_success(
     mocker: pytest_mock.MockFixture,
     sample_pending_review: PendingHumanReviewModel,
     snapshot: Snapshot,
+    test_user_id: str,
 ) -> None:
     """Test getting pending reviews for specific execution"""
     mock_get_graph_execution = mocker.patch(
@@ -100,7 +105,7 @@
     )
     mock_get_graph_execution.return_value = {
         "id": "test_graph_exec_456",
-        "user_id": "test_user",
+        "user_id": test_user_id,
     }
 
     mock_get_reviews = mocker.patch(
@@ -118,6 +123,7 @@
 
 def test_get_pending_reviews_for_execution_access_denied(
     mocker: pytest_mock.MockFixture,
+    test_user_id: str,
 ) -> None:
     """Test access denied when user doesn't own the execution"""
     mock_get_graph_execution = mocker.patch(
@@ -134,13 +140,10 @@
 def test_process_review_action_approve_success(
     mocker: pytest_mock.MockFixture,
     sample_pending_review: PendingHumanReviewModel,
+    test_user_id: str,
 ) -> None:
     """Test successful review approval"""
-    # Mock the validation functions
-    mock_get_pending_review = mocker.patch(
-        "backend.data.human_review.get_pending_review_by_node_exec_id"
-    )
-    mock_get_pending_review.return_value = sample_pending_review
+    # Mock the route functions
 
     mock_get_reviews_for_execution = mocker.patch(
         "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
@@ -150,24 +153,42 @@
     mock_process_all_reviews = mocker.patch(
         "backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
     )
-    mock_process_all_reviews.return_value = {"test_node_123": sample_pending_review}
+    # Create approved review for return
+    approved_review = PendingHumanReviewModel(
+        node_exec_id="test_node_123",
+        user_id=test_user_id,
+        graph_exec_id="test_graph_exec_456",
+        graph_id="test_graph_789",
+        graph_version=1,
+        payload={"data": "modified payload", "value": 50},
+        instructions="Please review this data",
+        editable=True,
+        status=ReviewStatus.APPROVED,
+        review_message="Looks good",
+        was_edited=True,
+        processed=False,
+        created_at=FIXED_NOW,
+        updated_at=FIXED_NOW,
+        reviewed_at=FIXED_NOW,
+    )
+    mock_process_all_reviews.return_value = {"test_node_123": approved_review}
 
     mock_has_pending = mocker.patch(
-        "backend.data.human_review.has_pending_reviews_for_graph_exec"
+        "backend.server.v2.executions.review.routes.has_pending_reviews_for_graph_exec"
     )
     mock_has_pending.return_value = False
 
-    mocker.patch("backend.executor.utils.add_graph_execution")
+    mocker.patch("backend.server.v2.executions.review.routes.add_graph_execution")
 
     request_data = {
-        "approved_reviews": [
+        "reviews": [
             {
                 "node_exec_id": "test_node_123",
+                "approved": True,
                 "message": "Looks good",
                 "reviewed_data": {"data": "modified payload", "value": 50},
             }
-        ],
-        "rejected_review_ids": [],
+        ]
     }
 
     response = client.post("/api/review/action", json=request_data)
@@ -183,13 +204,10 @@
 def test_process_review_action_reject_success(
     mocker: pytest_mock.MockFixture,
     sample_pending_review: PendingHumanReviewModel,
+    test_user_id: str,
 ) -> None:
     """Test successful review rejection"""
-    # Mock the validation functions
-    mock_get_pending_review = mocker.patch(
-        "backend.data.human_review.get_pending_review_by_node_exec_id"
-    )
-    mock_get_pending_review.return_value = sample_pending_review
+    # Mock the route functions
 
     mock_get_reviews_for_execution = mocker.patch(
         "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
@@ -201,7 +219,7 @@
     )
     rejected_review = PendingHumanReviewModel(
         node_exec_id="test_node_123",
-        user_id="test_user",
+        user_id=test_user_id,
         graph_exec_id="test_graph_exec_456",
         graph_id="test_graph_789",
         graph_version=1,
@@ -219,11 +237,19 @@
     mock_process_all_reviews.return_value = {"test_node_123": rejected_review}
 
     mock_has_pending = mocker.patch(
-        "backend.data.human_review.has_pending_reviews_for_graph_exec"
+        "backend.server.v2.executions.review.routes.has_pending_reviews_for_graph_exec"
     )
     mock_has_pending.return_value = False
 
-    request_data = {"approved_reviews": [], "rejected_review_ids": ["test_node_123"]}
+    request_data = {
+        "reviews": [
+            {
+                "node_exec_id": "test_node_123",
+                "approved": False,
+                "message": None,
+            }
+        ]
+    }
 
     response = client.post("/api/review/action", json=request_data)
 
@@ -238,12 +264,13 @@
 def test_process_review_action_mixed_success(
     mocker: pytest_mock.MockFixture,
     sample_pending_review: PendingHumanReviewModel,
+    test_user_id: str,
 ) -> None:
     """Test mixed approve/reject operations"""
     # Create a second review
     second_review = PendingHumanReviewModel(
         node_exec_id="test_node_456",
-        user_id="test_user",
+        user_id=test_user_id,
         graph_exec_id="test_graph_exec_456",
         graph_id="test_graph_789",
         graph_version=1,
@@ -259,13 +286,7 @@
         reviewed_at=None,
     )
 
-    # Mock the validation functions
-    mock_get_pending_review = mocker.patch(
-        "backend.data.human_review.get_pending_review_by_node_exec_id"
-    )
-    mock_get_pending_review.side_effect = lambda node_id, user_id: (
-        sample_pending_review if node_id == "test_node_123" else second_review
-    )
+    # Mock the route functions
 
     mock_get_reviews_for_execution = mocker.patch(
         "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
@@ -278,7 +299,7 @@
     # Create approved version of first review
     approved_review = PendingHumanReviewModel(
         node_exec_id="test_node_123",
-        user_id="test_user",
+        user_id=test_user_id,
         graph_exec_id="test_graph_exec_456",
         graph_id="test_graph_789",
         graph_version=1,
@@ -296,7 +317,7 @@
     # Create rejected version of second review
     rejected_review = PendingHumanReviewModel(
         node_exec_id="test_node_456",
-        user_id="test_user",
+        user_id=test_user_id,
         graph_exec_id="test_graph_exec_456",
         graph_id="test_graph_789",
         graph_version=1,
@@ -317,19 +338,24 @@
     }
 
     mock_has_pending = mocker.patch(
-        "backend.data.human_review.has_pending_reviews_for_graph_exec"
+        "backend.server.v2.executions.review.routes.has_pending_reviews_for_graph_exec"
     )
     mock_has_pending.return_value = False
 
     request_data = {
-        "approved_reviews": [
+        "reviews": [
             {
                 "node_exec_id": "test_node_123",
+                "approved": True,
                 "message": "Approved",
                 "reviewed_data": {"data": "modified"},
-            }
-        ],
-        "rejected_review_ids": ["test_node_456"],
+            },
+            {
+                "node_exec_id": "test_node_456",
+                "approved": False,
+                "message": None,
+            },
+        ]
     }
 
     response = client.post("/api/review/action", json=request_data)
@@ -344,52 +370,64 @@
 
 def test_process_review_action_empty_request(
     mocker: pytest_mock.MockFixture,
+    test_user_id: str,
 ) -> None:
     """Test error when no reviews provided"""
-    request_data = {"approved_reviews": [], "rejected_review_ids": []}
+    request_data = {"reviews": []}
 
     response = client.post("/api/review/action", json=request_data)
 
-    assert response.status_code == 400
-    assert "At least one review must be provided" in response.json()["detail"]
+    assert response.status_code == 422
+    response_data = response.json()
+    # Pydantic validation error format
+    assert isinstance(response_data["detail"], list)
+    assert len(response_data["detail"]) > 0
+    assert "At least one review must be provided" in response_data["detail"][0]["msg"]
 
 
 def test_process_review_action_review_not_found(
     mocker: pytest_mock.MockFixture,
+    test_user_id: str,
 ) -> None:
     """Test error when review is not found"""
-    mock_get_pending_review = mocker.patch(
-        "backend.data.human_review.get_pending_review_by_node_exec_id"
+    # Mock the functions that extract graph execution ID from the request
+    mock_get_reviews_for_execution = mocker.patch(
+        "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
+    )
+    mock_get_reviews_for_execution.return_value = []  # No reviews found
+
+    # Mock process_all_reviews to simulate not finding reviews
+    mock_process_all_reviews = mocker.patch(
+        "backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
+    )
+    # This should raise a ValueError with "Reviews not found" message based on the data/human_review.py logic
+    mock_process_all_reviews.side_effect = ValueError(
+        "Reviews not found or access denied for IDs: nonexistent_node"
     )
-    mock_get_pending_review.return_value = None
 
     request_data = {
-        "approved_reviews": [
+        "reviews": [
             {
                 "node_exec_id": "nonexistent_node",
+                "approved": True,
                 "message": "Test",
             }
-        ],
-        "rejected_review_ids": [],
+        ]
     }
 
     response = client.post("/api/review/action", json=request_data)
 
-    assert response.status_code == 403
-    assert "not found or access denied" in response.json()["detail"]
+    assert response.status_code == 400
+    assert "Reviews not found" in response.json()["detail"]
 
 
 def test_process_review_action_partial_failure(
     mocker: pytest_mock.MockFixture,
     sample_pending_review: PendingHumanReviewModel,
+    test_user_id: str,
 ) -> None:
     """Test handling of partial failures in review processing"""
-    # Mock successful validation
-    mock_get_pending_review = mocker.patch(
-        "backend.data.human_review.get_pending_review_by_node_exec_id"
-    )
-    mock_get_pending_review.return_value = sample_pending_review
-
+    # Mock the route functions
     mock_get_reviews_for_execution = mocker.patch(
         "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
    )
@@ -402,58 +440,53 @@
     mock_process_all_reviews.side_effect = ValueError("Some reviews failed validation")
 
     request_data = {
-        "approved_reviews": [
+        "reviews": [
             {
                 "node_exec_id": "test_node_123",
+                "approved": True,
                 "message": "Test",
             }
-        ],
-        "rejected_review_ids": [],
+        ]
     }
 
     response = client.post("/api/review/action", json=request_data)
 
-    assert response.status_code == 200
-    data = response.json()
-    assert data["approved_count"] == 0
-    assert data["rejected_count"] == 0
-    assert data["failed_count"] == 1
-    assert "Failed to process reviews" in data["error"]
+    assert response.status_code == 400
+    assert "Some reviews failed validation" in response.json()["detail"]
 
 
-def test_process_review_action_complete_failure(
+def test_process_review_action_invalid_node_exec_id(
     mocker: pytest_mock.MockFixture,
     sample_pending_review: PendingHumanReviewModel,
+    test_user_id: str,
 ) -> None:
-    """Test complete failure scenario"""
-    # Mock successful validation
-    mock_get_pending_review = mocker.patch(
-        "backend.data.human_review.get_pending_review_by_node_exec_id"
-    )
-    mock_get_pending_review.return_value = sample_pending_review
-
+    """Test failure when trying to process review with invalid node execution ID"""
+    # Mock the route functions
     mock_get_reviews_for_execution = mocker.patch(
         "backend.server.v2.executions.review.routes.get_pending_reviews_for_execution"
     )
     mock_get_reviews_for_execution.return_value = [sample_pending_review]
 
-    # Mock complete failure in processing
+    # Mock validation failure - this should return 400, not 500
     mock_process_all_reviews = mocker.patch(
         "backend.server.v2.executions.review.routes.process_all_reviews_for_execution"
     )
-    mock_process_all_reviews.side_effect = Exception("Database error")
+    mock_process_all_reviews.side_effect = ValueError(
+        "Invalid node execution ID format"
+    )
 
     request_data = {
-        "approved_reviews": [
+        "reviews": [
             {
-                "node_exec_id": "test_node_123",
+                "node_exec_id": "invalid-node-format",
+                "approved": True,
                 "message": "Test",
             }
-        ],
-        "rejected_review_ids": [],
+        ]
     }
 
     response = client.post("/api/review/action", json=request_data)
 
-    assert response.status_code == 500
-    assert "error" in response.json()["detail"].lower()
+    # Should be a 400 Bad Request, not 500 Internal Server Error
+    assert response.status_code == 400
+    assert "Invalid node execution ID format" in response.json()["detail"]
diff --git a/autogpt_platform/backend/backend/server/v2/library/db.py b/autogpt_platform/backend/backend/server/v2/library/db.py
index 1609929ecd9d..f9e74a091258 100644
--- a/autogpt_platform/backend/backend/server/v2/library/db.py
+++ b/autogpt_platform/backend/backend/server/v2/library/db.py
@@ -17,6 +17,7 @@
 from backend.data.block import BlockInput
 from backend.data.db import transaction
 from backend.data.execution import get_graph_execution
+from backend.data.graph import GraphSettings
 from backend.data.includes import AGENT_PRESET_INCLUDE, library_agent_include
 from backend.data.model import CredentialsMetaInput
 from backend.integrations.creds_manager import IntegrationCredentialsManager
@@ -374,6 +375,24 @@ async def add_generated_agent_image(
     )
 
 
+def _initialize_graph_settings(graph: graph_db.GraphModel) -> GraphSettings:
+    """
+    Initialize GraphSettings based on graph content.
+
+    Args:
+        graph: The graph to analyze
+
+    Returns:
+        GraphSettings with appropriate human_in_the_loop_safe_mode value
+    """
+    if graph.has_human_in_the_loop:
+        # Graph has HITL blocks - set safe mode to True by default
+        return GraphSettings(human_in_the_loop_safe_mode=True)
+    else:
+        # Graph has no HITL blocks - keep None
+        return GraphSettings(human_in_the_loop_safe_mode=None)
+
+
 async def create_library_agent(
     graph: graph_db.GraphModel,
     user_id: str,
@@ -420,6 +439,9 @@
                     }
                 }
             },
+            settings=SafeJson(
+                _initialize_graph_settings(graph_entry).model_dump()
+            ),
         ),
         include=library_agent_include(
             user_id, include_nodes=False, include_executions=False
@@ -440,7 +462,7 @@ async def update_agent_version_in_library(
     user_id: str,
     agent_graph_id: str,
     agent_graph_version: int,
-) -> None:
+) -> library_model.LibraryAgent:
     """
     Updates the agent version in the library if useGraphIsActiveVersion is True.
@@ -464,7 +486,7 @@ async def update_agent_version_in_library( "useGraphIsActiveVersion": True, }, ) - await prisma.models.LibraryAgent.prisma().update( + lib = await prisma.models.LibraryAgent.prisma().update( where={"id": library_agent.id}, data={ "AgentGraph": { @@ -477,6 +499,10 @@ async def update_agent_version_in_library( }, }, ) + if lib is None: + raise NotFoundError(f"Library agent {library_agent.id} not found") + + return library_model.LibraryAgent.from_db(lib) except prisma.errors.PrismaError as e: logger.error(f"Database error updating agent version in library: {e}") raise DatabaseError("Failed to update agent version in library") from e @@ -489,6 +515,7 @@ async def update_library_agent( is_favorite: Optional[bool] = None, is_archived: Optional[bool] = None, is_deleted: Optional[Literal[False]] = None, + settings: Optional[GraphSettings] = None, ) -> library_model.LibraryAgent: """ Updates the specified LibraryAgent record. @@ -499,6 +526,7 @@ async def update_library_agent( auto_update_version: Whether the agent should auto-update to active version. is_favorite: Whether this agent is marked as a favorite. is_archived: Whether this agent is archived. + settings: User-specific settings for this library agent. Returns: The updated LibraryAgent. @@ -510,7 +538,7 @@ async def update_library_agent( logger.debug( f"Updating library agent {library_agent_id} for user {user_id} with " f"auto_update_version={auto_update_version}, is_favorite={is_favorite}, " - f"is_archived={is_archived}" + f"is_archived={is_archived}, settings={settings}" ) update_fields: prisma.types.LibraryAgentUpdateManyMutationInput = {} if auto_update_version is not None: @@ -525,6 +553,8 @@ async def update_library_agent( "Use delete_library_agent() to (soft-)delete library agents" ) update_fields["isDeleted"] = is_deleted + if settings is not None: + update_fields["settings"] = SafeJson(settings.model_dump()) if not update_fields: raise ValueError("No values were passed to update") @@ -545,6 +575,33 @@ async def update_library_agent( raise DatabaseError("Failed to update library agent") from e +async def update_library_agent_settings( + user_id: str, + agent_id: str, + settings: GraphSettings, +) -> library_model.LibraryAgent: + """ + Updates the settings for a specific LibraryAgent. + + Args: + user_id: The owner of the LibraryAgent. + agent_id: The ID of the LibraryAgent to update. + settings: New GraphSettings to apply. + + Returns: + The updated LibraryAgent. + + Raises: + NotFoundError: If the specified LibraryAgent does not exist. + DatabaseError: If there's an error in the update operation. 
+ """ + return await update_library_agent( + library_agent_id=agent_id, + user_id=user_id, + settings=settings, + ) + + async def delete_library_agent( library_agent_id: str, user_id: str, soft_delete: bool = True ) -> None: @@ -681,6 +738,18 @@ async def add_store_agent_to_library( graph = store_listing_version.AgentGraph + # Convert to GraphModel to check for HITL blocks + graph_model = await graph_db.get_graph( + graph_id=graph.id, + version=graph.version, + user_id=user_id, + include_subgraphs=False, + ) + if not graph_model: + raise store_exceptions.AgentNotFoundError( + f"Graph #{graph.id} v{graph.version} not found or accessible" + ) + # Check if user already has this agent existing_library_agent = await prisma.models.LibraryAgent.prisma().find_unique( where={ @@ -715,6 +784,9 @@ async def add_store_agent_to_library( } }, "isCreatedByUser": False, + "settings": SafeJson( + _initialize_graph_settings(graph_model).model_dump() + ), }, include=library_agent_include( user_id, include_nodes=False, include_executions=False diff --git a/autogpt_platform/backend/backend/server/v2/library/db_test.py b/autogpt_platform/backend/backend/server/v2/library/db_test.py index 2d42d26cfa7a..cb0095fb3959 100644 --- a/autogpt_platform/backend/backend/server/v2/library/db_test.py +++ b/autogpt_platform/backend/backend/server/v2/library/db_test.py @@ -32,6 +32,7 @@ async def test_get_library_agents(mocker): id="ua1", userId="test-user", agentGraphId="agent2", + settings="{}", # type: ignore agentGraphVersion=1, isCreatedByUser=False, isDeleted=False, @@ -123,6 +124,7 @@ async def test_add_agent_to_library(mocker): id="ua1", userId="test-user", agentGraphId=mock_store_listing_data.agentGraphId, + settings="{}", # type: ignore agentGraphVersion=1, isCreatedByUser=False, isDeleted=False, @@ -148,6 +150,14 @@ async def test_add_agent_to_library(mocker): return_value=mock_library_agent_data ) + # Mock graph_db.get_graph function that's called to check for HITL blocks + mock_graph_db = mocker.patch("backend.server.v2.library.db.graph_db") + mock_graph_model = mocker.Mock() + mock_graph_model.nodes = ( + [] + ) # Empty list so _has_human_in_the_loop_blocks returns False + mock_graph_db.get_graph = mocker.AsyncMock(return_value=mock_graph_model) + # Mock the model conversion mock_from_db = mocker.patch("backend.server.v2.library.model.LibraryAgent.from_db") mock_from_db.return_value = mocker.Mock() @@ -169,17 +179,29 @@ async def test_add_agent_to_library(mocker): }, include={"AgentGraph": True}, ) - mock_library_agent.return_value.create.assert_called_once_with( - data={ - "User": {"connect": {"id": "test-user"}}, - "AgentGraph": { - "connect": {"graphVersionId": {"id": "agent1", "version": 1}} - }, - "isCreatedByUser": False, - }, - include=library_agent_include( - "test-user", include_nodes=False, include_executions=False - ), + # Check that create was called with the expected data including settings + create_call_args = mock_library_agent.return_value.create.call_args + assert create_call_args is not None + + # Verify the main structure + expected_data = { + "User": {"connect": {"id": "test-user"}}, + "AgentGraph": {"connect": {"graphVersionId": {"id": "agent1", "version": 1}}}, + "isCreatedByUser": False, + } + + actual_data = create_call_args[1]["data"] + # Check that all expected fields are present + for key, value in expected_data.items(): + assert actual_data[key] == value + + # Check that settings field is present and is a SafeJson object + assert "settings" in actual_data + assert 
hasattr(actual_data["settings"], "__class__") # Should be a SafeJson object + + # Check include parameter + assert create_call_args[1]["include"] == library_agent_include( + "test-user", include_nodes=False, include_executions=False ) diff --git a/autogpt_platform/backend/backend/server/v2/library/model.py b/autogpt_platform/backend/backend/server/v2/library/model.py index f4c1a35177eb..8d47b7e07824 100644 --- a/autogpt_platform/backend/backend/server/v2/library/model.py +++ b/autogpt_platform/backend/backend/server/v2/library/model.py @@ -6,8 +6,8 @@ import prisma.models import pydantic -import backend.data.block as block_model -import backend.data.graph as graph_model +from backend.data.block import BlockInput +from backend.data.graph import GraphModel, GraphSettings, GraphTriggerInfo from backend.data.model import CredentialsMetaInput, is_credentials_field_name from backend.util.models import Pagination @@ -54,7 +54,7 @@ class LibraryAgent(pydantic.BaseModel): has_external_trigger: bool = pydantic.Field( description="Whether the agent has an external trigger (e.g. webhook) node" ) - trigger_setup_info: Optional[graph_model.GraphTriggerInfo] = None + trigger_setup_info: Optional[GraphTriggerInfo] = None # Indicates whether there's a new output (based on recent runs) new_output: bool @@ -71,6 +71,9 @@ class LibraryAgent(pydantic.BaseModel): # Recommended schedule cron (from marketplace agents) recommended_schedule_cron: str | None = None + # User-specific settings for this library agent + settings: GraphSettings = pydantic.Field(default_factory=GraphSettings) + @staticmethod def from_db( agent: prisma.models.LibraryAgent, @@ -83,7 +86,7 @@ def from_db( if not agent.AgentGraph: raise ValueError("Associated Agent record is required.") - graph = graph_model.GraphModel.from_db(agent.AgentGraph, sub_graphs=sub_graphs) + graph = GraphModel.from_db(agent.AgentGraph, sub_graphs=sub_graphs) agent_updated_at = agent.AgentGraph.updatedAt lib_agent_updated_at = agent.updatedAt @@ -140,6 +143,7 @@ def from_db( is_latest_version=is_latest_version, is_favorite=agent.isFavorite, recommended_schedule_cron=agent.AgentGraph.recommendedScheduleCron, + settings=GraphSettings.model_validate(agent.settings), ) @@ -207,7 +211,7 @@ class LibraryAgentPresetCreatable(pydantic.BaseModel): graph_id: str graph_version: int - inputs: block_model.BlockInput + inputs: BlockInput credentials: dict[str, CredentialsMetaInput] name: str @@ -236,7 +240,7 @@ class LibraryAgentPresetUpdatable(pydantic.BaseModel): Request model used when updating a preset for a library agent. 
""" - inputs: Optional[block_model.BlockInput] = None + inputs: Optional[BlockInput] = None credentials: Optional[dict[str, CredentialsMetaInput]] = None name: Optional[str] = None @@ -279,7 +283,7 @@ def from_db(cls, preset: prisma.models.AgentPreset) -> "LibraryAgentPreset": "Webhook must be included in AgentPreset query when webhookId is set" ) - input_data: block_model.BlockInput = {} + input_data: BlockInput = {} input_credentials: dict[str, CredentialsMetaInput] = {} for preset_input in preset.InputPresets: @@ -345,3 +349,6 @@ class LibraryAgentUpdateRequest(pydantic.BaseModel): is_archived: Optional[bool] = pydantic.Field( default=None, description="Archive the agent" ) + settings: Optional[GraphSettings] = pydantic.Field( + default=None, description="User-specific settings for this library agent" + ) diff --git a/autogpt_platform/backend/backend/server/v2/library/routes/agents.py b/autogpt_platform/backend/backend/server/v2/library/routes/agents.py index eeea9d8fb65f..1235ca07d077 100644 --- a/autogpt_platform/backend/backend/server/v2/library/routes/agents.py +++ b/autogpt_platform/backend/backend/server/v2/library/routes/agents.py @@ -276,6 +276,7 @@ async def update_library_agent( auto_update_version=payload.auto_update_version, is_favorite=payload.is_favorite, is_archived=payload.is_archived, + settings=payload.settings, ) except NotFoundError as e: raise HTTPException( diff --git a/autogpt_platform/backend/backend/util/cache.py b/autogpt_platform/backend/backend/util/cache.py index c718d4ef90b2..757ba45b422e 100644 --- a/autogpt_platform/backend/backend/util/cache.py +++ b/autogpt_platform/backend/backend/util/cache.py @@ -27,6 +27,7 @@ P = ParamSpec("P") R = TypeVar("R") R_co = TypeVar("R_co", covariant=True) +T = TypeVar("T") logger = logging.getLogger(__name__) settings = Settings() @@ -143,7 +144,7 @@ def cached( ttl_seconds: int, shared_cache: bool = False, refresh_ttl_on_get: bool = False, -) -> Callable[[Callable], CachedFunction]: +) -> Callable[[Callable[P, R]], CachedFunction[P, R]]: """ Thundering herd safe cache decorator for both sync and async functions. 
@@ -169,7 +170,7 @@ async def expensive_async_operation(param: str) -> dict: return {"result": param} """ - def decorator(target_func): + def decorator(target_func: Callable[P, R]) -> CachedFunction[P, R]: cache_storage: dict[tuple, CachedValue] = {} _event_loop_locks: dict[Any, asyncio.Lock] = {} @@ -386,7 +387,7 @@ def cache_delete(*args, **kwargs) -> bool: setattr(wrapper, "cache_info", cache_info) setattr(wrapper, "cache_delete", cache_delete) - return cast(CachedFunction, wrapper) + return cast(CachedFunction[P, R], wrapper) return decorator diff --git a/autogpt_platform/backend/backend/util/service.py b/autogpt_platform/backend/backend/util/service.py index b2c9ac060ee1..00b938c17050 100644 --- a/autogpt_platform/backend/backend/util/service.py +++ b/autogpt_platform/backend/backend/util/service.py @@ -28,6 +28,7 @@ import httpx import uvicorn from fastapi import FastAPI, Request, responses +from prisma.errors import DataError from pydantic import BaseModel, TypeAdapter, create_model import backend.util.exceptions as exceptions @@ -193,6 +194,7 @@ def __init__(self, status_code: int, message: str): e.__name__: e for e in [ ValueError, + DataError, RuntimeError, TimeoutError, ConnectionError, @@ -411,6 +413,9 @@ def run(self): self.fastapi_app.add_exception_handler( ValueError, self._handle_internal_http_error(400) ) + self.fastapi_app.add_exception_handler( + DataError, self._handle_internal_http_error(400) + ) self.fastapi_app.add_exception_handler( Exception, self._handle_internal_http_error(500) ) @@ -472,6 +477,7 @@ def _maybe_retry(fn: Callable[..., R]) -> Callable[..., R]: exclude_exceptions=( # Don't retry these specific exceptions that won't be fixed by retrying ValueError, # Invalid input/parameters + DataError, # Prisma data integrity errors (foreign key, unique constraints) KeyError, # Missing required data TypeError, # Wrong data types AttributeError, # Missing attributes diff --git a/autogpt_platform/backend/backend/util/test.py b/autogpt_platform/backend/backend/util/test.py index 0a2015254be5..dda62e7f9fc9 100644 --- a/autogpt_platform/backend/backend/util/test.py +++ b/autogpt_platform/backend/backend/util/test.py @@ -9,9 +9,9 @@ from backend.data import db from backend.data.block import Block, BlockSchema, initialize_blocks from backend.data.execution import ( + ExecutionContext, ExecutionStatus, NodeExecutionResult, - UserContext, get_graph_execution, ) from backend.data.model import _BaseCredentials @@ -141,7 +141,7 @@ async def async_mock( "node_exec_id": str(uuid.uuid4()), "user_id": str(uuid.uuid4()), "graph_version": 1, # Default version for tests - "user_context": UserContext(timezone="UTC"), # Default for tests + "execution_context": ExecutionContext(), } input_model = cast(type[BlockSchema], block.input_schema) credentials_input_fields = input_model.get_credentials_fields() diff --git a/autogpt_platform/backend/backend/util/timezone_utils.py b/autogpt_platform/backend/backend/util/timezone_utils.py index 6a6c438085e4..76614a835743 100644 --- a/autogpt_platform/backend/backend/util/timezone_utils.py +++ b/autogpt_platform/backend/backend/util/timezone_utils.py @@ -10,6 +10,8 @@ from croniter import croniter +from backend.data.model import USER_TIMEZONE_NOT_SET + logger = logging.getLogger(__name__) @@ -138,7 +140,7 @@ def get_user_timezone_or_utc(user_timezone: Optional[str]) -> str: Returns: Valid timezone string (user's preference or UTC fallback) """ - if not user_timezone or user_timezone == "not-set": + if not user_timezone or user_timezone == 
USER_TIMEZONE_NOT_SET: return "UTC" if validate_timezone(user_timezone): diff --git a/autogpt_platform/backend/migrations/20251128112407_add_library_agent_settings/migration.sql b/autogpt_platform/backend/migrations/20251128112407_add_library_agent_settings/migration.sql new file mode 100644 index 000000000000..a9cf141ce2d3 --- /dev/null +++ b/autogpt_platform/backend/migrations/20251128112407_add_library_agent_settings/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "LibraryAgent" ADD COLUMN "settings" JSONB NOT NULL DEFAULT '{}'; diff --git a/autogpt_platform/backend/schema.prisma b/autogpt_platform/backend/schema.prisma index ca015a3cb930..3693035802ed 100644 --- a/autogpt_platform/backend/schema.prisma +++ b/autogpt_platform/backend/schema.prisma @@ -266,6 +266,8 @@ model LibraryAgent { isArchived Boolean @default(false) isDeleted Boolean @default(false) + settings Json @default("{}") + @@unique([userId, agentGraphId, agentGraphVersion]) @@index([agentGraphId, agentGraphVersion]) @@index([creatorId]) @@ -478,26 +480,26 @@ enum ReviewStatus { // Pending human reviews for Human-in-the-loop blocks model PendingHumanReview { - nodeExecId String @id - userId String - graphExecId String - graphId String - graphVersion Int - payload Json // The actual payload data to be reviewed - instructions String? // Instructions/message for the reviewer - editable Boolean @default(true) // Whether the reviewer can edit the data - status ReviewStatus @default(WAITING) - reviewMessage String? // Optional message from the reviewer - wasEdited Boolean? // Whether the data was modified during review - processed Boolean @default(false) // Whether the review result has been processed by the execution engine - createdAt DateTime @default(now()) - updatedAt DateTime? @updatedAt - reviewedAt DateTime? - - User User @relation(fields: [userId], references: [id], onDelete: Cascade) - NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade) - GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade) - + nodeExecId String @id + userId String + graphExecId String + graphId String + graphVersion Int + payload Json // The actual payload data to be reviewed + instructions String? // Instructions/message for the reviewer + editable Boolean @default(true) // Whether the reviewer can edit the data + status ReviewStatus @default(WAITING) + reviewMessage String? // Optional message from the reviewer + wasEdited Boolean? // Whether the data was modified during review + processed Boolean @default(false) // Whether the review result has been processed by the execution engine + createdAt DateTime @default(now()) + updatedAt DateTime? @updatedAt + reviewedAt DateTime? + + User User @relation(fields: [userId], references: [id], onDelete: Cascade) + NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade) + GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade) + @@unique([nodeExecId]) // One pending review per node execution @@index([userId, status]) @@index([graphExecId, status]) @@ -704,7 +706,7 @@ view StoreAgent { agent_video String? agent_image String[] - featured Boolean @default(false) + featured Boolean @default(false) creator_username String? creator_avatar String? 
sub_heading String @@ -714,8 +716,8 @@ view StoreAgent { runs Int rating Float versions String[] - is_available Boolean @default(true) - useForOnboarding Boolean @default(false) + is_available Boolean @default(true) + useForOnboarding Boolean @default(false) // Materialized views used (refreshed every 15 minutes via pg_cron): // - mv_agent_run_counts - Pre-aggregated agent execution counts by agentGraphId diff --git a/autogpt_platform/backend/snapshots/grph_single b/autogpt_platform/backend/snapshots/grph_single index d9207eb205d6..7ba26f6171e8 100644 --- a/autogpt_platform/backend/snapshots/grph_single +++ b/autogpt_platform/backend/snapshots/grph_single @@ -9,6 +9,7 @@ "forked_from_id": null, "forked_from_version": null, "has_external_trigger": false, + "has_human_in_the_loop": false, "id": "graph-123", "input_schema": { "properties": {}, diff --git a/autogpt_platform/backend/snapshots/grphs_all b/autogpt_platform/backend/snapshots/grphs_all index 42f4174d7b7e..d54df2bc18dc 100644 --- a/autogpt_platform/backend/snapshots/grphs_all +++ b/autogpt_platform/backend/snapshots/grphs_all @@ -9,6 +9,7 @@ "forked_from_id": null, "forked_from_version": null, "has_external_trigger": false, + "has_human_in_the_loop": false, "id": "graph-123", "input_schema": { "properties": {}, diff --git a/autogpt_platform/backend/snapshots/lib_agts_search b/autogpt_platform/backend/snapshots/lib_agts_search index 4da74a6ab49f..27e9e6ec09e0 100644 --- a/autogpt_platform/backend/snapshots/lib_agts_search +++ b/autogpt_platform/backend/snapshots/lib_agts_search @@ -30,7 +30,10 @@ "can_access_graph": true, "is_latest_version": true, "is_favorite": false, - "recommended_schedule_cron": null + "recommended_schedule_cron": null, + "settings": { + "human_in_the_loop_safe_mode": null + } }, { "id": "test-agent-2", @@ -62,7 +65,10 @@ "can_access_graph": false, "is_latest_version": true, "is_favorite": false, - "recommended_schedule_cron": null + "recommended_schedule_cron": null, + "settings": { + "human_in_the_loop_safe_mode": null + } } ], "pagination": { diff --git a/autogpt_platform/backend/test_requeue_integration.py b/autogpt_platform/backend/test_requeue_integration.py index 95deb1f183bd..da1e00e357d9 100644 --- a/autogpt_platform/backend/test_requeue_integration.py +++ b/autogpt_platform/backend/test_requeue_integration.py @@ -59,10 +59,9 @@ def create_test_message(self, message_id: str, user_id: str = "test-user") -> st "graph_exec_id": f"exec-{message_id}", "graph_id": f"graph-{message_id}", "user_id": user_id, - "user_context": {"timezone": "UTC"}, + "execution_context": {"timezone": "UTC"}, "nodes_input_masks": {}, "starting_nodes_input": [], - "parent_graph_exec_id": None, } ) diff --git a/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx b/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx index 86b1e23871ab..d23bfb384949 100644 --- a/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/build/components/FlowEditor/Flow/Flow.tsx @@ -16,13 +16,28 @@ import { useCopyPaste } from "./useCopyPaste"; import { FloatingReviewsPanel } from "@/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel"; import { parseAsString, useQueryStates } from "nuqs"; import { CustomControls } from "./components/CustomControl"; +import { FloatingSafeModeToggle } from "@/components/molecules/FloatingSafeModeToggle/FloatingSafeModeToggle"; +import { useGetV1GetSpecificGraph } 
from "@/app/api/__generated__/endpoints/graphs/graphs"; +import { GraphModel } from "@/app/api/__generated__/models/graphModel"; +import { okData } from "@/app/api/helpers"; export const Flow = () => { - const [{ flowExecutionID }] = useQueryStates({ + const [{ flowID, flowExecutionID }] = useQueryStates({ flowID: parseAsString, flowExecutionID: parseAsString, }); + const { data: graph } = useGetV1GetSpecificGraph( + flowID ?? "", + {}, + { + query: { + select: okData, + enabled: !!flowID, + }, + }, + ); + const nodes = useNodeStore(useShallow((state) => state.nodes)); const onNodesChange = useNodeStore( useShallow((state) => state.onNodesChange), @@ -79,9 +94,19 @@ export const Flow = () => { {} {isGraphRunning && } + {graph && ( + + )} - + ); }; diff --git a/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx b/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx index 3e733eab964e..f80a48054262 100644 --- a/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/build/components/legacy-builder/Flow/Flow.tsx @@ -65,6 +65,7 @@ import NewControlPanel from "@/app/(platform)/build/components/NewControlPanel/N import { Flag, useGetFlag } from "@/services/feature-flags/use-get-flag"; import { BuildActionBar } from "../BuildActionBar"; import { FloatingReviewsPanel } from "@/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel"; +import { FloatingSafeModeToggle } from "@/components/molecules/FloatingSafeModeToggle/FloatingSafeModeToggle"; // This is for the history, this is the minimum distance a block must move before it is logged // It helps to prevent spamming the history with small movements especially when pressing on a input in a block @@ -927,6 +928,13 @@ const FlowEditor: React.FC<{ > + {savedAgent && ( + + )} {isNewBlockEnabled ? ( diff --git a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/NewAgentLibraryView/components/selected-views/RunDetailHeader/RunDetailHeader.tsx b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/NewAgentLibraryView/components/selected-views/RunDetailHeader/RunDetailHeader.tsx index af19d94cd5b9..7f9c5065d116 100644 --- a/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/NewAgentLibraryView/components/selected-views/RunDetailHeader/RunDetailHeader.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/library/agents/[id]/components/NewAgentLibraryView/components/selected-views/RunDetailHeader/RunDetailHeader.tsx @@ -14,6 +14,7 @@ import moment from "moment"; import { AgentActionsDropdown } from "../AgentActionsDropdown"; import { RunStatusBadge } from "../SelectedRunView/components/RunStatusBadge"; import { ShareRunButton } from "../ShareRunButton/ShareRunButton"; +import { FloatingSafeModeToggle } from "@/components/molecules/FloatingSafeModeToggle/FloatingSafeModeToggle"; import { useRunDetailHeader } from "./useRunDetailHeader"; type Props = { @@ -79,6 +80,11 @@ export function RunDetailHeader({ shareToken={run.share_token} /> )} + {!isRunning ? ( + + +
+      <Tooltip>
+        <TooltipTrigger asChild>
+          <span>Safe Mode: {currentSafeMode! ? "ON" : "OFF"}</span>
+        </TooltipTrigger>
+        <TooltipContent>
+          {currentSafeMode!
+            ? "HITL blocks require manual review"
+            : "HITL blocks proceed automatically"}
+        </TooltipContent>
+      </Tooltip>
+ + + ); +} diff --git a/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx b/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx index feb4da96fe3d..183fd8599e85 100644 --- a/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx +++ b/autogpt_platform/frontend/src/components/organisms/FloatingReviewsPanel/FloatingReviewsPanel.tsx @@ -5,36 +5,50 @@ import { Button } from "@/components/atoms/Button/Button"; import { ClockIcon, XIcon } from "@phosphor-icons/react"; import { cn } from "@/lib/utils"; import { Text } from "@/components/atoms/Text/Text"; +import { useGetV1GetExecutionDetails } from "@/app/api/__generated__/endpoints/graphs/graphs"; import { AgentExecutionStatus } from "@/app/api/__generated__/models/agentExecutionStatus"; -import { useGraphStore } from "@/app/(platform)/build/stores/graphStore"; interface FloatingReviewsPanelProps { executionId?: string; + graphId?: string; className?: string; } export function FloatingReviewsPanel({ executionId, + graphId, className, }: FloatingReviewsPanelProps) { const [isOpen, setIsOpen] = useState(false); - const executionStatus = useGraphStore((state) => state.graphExecutionStatus); + const { data: executionDetails } = useGetV1GetExecutionDetails( + graphId || "", + executionId || "", + { + query: { + enabled: !!(graphId && executionId), + }, + }, + ); + + const executionStatus = + executionDetails?.status === 200 ? executionDetails.data.status : undefined; const { pendingReviews, isLoading, refetch } = usePendingReviewsForExecution( executionId || "", ); useEffect(() => { - if (executionStatus === AgentExecutionStatus.REVIEW && executionId) { + if (executionId) { refetch(); } }, [executionStatus, executionId, refetch]); if ( !executionId || - (!isLoading && pendingReviews.length === 0) || - executionStatus !== AgentExecutionStatus.REVIEW + (!isLoading && + pendingReviews.length === 0 && + executionStatus !== AgentExecutionStatus.REVIEW) ) { return null; }
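The net effect of the `FloatingReviewsPanel` change is a looser early-return: the panel now renders whenever pending reviews exist, or while the run is still in REVIEW status, rather than requiring REVIEW status outright. The revised gating condition, modeled as a pure function (Python sketch, purely illustrative):

```python
def should_render_reviews_panel(
    execution_id: str | None,
    is_loading: bool,
    pending_review_count: int,
    execution_status: str | None,  # e.g. "REVIEW"
) -> bool:
    # Mirrors the component's early return: hide only when there is no
    # execution, or when loading has finished with zero pending reviews
    # AND the run is not awaiting review.
    if not execution_id:
        return False
    if (
        not is_loading
        and pending_review_count == 0
        and execution_status != "REVIEW"
    ):
        return False
    return True
```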