From db0052b57bdaf227dbfe4cbd4c88424641d4730e Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Mon, 6 Oct 2025 12:00:16 +0100 Subject: [PATCH] Add recipe for human in the loop workflow to AI cookbook. --- cookbook/human-in-the-loop-python.mdx | 611 ++++++++++++++++++++++++++ 1 file changed, 611 insertions(+) create mode 100644 cookbook/human-in-the-loop-python.mdx diff --git a/cookbook/human-in-the-loop-python.mdx b/cookbook/human-in-the-loop-python.mdx new file mode 100644 index 0000000000..f2d61ccd06 --- /dev/null +++ b/cookbook/human-in-the-loop-python.mdx @@ -0,0 +1,611 @@ +--- +title: Human-in-the-Loop AI Agent +description: Build an AI agent that pauses for human approval before executing high-stakes actions using Temporal Signals. +tags: [agents, signals, python] +source: https://github.com/temporalio/ai-cookbook/tree/main/agents/human_in_the_loop_python +--- + +This recipe demonstrates how to build an AI agent that requires human approval before executing certain +actions. This pattern is essential for AI systems that make high-stakes decisions, such as financial +transactions, content publishing, or infrastructure changes. + +The key challenge in human-in-the-loop systems is maintaining state while waiting for human input, which +can take seconds, minutes, or even days. Temporal's durable execution ensures the workflow state persists +reliably during these waiting periods, and Signals provide a clean mechanism for external systems to send +approval decisions back to the running workflow. + +This recipe highlights these key design decisions: + +- Using Temporal Signals to receive human approval decisions asynchronously +- Implementing timeouts for approval requests to prevent workflows from waiting indefinitely +- Structuring workflows to clearly separate AI decision-making from human oversight +- Handling approval, rejection, and timeout scenarios gracefully +- Maintaining a complete audit trail of both AI decisions and human approvals + +## Understanding the Pattern + +In a human-in-the-loop AI agent, the workflow follows this pattern: + +1. **AI Analysis**: The LLM analyzes the request and proposes an action +2. **Approval Request**: The workflow pauses and waits for human approval via Signal +3. **Human Decision**: An external system (UI, API, etc.) sends approval/rejection via Signal +4. **Action Execution**: If approved, the workflow executes the action; if rejected or timed out, it + handles accordingly + +The workflow remains running and durable throughout the approval waiting period, even if the worker +restarts. + +## Create the Data Models + +We define structured data models for approval requests and responses to ensure type safety and clear +communication between the AI agent and human reviewers. + +*File: models/approval.py* +```python +from pydantic import BaseModel + +class ProposedAction(BaseModel): + """Action proposed by the AI agent for human review.""" + action_type: str + description: str + reasoning: str + +class ApprovalRequest(BaseModel): + """Request sent to human reviewer.""" + request_id: str + proposed_action: ProposedAction + context: str + requested_at: str + +class ApprovalDecision(BaseModel): + """Decision received from human reviewer.""" + request_id: str + approved: bool + reviewer_notes: str | None = None + decided_at: str +``` + +## Create the LLM Activity + +We create a generic activity for invoking the OpenAI API to analyze requests and propose actions. The activity +returns the text output from the LLM, which we'll parse into our structured models in the workflow. + +*File: activities/openai_responses.py* +```python +from temporalio import activity +from openai import AsyncOpenAI +from dataclasses import dataclass + +@dataclass +class OpenAIResponsesRequest: + model: str + instructions: str + input: str + +@activity.defn +async def create(request: OpenAIResponsesRequest) -> str: + # Temporal best practice: Disable retry logic in OpenAI API client library. + client = AsyncOpenAI(max_retries=0) + + resp = await client.responses.create( + model=request.model, + instructions=request.instructions, + input=request.input, + timeout=30, + ) + + return resp.output_text +``` + +## Create the Action Execution Activity + +This activity executes the approved action. In a real system, this might call external APIs, modify +databases, or trigger other workflows. For this example, we'll simulate an action with a simple logging +activity. + +*File: activities/execute_action.py* +```python +from temporalio import activity +from models.approval import ProposedAction +import asyncio + +@activity.defn +async def execute_action(action: ProposedAction) -> str: + """Execute the approved action. + + In a real system, this would call external APIs, modify databases, + or trigger other workflows based on the action_type. + """ + activity.logger.info( + f"Executing action: {action.action_type}", + extra={"action": action.model_dump()} + ) + + return f"Successfully executed: {action.action_type}" +``` + +## Create the Notification Activity + +This activity notifies external systems (like a UI or messaging system) that approval is needed. In a real +implementation, this might send emails, Slack messages, or push notifications. + +*File: activities/notify_approval_needed.py* +```python +from temporalio import activity +from models.approval import ApprovalRequest + +@activity.defn +async def notify_approval_needed(request: ApprovalRequest) -> None: + """Notify external systems that human approval is needed. + + In a real system, this would send notifications via: + - Email + - Slack/Teams messages + - Push notifications to mobile apps + - Updates to approval queue UI + """ + # Get workflow ID from activity context + workflow_id = activity.info().workflow_id + + activity.logger.info( + f"Approval needed for request: {request.request_id}", + extra={"request": request.model_dump()} + ) + + # In a real implementation, you would call notification services here + print(f"\n{'='*60}") + print(f"APPROVAL REQUIRED") + print(f"{'='*60}") + print(f"Workflow ID: {workflow_id}") + print(f"Request ID: {request.request_id}") + print(f"Action: {request.proposed_action.action_type}") + print(f"Description: {request.proposed_action.description}") + print(f"Reasoning: {request.proposed_action.reasoning}") + print(f"\nTo approve or reject, use the send_approval script:") + print(f" uv run python -m send_approval {workflow_id} {request.request_id} approve 'Looks good'") + print(f" uv run python -m send_approval {workflow_id} {request.request_id} reject 'Too risky'") + print(f"{'='*60}\n") +``` + +## Create the Human-in-the-Loop Workflow + +The workflow orchestrates the entire human-in-the-loop process. It uses Temporal Signals to receive +approval decisions and implements a timeout to prevent indefinite waiting. + +Key features: +- **Signal Handler**: Receives approval decisions asynchronously +- **Condition with Timeout**: Waits for approval with a configurable timeout +- **State Management**: Tracks approval status durably across worker restarts +- **Audit Trail**: Logs all decisions for compliance and debugging + +*File: workflows/human_in_the_loop_workflow.py* +```python +from temporalio import workflow +from datetime import timedelta +from typing import Optional +import asyncio + +with workflow.unsafe.imports_passed_through(): + from models.approval import ProposedAction, ApprovalRequest, ApprovalDecision + from activities import openai_responses, execute_action, notify_approval_needed + + +@workflow.defn +class HumanInTheLoopWorkflow: + def __init__(self): + self.current_decision: Optional[ApprovalDecision] = None + self.pending_request_id: Optional[str] = None + + @workflow.signal + async def approval_decision(self, decision: ApprovalDecision): + """Signal handler for receiving approval decisions from humans.""" + # Verify this decision is for the current pending request + if decision.request_id == self.pending_request_id: + self.current_decision = decision + workflow.logger.info( + f"Received approval decision: {'approved' if decision.approved else 'rejected'}", + extra={"decision": decision.model_dump()} + ) + else: + workflow.logger.warning( + f"Received decision for wrong request ID: {decision.request_id}, expected: {self.pending_request_id}" + ) + + @workflow.run + async def run(self, user_request: str, approval_timeout_seconds: int = 300) -> str: + """Execute an AI agent workflow with human-in-the-loop approval. + + Args: + user_request: The user's request for the AI agent + approval_timeout_seconds: How long to wait for approval (default: 5 minutes) + + Returns: + Result of the workflow execution + """ + workflow.logger.info(f"Starting human-in-the-loop workflow for request: {user_request}") + + # Step 1: AI analyzes the request and proposes an action + proposed_action = await self._analyze_and_propose_action(user_request) + + workflow.logger.info( + f"AI proposed action: {proposed_action.action_type}", + extra={"proposed_action": proposed_action.model_dump()} + ) + + # Step 2: Request human approval + approval_result = await self._request_approval( + proposed_action, + user_request, + timeout_seconds=approval_timeout_seconds + ) + + # Step 3: Handle the approval result + if approval_result == "approved": + workflow.logger.info("Action approved, proceeding with execution") + result = await workflow.execute_activity( + execute_action.execute_action, + proposed_action, + start_to_close_timeout=timedelta(seconds=60), + ) + return f"Action completed successfully: {result}" + + elif approval_result == "rejected": + workflow.logger.info("Action rejected by human reviewer") + reviewer_notes = self.current_decision.reviewer_notes or 'None provided' + return f"Action rejected. Reviewer notes: {reviewer_notes}" + + else: # timeout + workflow.logger.warning("Approval request timed out") + timeout_msg = f"Action cancelled: approval request timed out after {approval_timeout_seconds} seconds" + return timeout_msg + + async def _analyze_and_propose_action(self, user_request: str) -> ProposedAction: + """Use LLM to analyze request and propose an action.""" + system_instructions = """ +You are an AI assistant that analyzes user requests and proposes actions. +For each request, you should: +1. Determine what action needs to be taken +2. Provide a clear description of the action +3. Explain your reasoning for why this action addresses the request + +Be thorough and clear in your analysis. + +Respond with a JSON string in this structure: + +{ + "action_type": "A short name for the action (e.g., \\"delete_test_data\\")", + "description": "A clear description of what the action will do", + "reasoning": "Your explanation for why this action addresses the request" +} +""" + + result = await workflow.execute_activity( + openai_responses.create, + openai_responses.OpenAIResponsesRequest( + model="gpt-4o-mini", + instructions=system_instructions, + input=user_request, + ), + start_to_close_timeout=timedelta(seconds=30), + ) + + # Parse the JSON output into our ProposedAction model + return ProposedAction.model_validate_json(result) + + async def _request_approval( + self, + proposed_action: ProposedAction, + context: str, + timeout_seconds: int + ) -> str: + """Request human approval and wait for response. + + Returns: + "approved", "rejected", or "timeout" + """ + # Generate unique request ID using workflow's deterministic UUID + self.current_decision = None + self.pending_request_id = str(workflow.uuid4()) + + # Create approval request + approval_request = ApprovalRequest( + request_id=self.pending_request_id, + proposed_action=proposed_action, + context=context, + requested_at=workflow.now().isoformat(), + ) + + # Send notification to external systems + await workflow.execute_activity( + notify_approval_needed.notify_approval_needed, + approval_request, + start_to_close_timeout=timedelta(seconds=10), + ) + + # Wait for approval decision with timeout + try: + await workflow.wait_condition( + lambda: self.current_decision is not None, + timeout=timedelta(seconds=timeout_seconds), + ) + + # Decision received + if self.current_decision.approved: + return "approved" + else: + return "rejected" + + except asyncio.TimeoutError: + # Timeout waiting for approval + return "timeout" +``` + +## Create the Worker + +The worker runs the workflow and activities. It uses the `pydantic_data_converter` to handle serialization +of our Pydantic models. + +*File: worker.py* +```python +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from workflows.human_in_the_loop_workflow import HumanInTheLoopWorkflow +from activities.openai_responses import create +from activities.execute_action import execute_action +from activities.notify_approval_needed import notify_approval_needed +from temporalio.contrib.pydantic import pydantic_data_converter + + +async def main(): + # Configure logging to see workflow.logger output + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + client = await Client.connect( + "localhost:7233", + data_converter=pydantic_data_converter, + ) + + worker = Worker( + client, + task_queue="human-in-the-loop-task-queue", + workflows=[HumanInTheLoopWorkflow], + activities=[ + create, + execute_action, + notify_approval_needed, + ], + ) + + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## Create the Workflow Starter + +The starter script initiates the workflow and waits for completion. The workflow will pause for human +approval, so this script will block until the approval is received or times out. + +*File: start_workflow.py* +```python +import asyncio +import sys +import uuid + +from temporalio.client import Client + +from workflows.human_in_the_loop_workflow import HumanInTheLoopWorkflow +from temporalio.contrib.pydantic import pydantic_data_converter + + +async def main(): + client = await Client.connect( + "localhost:7233", + data_converter=pydantic_data_converter, + ) + + default_request = "Delete all test data from the production database" + user_request = sys.argv[1] if len(sys.argv) > 1 else default_request + + # Use a unique workflow ID so you can run multiple instances + workflow_id = f"human-in-the-loop-{uuid.uuid4()}" + + print(f"Starting workflow with ID: {workflow_id}") + print(f"User request: {user_request}") + print("\nWorkflow will pause for approval. Watch the worker output for instructions.\n") + + # Submit the workflow for execution + result = await client.execute_workflow( + HumanInTheLoopWorkflow.run, + args=[user_request, 300], # user_request and approval timeout in seconds + id=workflow_id, + task_queue="human-in-the-loop-task-queue", + ) + + print(f"\nWorkflow completed!") + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## Create a Signal Sender Script + +This helper script makes it easy to send approval decisions to running workflows. In a real system, this +would be replaced by a UI, API endpoint, or other integration. + +*File: send_approval.py* +```python +import asyncio +import sys +from datetime import datetime, timezone + +from temporalio.client import Client +from temporalio.contrib.pydantic import pydantic_data_converter +from models.approval import ApprovalDecision + + +async def main(): + if len(sys.argv) < 4: + print("Usage: python -m send_approval [notes]") + print("\nExample:") + example = " python -m send_approval human-in-the-loop-123 abc-def-ghi approve 'Looks good'" + print(example) + sys.exit(1) + + workflow_id = sys.argv[1] + request_id = sys.argv[2] + decision = sys.argv[3].lower() + notes = sys.argv[4] if len(sys.argv) > 4 else None + + if decision not in ["approve", "reject"]: + print("Decision must be 'approve' or 'reject'") + sys.exit(1) + + client = await Client.connect( + "localhost:7233", + data_converter=pydantic_data_converter, + ) + + approval_decision = ApprovalDecision( + request_id=request_id, + approved=(decision == "approve"), + reviewer_notes=notes, + decided_at=datetime.now(timezone.utc).isoformat(), + ) + + # Get workflow handle and send signal + handle = client.get_workflow_handle(workflow_id) + await handle.signal("approval_decision", approval_decision) + + decision_type = 'approval' if approval_decision.approved else 'rejection' + print(f"Sent {decision_type} signal to workflow {workflow_id}") + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## Running + +### Start the Temporal Dev Server + +```bash +temporal server start-dev +``` + +### Install dependencies + +From the recipe directory: + +```bash +uv sync +``` + +### Run the worker + +First set the `OPENAI_API_KEY` environment variable, then: + +```bash +uv run python -m worker +``` + +### Start a workflow + +In a new terminal: + +```bash +uv run python -m start_workflow "Delete all test data from the production database" +``` + +The workflow will start, the AI will analyze the request, and then it will pause waiting for approval. The +worker output will show instructions for sending the approval signal. + +### Send approval decision + +Watch the worker output for the workflow ID and request ID, then send an approval: + +```bash +uv run python -m send_approval approve "Verified this is safe to execute" +``` + +Or reject it: + +```bash +uv run python -m send_approval reject "This looks too risky" +``` + +Alternatively, you can use the Temporal CLI directly: + +```bash +temporal workflow signal \ + --workflow-id \ + --name approval_decision \ + --input '{"request_id": "", "approved": true, \ + "reviewer_notes": "Approved", "decided_at": "2025-01-01T12:00:00Z"}' +``` + +## Testing Timeout Behavior + +To test the timeout behavior, start a workflow and simply don't send any approval signal. After the +timeout period (default 300 seconds / 5 minutes), the workflow will automatically complete with a timeout +result. + +You can also set a shorter timeout for testing: + +```python +# In start_workflow.py, change the timeout parameter: +result = await client.execute_workflow( + HumanInTheLoopWorkflow.run, + args=[user_request, 30], # 30 second timeout for testing + id=workflow_id, + task_queue="human-in-the-loop-task-queue", +) +``` + +## Key Patterns and Best Practices + +### Durable Waiting + +The workflow can wait for hours or days for human approval without consuming resources. The workflow state +is persisted, so even if the worker crashes and restarts, the workflow will resume from where it left off. + +### Signal Validation + +The workflow validates that incoming approval signals match the current pending request ID. This prevents +confusion if multiple approval requests are in flight or if stale approvals arrive. + +### Timeout Handling + +Always implement timeouts for human approval requests. Without timeouts, workflows could wait indefinitely +if approvals are forgotten or lost. + +### Audit Trail + +Every decision (AI proposal, human approval/rejection, timeout) is logged with full context. This creates +a complete audit trail for compliance and debugging. + +### Idempotency + +The workflow uses unique request IDs to ensure each approval request is distinct. This prevents accidental +double-approvals and makes it safe to retry operations. + +## Extensions + +This basic pattern can be extended in many ways: + +- **Multiple Approvers**: Require approval from multiple people or implement voting +- **Escalation**: Automatically escalate to higher authority if initial approver doesn't respond +- **Conditional Approval**: Only require approval for high-risk actions +- **Approval Chains**: Implement multi-stage approval workflows +- **Rich Notifications**: Integrate with Slack, email, or custom UIs for approval requests +- **Query Handlers**: Add query handlers to check approval status without modifying workflow state