Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 77 additions & 51 deletions praisonai_tools/n8n/n8n_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import os
import logging
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union

from praisonai_tools.tools.base import BaseTool
from praisonai_tools.tools.decorator import tool
Expand Down Expand Up @@ -53,12 +53,12 @@ def run(
wait_for_completion: bool = True,
**kwargs
) -> Dict[str, Any]:
"""Execute an n8n workflow and return the result.
"""Execute an n8n workflow via webhook trigger.

Args:
workflow_id: The n8n workflow ID to execute
input_data: Input data to pass to the workflow
wait_for_completion: Whether to wait for workflow completion
wait_for_completion: Whether to wait for workflow completion (ignored for webhook execution)

Returns:
Workflow execution result
Expand All @@ -72,78 +72,104 @@ def run(
except ImportError:
return {"error": "httpx not installed. Install with: pip install 'praisonai-tools[n8n]'"}

# Prepare headers
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["X-N8N-API-KEY"] = self.api_key
# Fetch workflow to find webhook trigger
webhook_url = self._get_webhook_url(workflow_id)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _get_webhook_url method is called on every execution of run. This performs an additional API request to fetch the workflow definition every time, which is inefficient and adds latency. Consider caching the resolved webhook URL to improve performance for repeated executions.

if isinstance(webhook_url, dict) and "error" in webhook_url:
return webhook_url # Return error from _get_webhook_url

# Execute workflow
# Execute workflow via webhook (no API key needed for webhooks)
try:
with httpx.Client(timeout=self.timeout) as client:
response = client.post(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The tool hardcodes the POST method for the webhook call. While POST is the most common method for webhooks, n8n allows configuring the Webhook node to respond to other methods (GET, PUT, etc.). If a workflow is configured with a different method, this call will fail with a 405 error. It would be more robust to use the method defined in the workflow's webhook node parameters.

f"{self.n8n_url}/api/v1/workflows/{workflow_id}/execute",
json={"data": input_data or {}},
headers=headers,
webhook_url,
json=input_data or {},
headers={"Content-Type": "application/json"},
)
Comment on lines 82 to 87
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The webhook node’s parameters.httpMethod is read in tests and is a configurable part of n8n’s Webhook trigger, but run() always calls client.post(...). Workflows configured with a GET/PUT webhook will fail to execute. Consider having _get_webhook_url() return both the webhook URL and HTTP method (or accept method as an override) and use client.request(method, ...) instead of hard-coding POST.

Copilot uses AI. Check for mistakes.
response.raise_for_status()
Comment on lines 81 to 88
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

n8n Webhook trigger node default httpMethod parameter values

💡 Result:

The n8n Webhook trigger node has an HTTP Method parameter that is configurable. There is no single fixed "default" value explicitly documented in the official docs, but multiple authoritative sources indicate it defaults to GET when using a single method. When "Allow Multiple HTTP Methods" is disabled (default behavior), the node accepts a single HTTP method, and documentation examples consistently use GET (e.g., curl examples with --request GET, references to selecting GET). When enabled, it defaults to accepting both GET and POST. In the source code, the runtime falls back to 'GET' via (parameters?.httpMethod ?? 'GET') when matching webhooks, confirming GET as the effective default for unset parameters.

Citations:


🏁 Script executed:

# First, locate and read the relevant file
find . -name "n8n_workflow.py" -type f

Repository: MervinPraison/PraisonAI-Tools

Length of output: 110


🏁 Script executed:

# Read the file to examine lines around 81-88 and 146-150
cat -n ./praisonai_tools/n8n/n8n_workflow.py | head -160

Repository: MervinPraison/PraisonAI-Tools

Length of output: 7331


🏁 Script executed:

# Continue reading the file from line 160 onwards to see the full webhook URL construction
cat -n ./praisonai_tools/n8n/n8n_workflow.py | tail -n +160

Repository: MervinPraison/PraisonAI-Tools

Length of output: 5173


Fix hardcoded POST method — webhook HTTP method is ignored.

The code extracts the webhook path at lines 146–150 but ignores the httpMethod parameter stored in the webhook node (e.g., GET, PUT, DELETE). The run() method hardcodes client.post() at line 83, causing any workflow with a non-POST webhook trigger to fail with 404 or 405. Per n8n documentation, the default is GET when not explicitly set, making this a common failure case.

Extract httpMethod from webhook_node.parameters in _get_webhook_url(), pass it back to run(), and dispatch the request using the appropriate method (client.get(), client.post(), client.put(), client.delete(), etc.). If the configured method is unsupported, return a clear error before attempting the request.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@praisonai_tools/n8n/n8n_workflow.py` around lines 81 - 88, The webhook HTTP
method is being ignored because run() always calls client.post(); update
_get_webhook_url() to read httpMethod from webhook_node.parameters (default to
"GET" per n8n), return the method along with the webhook_url, then change run()
to dispatch the request based on that method (use client.get(), client.post(),
client.put(), client.delete(), client.patch(), client.head(), client.options()
as appropriate) and validate the method before making the call—if the method is
unsupported return a clear error instead of attempting the request.


result = response.json()

if wait_for_completion and result.get("executionId"):
# Poll for completion
execution_id = result["executionId"]
return self._wait_for_execution(client, execution_id, headers)

return result
# Try to parse JSON response, fallback to text if not JSON
try:
return response.json()
except ValueError:
return {"result": response.text, "status_code": response.status_code}

except httpx.TimeoutException:
logger.error(f"n8n workflow {workflow_id} timed out after {self.timeout}s")
return {"error": f"Workflow execution timed out after {self.timeout} seconds"}
logger.error(f"n8n workflow {workflow_id} webhook timed out after {self.timeout}s")
return {"error": f"Workflow webhook execution timed out after {self.timeout} seconds"}
except httpx.HTTPStatusError as e:
logger.error(f"n8n API error: {e.response.status_code} - {e.response.text}")
logger.error(f"n8n webhook error: {e.response.status_code} - {e.response.text}")
return {"error": f"HTTP {e.response.status_code}: {e.response.text}"}
except Exception as e:
logger.error(f"n8n workflow execution error: {e}")
logger.error(f"n8n workflow webhook execution error: {e}")
return {"error": str(e)}

def _wait_for_execution(
self,
client: "httpx.Client",
execution_id: str,
headers: Dict[str, str],
max_wait: Optional[int] = None,
poll_interval: int = 2,
) -> Dict[str, Any]:
"""Wait for workflow execution to complete."""
import time
def _get_webhook_url(self, workflow_id: str) -> Union[str, Dict[str, Any]]:
"""Get webhook URL for a workflow by finding its webhook trigger node.

# Use configured timeout if max_wait is not provided
if max_wait is None:
max_wait = int(self.timeout)
Args:
workflow_id: The n8n workflow ID

Returns:
Webhook URL string or error dict
"""
try:
import httpx
except ImportError:
return {"error": "httpx not installed"}

waited = 0
while waited < max_wait:
try:
# Prepare headers for API calls
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["X-N8N-API-KEY"] = self.api_key

try:
with httpx.Client(timeout=30.0) as client:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The timeout for fetching the workflow definition is hardcoded to 30.0 seconds. It should use self.timeout to remain consistent with the tool's configuration.

Suggested change
with httpx.Client(timeout=30.0) as client:
with httpx.Client(timeout=self.timeout) as client:

Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_get_webhook_url() uses a hard-coded httpx.Client(timeout=30.0) for workflow fetch/activation, while run() uses self.timeout for the webhook call. This makes the timeout constructor argument apply only to the webhook request, not the initial workflow lookup/activation, which is surprising and can cause much longer hangs than configured. Consider using timeout=self.timeout here (or introducing a clearly named separate API-timeout setting).

Suggested change
with httpx.Client(timeout=30.0) as client:
with httpx.Client(timeout=self.timeout) as client:

Copilot uses AI. Check for mistakes.
# Fetch workflow definition
response = client.get(
f"{self.n8n_url}/api/v1/executions/{execution_id}",
f"{self.n8n_url}/api/v1/workflows/{workflow_id}",
headers=headers,
)
response.raise_for_status()
workflow = response.json()

execution = response.json()
status = execution.get("status")
# Find webhook trigger node
nodes = workflow.get("nodes", [])
webhook_node = None
for node in nodes:
if node.get("type") == "n8n-nodes-base.webhook":
webhook_node = node
break

if status in ["success", "error", "canceled"]:
return execution
if not webhook_node:
return {"error": "Workflow has no Webhook trigger node. Add a Webhook trigger to enable external execution."}

time.sleep(poll_interval)
waited += poll_interval
# Extract webhook path
parameters = webhook_node.get("parameters", {})
path = parameters.get("path", "")
if not path:
return {"error": "Webhook trigger node has no path configured"}

except Exception as e:
logger.error(f"Error polling execution {execution_id}: {e}")
return {"error": f"Error polling execution: {e}"}

return {"error": f"Execution {execution_id} did not complete within {max_wait} seconds"}
# Ensure workflow is active
if not workflow.get("active"):
activate_response = client.patch(
f"{self.n8n_url}/api/v1/workflows/{workflow_id}",
json={"active": True},
headers=headers,
)
activate_response.raise_for_status()
logger.info(f"Activated workflow {workflow_id}")
Comment on lines +152 to +160
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

n8n public REST API v1 activate workflow endpoint path and method

💡 Result:

The n8n public REST API v1 endpoint to activate a workflow is POST /api/v1/workflows/:id/activate, where :id is the workflow ID.

Citations:


Use the dedicated activate endpoint instead of PATCH with {"active": true}.

The n8n public REST API v1 exposes a dedicated activate endpoint (POST /api/v1/workflows/{id}/activate). The current PATCH approach with just {"active": true} will fail because the PATCH endpoint requires the full workflow body and the active field is not writable via that route—this will return 400 or 405 on a real inactive workflow, causing the activation to fail.

🔧 Fix
-                # Ensure workflow is active
-                if not workflow.get("active"):
-                    activate_response = client.patch(
-                        f"{self.n8n_url}/api/v1/workflows/{workflow_id}",
-                        json={"active": True},
-                        headers=headers,
-                    )
-                    activate_response.raise_for_status()
-                    logger.info(f"Activated workflow {workflow_id}")
+                # Ensure workflow is active
+                if not workflow.get("active"):
+                    activate_response = client.post(
+                        f"{self.n8n_url}/api/v1/workflows/{workflow_id}/activate",
+                        headers=headers,
+                    )
+                    activate_response.raise_for_status()
+                    logger.info(f"Activated workflow {workflow_id}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Ensure workflow is active
if not workflow.get("active"):
activate_response = client.patch(
f"{self.n8n_url}/api/v1/workflows/{workflow_id}",
json={"active": True},
headers=headers,
)
activate_response.raise_for_status()
logger.info(f"Activated workflow {workflow_id}")
# Ensure workflow is active
if not workflow.get("active"):
activate_response = client.post(
f"{self.n8n_url}/api/v1/workflows/{workflow_id}/activate",
headers=headers,
)
activate_response.raise_for_status()
logger.info(f"Activated workflow {workflow_id}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@praisonai_tools/n8n/n8n_workflow.py` around lines 152 - 160, Replace the
PATCH call that tries to set {"active": True} with a POST to the dedicated
activate endpoint: use
client.post(f"{self.n8n_url}/api/v1/workflows/{workflow_id}/activate",
headers=headers) (instead of client.patch), call raise_for_status on that
response, and keep the logger.info(f"Activated workflow {workflow_id}") and the
surrounding active-check logic; update the variable currently named
activate_response to hold the POST response and remove any payload/json since
the activate endpoint requires no body.


# Construct webhook URL
# Use /webhook-test/ for test mode, /webhook/ for production
webhook_url = f"{self.n8n_url}/webhook/{path}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If the webhook path starts with a forward slash, this string formatting will result in a double slash in the URL (e.g., .../webhook//path). Stripping the leading slash ensures a valid URL is constructed.

Suggested change
webhook_url = f"{self.n8n_url}/webhook/{path}"
webhook_url = f"{self.n8n_url}/webhook/{path.lstrip('/')}"

return webhook_url

except httpx.HTTPStatusError as e:
logger.error(f"n8n API error fetching workflow {workflow_id}: {e.response.status_code} - {e.response.text}")
return {"error": f"HTTP {e.response.status_code}: {e.response.text}"}
Comment on lines +152 to +169
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The except httpx.HTTPStatusError branch logs "API error fetching workflow" for any HTTPStatusError in the whole _get_webhook_url() block, including failures from the activation PATCH. This makes logs misleading when activation fails (e.g., missing permissions). Consider separating the GET and PATCH into their own try/except blocks (or improving the log message) so it’s clear which request failed.

Copilot uses AI. Check for mistakes.
except Exception as e:
logger.error(f"Error fetching workflow {workflow_id}: {e}")
return {"error": str(e)}
Comment on lines +115 to +172
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Timeouts in _get_webhook_url are mis-classified and self.timeout is ignored.

Two related problems in this method:

  1. httpx.TimeoutException is not caught explicitly, so a timeout during the workflow fetch falls into the generic except Exception branch and returns {"error": "Request timed out"}. run()'s own except httpx.TimeoutException handler (which produces the "Workflow webhook execution timed out after {self.timeout} seconds" message) is never reached for fetch-phase timeouts. This also means tests/test_n8n_integration.py::test_n8n_workflow_timeout_error, which asserts "timed out after 5.0 seconds" in result["error"], will fail once the httpx mock is exercised against the real code path (the mocked TimeoutException subclasses Exception).
  2. The client is hardcoded to timeout=30.0, ignoring the self.timeout configured on the tool — an inconsistency with the webhook POST in run() which does respect self.timeout.
🔧 Suggested fix
-        try:
-            with httpx.Client(timeout=30.0) as client:
+        try:
+            with httpx.Client(timeout=self.timeout) as client:
                 # Fetch workflow definition
                 response = client.get(
                     f"{self.n8n_url}/api/v1/workflows/{workflow_id}",
                     headers=headers,
                 )
                 ...
+        except httpx.TimeoutException:
+            logger.error(f"n8n workflow {workflow_id} fetch timed out after {self.timeout}s")
+            return {"error": f"Workflow webhook execution timed out after {self.timeout} seconds"}
         except httpx.HTTPStatusError as e:
             ...
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try:
import httpx
except ImportError:
return {"error": "httpx not installed"}
waited = 0
while waited < max_wait:
try:
# Prepare headers for API calls
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["X-N8N-API-KEY"] = self.api_key
try:
with httpx.Client(timeout=30.0) as client:
# Fetch workflow definition
response = client.get(
f"{self.n8n_url}/api/v1/executions/{execution_id}",
f"{self.n8n_url}/api/v1/workflows/{workflow_id}",
headers=headers,
)
response.raise_for_status()
workflow = response.json()
execution = response.json()
status = execution.get("status")
# Find webhook trigger node
nodes = workflow.get("nodes", [])
webhook_node = None
for node in nodes:
if node.get("type") == "n8n-nodes-base.webhook":
webhook_node = node
break
if status in ["success", "error", "canceled"]:
return execution
if not webhook_node:
return {"error": "Workflow has no Webhook trigger node. Add a Webhook trigger to enable external execution."}
time.sleep(poll_interval)
waited += poll_interval
# Extract webhook path
parameters = webhook_node.get("parameters", {})
path = parameters.get("path", "")
if not path:
return {"error": "Webhook trigger node has no path configured"}
except Exception as e:
logger.error(f"Error polling execution {execution_id}: {e}")
return {"error": f"Error polling execution: {e}"}
return {"error": f"Execution {execution_id} did not complete within {max_wait} seconds"}
# Ensure workflow is active
if not workflow.get("active"):
activate_response = client.patch(
f"{self.n8n_url}/api/v1/workflows/{workflow_id}",
json={"active": True},
headers=headers,
)
activate_response.raise_for_status()
logger.info(f"Activated workflow {workflow_id}")
# Construct webhook URL
# Use /webhook-test/ for test mode, /webhook/ for production
webhook_url = f"{self.n8n_url}/webhook/{path}"
return webhook_url
except httpx.HTTPStatusError as e:
logger.error(f"n8n API error fetching workflow {workflow_id}: {e.response.status_code} - {e.response.text}")
return {"error": f"HTTP {e.response.status_code}: {e.response.text}"}
except Exception as e:
logger.error(f"Error fetching workflow {workflow_id}: {e}")
return {"error": str(e)}
try:
import httpx
except ImportError:
return {"error": "httpx not installed"}
# Prepare headers for API calls
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["X-N8N-API-KEY"] = self.api_key
try:
with httpx.Client(timeout=self.timeout) as client:
# Fetch workflow definition
response = client.get(
f"{self.n8n_url}/api/v1/workflows/{workflow_id}",
headers=headers,
)
response.raise_for_status()
workflow = response.json()
# Find webhook trigger node
nodes = workflow.get("nodes", [])
webhook_node = None
for node in nodes:
if node.get("type") == "n8n-nodes-base.webhook":
webhook_node = node
break
if not webhook_node:
return {"error": "Workflow has no Webhook trigger node. Add a Webhook trigger to enable external execution."}
# Extract webhook path
parameters = webhook_node.get("parameters", {})
path = parameters.get("path", "")
if not path:
return {"error": "Webhook trigger node has no path configured"}
# Ensure workflow is active
if not workflow.get("active"):
activate_response = client.patch(
f"{self.n8n_url}/api/v1/workflows/{workflow_id}",
json={"active": True},
headers=headers,
)
activate_response.raise_for_status()
logger.info(f"Activated workflow {workflow_id}")
# Construct webhook URL
# Use /webhook-test/ for test mode, /webhook/ for production
webhook_url = f"{self.n8n_url}/webhook/{path}"
return webhook_url
except httpx.TimeoutException:
logger.error(f"n8n workflow {workflow_id} fetch timed out after {self.timeout}s")
return {"error": f"Workflow webhook execution timed out after {self.timeout} seconds"}
except httpx.HTTPStatusError as e:
logger.error(f"n8n API error fetching workflow {workflow_id}: {e.response.status_code} - {e.response.text}")
return {"error": f"HTTP {e.response.status_code}: {e.response.text}"}
except Exception as e:
logger.error(f"Error fetching workflow {workflow_id}: {e}")
return {"error": str(e)}
🧰 Tools
🪛 Ruff (0.15.10)

[warning] 170-170: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@praisonai_tools/n8n/n8n_workflow.py` around lines 115 - 172, In
_get_webhook_url, timeouts are being swallowed and the client uses a hardcoded
30s; change the httpx.Client timeout to use self.timeout and add an explicit
except httpx.TimeoutException handler (alongside the existing
httpx.HTTPStatusError and generic Exception handlers) that logs the timeout and
returns the same message format used in run() (e.g. "Workflow webhook execution
timed out after {self.timeout} seconds"); modify the try/except in the
_get_webhook_url method and reference self.timeout and httpx.TimeoutException to
implement this fix.


def list_workflows(self) -> Dict[str, Any]:
"""List available n8n workflows."""
Expand Down
97 changes: 77 additions & 20 deletions tests/test_n8n_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,40 @@ def test_n8n_workflow_missing_httpx(self):
assert "pip install 'praisonai-tools[n8n]'" in result["error"]

def test_n8n_workflow_successful_execution(self, mock_httpx):
"""Test successful workflow execution."""
"""Test successful workflow execution via webhook."""
from praisonai_tools.n8n import N8nWorkflowTool

# Mock httpx response
mock_response = Mock()
mock_response.json.return_value = {
"executionId": "exec-123",
"status": "running"
# Mock workflow fetch response
mock_workflow_response = Mock()
mock_workflow_response.json.return_value = {
"id": "test-workflow",
"name": "Test Workflow",
"active": True,
"nodes": [
{
"id": "webhook-node",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"parameters": {
"path": "test-webhook",
"httpMethod": "POST"
}
}
]
}
mock_response.raise_for_status.return_value = None
mock_workflow_response.raise_for_status.return_value = None

# Mock webhook execution response
mock_webhook_response = Mock()
mock_webhook_response.json.return_value = {
"result": "success",
"message": "Webhook executed successfully"
}
mock_webhook_response.raise_for_status.return_value = None

mock_client = Mock()
mock_client.post.return_value = mock_response
mock_client.get.return_value = mock_workflow_response
mock_client.post.return_value = mock_webhook_response
mock_httpx.Client.return_value.__enter__.return_value = mock_client

tool = N8nWorkflowTool(api_key="test-key")
Expand All @@ -111,27 +132,33 @@ def test_n8n_workflow_successful_execution(self, mock_httpx):
wait_for_completion=False
)

assert result["executionId"] == "exec-123"
assert result["status"] == "running"
assert result["result"] == "success"
assert result["message"] == "Webhook executed successfully"

# Verify API call
mock_client.post.assert_called_once_with(
"http://localhost:5678/api/v1/workflows/test-workflow/execute",
json={"data": {"message": "Hello"}},
# Verify workflow fetch call
mock_client.get.assert_called_once_with(
"http://localhost:5678/api/v1/workflows/test-workflow",
headers={"Content-Type": "application/json", "X-N8N-API-KEY": "test-key"},
)

# Verify webhook execution call
mock_client.post.assert_called_once_with(
"http://localhost:5678/webhook/test-webhook",
json={"message": "Hello"},
headers={"Content-Type": "application/json"},
)

def test_n8n_workflow_http_error(self, mock_httpx):
"""Test HTTP error handling."""
"""Test HTTP error handling during workflow fetch."""
from praisonai_tools.n8n import N8nWorkflowTool

# Mock HTTP error
# Mock HTTP error during workflow fetch
mock_response = Mock()
mock_response.status_code = 401
mock_response.text = "Unauthorized"

mock_client = Mock()
mock_client.post.side_effect = mock_httpx.HTTPStatusError(
mock_client.get.side_effect = mock_httpx.HTTPStatusError(
"401 Unauthorized", request=Mock(), response=mock_response
)
mock_httpx.Client.return_value.__enter__.return_value = mock_client
Expand All @@ -142,17 +169,47 @@ def test_n8n_workflow_http_error(self, mock_httpx):
assert "HTTP 401: Unauthorized" in result["error"]

def test_n8n_workflow_timeout_error(self, mock_httpx):
"""Test timeout error handling."""
"""Test timeout error handling during workflow fetch."""
from praisonai_tools.n8n import N8nWorkflowTool

mock_client = Mock()
mock_client.post.side_effect = mock_httpx.TimeoutException("Request timed out")
mock_client.get.side_effect = mock_httpx.TimeoutException("Request timed out")
mock_httpx.Client.return_value.__enter__.return_value = mock_client

tool = N8nWorkflowTool(timeout=5.0)
result = tool.run(workflow_id="test-workflow")

assert "timed out after 5.0 seconds" in result["error"]
assert "Request timed out" in result["error"]

def test_n8n_workflow_no_webhook_trigger(self, mock_httpx):
"""Test error handling when workflow has no webhook trigger."""
from praisonai_tools.n8n import N8nWorkflowTool

# Mock workflow fetch response with no webhook trigger
mock_workflow_response = Mock()
mock_workflow_response.json.return_value = {
"id": "test-workflow",
"name": "Test Workflow",
"active": True,
"nodes": [
{
"id": "manual-node",
"name": "Manual Trigger",
"type": "n8n-nodes-base.manualTrigger",
"parameters": {}
}
]
}
mock_workflow_response.raise_for_status.return_value = None

mock_client = Mock()
mock_client.get.return_value = mock_workflow_response
mock_httpx.Client.return_value.__enter__.return_value = mock_client

tool = N8nWorkflowTool(api_key="test-key")
result = tool.run(workflow_id="test-workflow")

assert "Workflow has no Webhook trigger node" in result["error"]

def test_n8n_list_workflows(self, mock_httpx):
"""Test listing workflows."""
Expand Down