diff --git a/praisonai_tools/n8n/n8n_workflow.py b/praisonai_tools/n8n/n8n_workflow.py index 3bc67ba..b5d59c6 100644 --- a/praisonai_tools/n8n/n8n_workflow.py +++ b/praisonai_tools/n8n/n8n_workflow.py @@ -21,7 +21,7 @@ import os import logging -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union from praisonai_tools.tools.base import BaseTool from praisonai_tools.tools.decorator import tool @@ -53,12 +53,12 @@ def run( wait_for_completion: bool = True, **kwargs ) -> Dict[str, Any]: - """Execute an n8n workflow and return the result. + """Execute an n8n workflow via webhook trigger. Args: workflow_id: The n8n workflow ID to execute input_data: Input data to pass to the workflow - wait_for_completion: Whether to wait for workflow completion + wait_for_completion: Whether to wait for workflow completion (ignored for webhook execution) Returns: Workflow execution result @@ -72,78 +72,104 @@ def run( except ImportError: return {"error": "httpx not installed. Install with: pip install 'praisonai-tools[n8n]'"} - # Prepare headers - headers = {"Content-Type": "application/json"} - if self.api_key: - headers["X-N8N-API-KEY"] = self.api_key + # Fetch workflow to find webhook trigger + webhook_url = self._get_webhook_url(workflow_id) + if isinstance(webhook_url, dict) and "error" in webhook_url: + return webhook_url # Return error from _get_webhook_url - # Execute workflow + # Execute workflow via webhook (no API key needed for webhooks) try: with httpx.Client(timeout=self.timeout) as client: response = client.post( - f"{self.n8n_url}/api/v1/workflows/{workflow_id}/execute", - json={"data": input_data or {}}, - headers=headers, + webhook_url, + json=input_data or {}, + headers={"Content-Type": "application/json"}, ) response.raise_for_status() - result = response.json() - - if wait_for_completion and result.get("executionId"): - # Poll for completion - execution_id = result["executionId"] - return self._wait_for_execution(client, execution_id, headers) - - return result + # Try to parse JSON response, fallback to text if not JSON + try: + return response.json() + except ValueError: + return {"result": response.text, "status_code": response.status_code} except httpx.TimeoutException: - logger.error(f"n8n workflow {workflow_id} timed out after {self.timeout}s") - return {"error": f"Workflow execution timed out after {self.timeout} seconds"} + logger.error(f"n8n workflow {workflow_id} webhook timed out after {self.timeout}s") + return {"error": f"Workflow webhook execution timed out after {self.timeout} seconds"} except httpx.HTTPStatusError as e: - logger.error(f"n8n API error: {e.response.status_code} - {e.response.text}") + logger.error(f"n8n webhook error: {e.response.status_code} - {e.response.text}") return {"error": f"HTTP {e.response.status_code}: {e.response.text}"} except Exception as e: - logger.error(f"n8n workflow execution error: {e}") + logger.error(f"n8n workflow webhook execution error: {e}") return {"error": str(e)} - def _wait_for_execution( - self, - client: "httpx.Client", - execution_id: str, - headers: Dict[str, str], - max_wait: Optional[int] = None, - poll_interval: int = 2, - ) -> Dict[str, Any]: - """Wait for workflow execution to complete.""" - import time + def _get_webhook_url(self, workflow_id: str) -> Union[str, Dict[str, Any]]: + """Get webhook URL for a workflow by finding its webhook trigger node. - # Use configured timeout if max_wait is not provided - if max_wait is None: - max_wait = int(self.timeout) + Args: + workflow_id: The n8n workflow ID + + Returns: + Webhook URL string or error dict + """ + try: + import httpx + except ImportError: + return {"error": "httpx not installed"} - waited = 0 - while waited < max_wait: - try: + # Prepare headers for API calls + headers = {"Content-Type": "application/json"} + if self.api_key: + headers["X-N8N-API-KEY"] = self.api_key + + try: + with httpx.Client(timeout=30.0) as client: + # Fetch workflow definition response = client.get( - f"{self.n8n_url}/api/v1/executions/{execution_id}", + f"{self.n8n_url}/api/v1/workflows/{workflow_id}", headers=headers, ) response.raise_for_status() + workflow = response.json() - execution = response.json() - status = execution.get("status") + # Find webhook trigger node + nodes = workflow.get("nodes", []) + webhook_node = None + for node in nodes: + if node.get("type") == "n8n-nodes-base.webhook": + webhook_node = node + break - if status in ["success", "error", "canceled"]: - return execution + if not webhook_node: + return {"error": "Workflow has no Webhook trigger node. Add a Webhook trigger to enable external execution."} - time.sleep(poll_interval) - waited += poll_interval + # Extract webhook path + parameters = webhook_node.get("parameters", {}) + path = parameters.get("path", "") + if not path: + return {"error": "Webhook trigger node has no path configured"} - except Exception as e: - logger.error(f"Error polling execution {execution_id}: {e}") - return {"error": f"Error polling execution: {e}"} - - return {"error": f"Execution {execution_id} did not complete within {max_wait} seconds"} + # Ensure workflow is active + if not workflow.get("active"): + activate_response = client.patch( + f"{self.n8n_url}/api/v1/workflows/{workflow_id}", + json={"active": True}, + headers=headers, + ) + activate_response.raise_for_status() + logger.info(f"Activated workflow {workflow_id}") + + # Construct webhook URL + # Use /webhook-test/ for test mode, /webhook/ for production + webhook_url = f"{self.n8n_url}/webhook/{path}" + return webhook_url + + except httpx.HTTPStatusError as e: + logger.error(f"n8n API error fetching workflow {workflow_id}: {e.response.status_code} - {e.response.text}") + return {"error": f"HTTP {e.response.status_code}: {e.response.text}"} + except Exception as e: + logger.error(f"Error fetching workflow {workflow_id}: {e}") + return {"error": str(e)} def list_workflows(self) -> Dict[str, Any]: """List available n8n workflows.""" diff --git a/tests/test_n8n_integration.py b/tests/test_n8n_integration.py index f730585..f37e984 100644 --- a/tests/test_n8n_integration.py +++ b/tests/test_n8n_integration.py @@ -89,19 +89,40 @@ def test_n8n_workflow_missing_httpx(self): assert "pip install 'praisonai-tools[n8n]'" in result["error"] def test_n8n_workflow_successful_execution(self, mock_httpx): - """Test successful workflow execution.""" + """Test successful workflow execution via webhook.""" from praisonai_tools.n8n import N8nWorkflowTool - # Mock httpx response - mock_response = Mock() - mock_response.json.return_value = { - "executionId": "exec-123", - "status": "running" + # Mock workflow fetch response + mock_workflow_response = Mock() + mock_workflow_response.json.return_value = { + "id": "test-workflow", + "name": "Test Workflow", + "active": True, + "nodes": [ + { + "id": "webhook-node", + "name": "Webhook", + "type": "n8n-nodes-base.webhook", + "parameters": { + "path": "test-webhook", + "httpMethod": "POST" + } + } + ] } - mock_response.raise_for_status.return_value = None + mock_workflow_response.raise_for_status.return_value = None + + # Mock webhook execution response + mock_webhook_response = Mock() + mock_webhook_response.json.return_value = { + "result": "success", + "message": "Webhook executed successfully" + } + mock_webhook_response.raise_for_status.return_value = None mock_client = Mock() - mock_client.post.return_value = mock_response + mock_client.get.return_value = mock_workflow_response + mock_client.post.return_value = mock_webhook_response mock_httpx.Client.return_value.__enter__.return_value = mock_client tool = N8nWorkflowTool(api_key="test-key") @@ -111,27 +132,33 @@ def test_n8n_workflow_successful_execution(self, mock_httpx): wait_for_completion=False ) - assert result["executionId"] == "exec-123" - assert result["status"] == "running" + assert result["result"] == "success" + assert result["message"] == "Webhook executed successfully" - # Verify API call - mock_client.post.assert_called_once_with( - "http://localhost:5678/api/v1/workflows/test-workflow/execute", - json={"data": {"message": "Hello"}}, + # Verify workflow fetch call + mock_client.get.assert_called_once_with( + "http://localhost:5678/api/v1/workflows/test-workflow", headers={"Content-Type": "application/json", "X-N8N-API-KEY": "test-key"}, ) + + # Verify webhook execution call + mock_client.post.assert_called_once_with( + "http://localhost:5678/webhook/test-webhook", + json={"message": "Hello"}, + headers={"Content-Type": "application/json"}, + ) def test_n8n_workflow_http_error(self, mock_httpx): - """Test HTTP error handling.""" + """Test HTTP error handling during workflow fetch.""" from praisonai_tools.n8n import N8nWorkflowTool - # Mock HTTP error + # Mock HTTP error during workflow fetch mock_response = Mock() mock_response.status_code = 401 mock_response.text = "Unauthorized" mock_client = Mock() - mock_client.post.side_effect = mock_httpx.HTTPStatusError( + mock_client.get.side_effect = mock_httpx.HTTPStatusError( "401 Unauthorized", request=Mock(), response=mock_response ) mock_httpx.Client.return_value.__enter__.return_value = mock_client @@ -142,17 +169,47 @@ def test_n8n_workflow_http_error(self, mock_httpx): assert "HTTP 401: Unauthorized" in result["error"] def test_n8n_workflow_timeout_error(self, mock_httpx): - """Test timeout error handling.""" + """Test timeout error handling during workflow fetch.""" from praisonai_tools.n8n import N8nWorkflowTool mock_client = Mock() - mock_client.post.side_effect = mock_httpx.TimeoutException("Request timed out") + mock_client.get.side_effect = mock_httpx.TimeoutException("Request timed out") mock_httpx.Client.return_value.__enter__.return_value = mock_client tool = N8nWorkflowTool(timeout=5.0) result = tool.run(workflow_id="test-workflow") - assert "timed out after 5.0 seconds" in result["error"] + assert "Request timed out" in result["error"] + + def test_n8n_workflow_no_webhook_trigger(self, mock_httpx): + """Test error handling when workflow has no webhook trigger.""" + from praisonai_tools.n8n import N8nWorkflowTool + + # Mock workflow fetch response with no webhook trigger + mock_workflow_response = Mock() + mock_workflow_response.json.return_value = { + "id": "test-workflow", + "name": "Test Workflow", + "active": True, + "nodes": [ + { + "id": "manual-node", + "name": "Manual Trigger", + "type": "n8n-nodes-base.manualTrigger", + "parameters": {} + } + ] + } + mock_workflow_response.raise_for_status.return_value = None + + mock_client = Mock() + mock_client.get.return_value = mock_workflow_response + mock_httpx.Client.return_value.__enter__.return_value = mock_client + + tool = N8nWorkflowTool(api_key="test-key") + result = tool.run(workflow_id="test-workflow") + + assert "Workflow has no Webhook trigger node" in result["error"] def test_n8n_list_workflows(self, mock_httpx): """Test listing workflows."""