Skip to content

Commit 48139f4

Browse files
cosmicBboyjeevb
authored andcommitted
add n8n example (#591)
Production-grade example of serving a standalone n8n app on Union apps. - Uses postgres (via supabase) for data persistence - Uses podtemplate sidecar to implement javascript/python runners - Uses flyte webhook app to trigger flyte tasks from n8n workflows --------- Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com> Signed-off-by: Jeev B <jeevb@users.noreply.github.com>
1 parent b58d585 commit 48139f4

File tree

8 files changed

+463
-1
lines changed

8 files changed

+463
-1
lines changed

examples/genai/n8n/README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Flyte-hosted n8n app
2+
3+
In this example, we'll deploy a production-ready n8n app using Flyte.
4+
5+
## Prerequisites
6+
7+
In this example, we'll use a postgres database hosted on Supabase. You'll need to create a Supabase project and get the
8+
database credentials: https://supabase.com
9+
10+
Thne create postgres database secrets in Flyte:
11+
12+
```bash
13+
flyte create secret n8n_postgres_password --value <password>
14+
flyte create secret n8n_encryption_key --value <encryption_key>
15+
```
16+
17+
Install the example requirements:
18+
19+
```bash
20+
uv pip install kubernetes
21+
```
22+
23+
Then deploy the app:
24+
25+
```bash
26+
python n8n_app.py
27+
```
28+
29+
This will deploy two apps and one task environment:
30+
1. `n8n-app` - The main n8n app. This contains a sidecar container that runs the n8n javascript and python task runners.
31+
2. `flyte-n8n-webhook-app` - A webhook app that allows you to trigger Flyte tasks from n8n workflows.
32+
3. `flyte-n8n-webhook-task` - A task environment that contains some toy Flyte task that can be triggered via the webhook.
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
"""
2+
Flyte Webhook App for n8n Integration
3+
4+
A FastAPI-based webhook service that allows n8n workflows to trigger Flyte task
5+
runs via HTTP. Uses passthrough authentication so that the caller's credentials
6+
are forwarded to the Flyte control plane when launching tasks.
7+
8+
Usage:
9+
# Deploy
10+
python flyte_webhook_app.py
11+
12+
# n8n HTTP Request node configuration:
13+
# - Method: POST
14+
# - URL: https://<subdomain>.apps.<endpoint>/run-task/{project}/{domain}/{task_name}
15+
# - Headers: Authorization: Bearer <token>, Content-Type: application/json
16+
# - Body (JSON): {"input_key": "input_value", ...}
17+
"""
18+
19+
import logging
20+
import os
21+
from contextlib import asynccontextmanager
22+
23+
from fastapi import FastAPI, HTTPException
24+
from starlette import status
25+
26+
import flyte
27+
import flyte.app
28+
import flyte.errors
29+
import flyte.remote as remote
30+
from flyte.app.extras import FastAPIAppEnvironment, FastAPIPassthroughAuthMiddleware
31+
32+
logger = logging.getLogger(__name__)
33+
34+
image = flyte.Image.from_debian_base().with_pip_packages("fastapi", "uvicorn")
35+
36+
37+
# ---------------------------------------------------------------------------
38+
# FastAPI lifespan: initialize Flyte passthrough auth on startup
39+
# ---------------------------------------------------------------------------
40+
@asynccontextmanager
41+
async def lifespan(app: FastAPI):
42+
PROJECT_NAME_ENV_VAR = "FLYTE_INTERNAL_EXECUTION_PROJECT"
43+
DOMAIN_NAME_ENV_VAR = "FLYTE_INTERNAL_EXECUTION_DOMAIN"
44+
45+
await flyte.init_passthrough.aio(
46+
project=os.getenv(PROJECT_NAME_ENV_VAR, None),
47+
domain=os.getenv(DOMAIN_NAME_ENV_VAR, None),
48+
)
49+
logger.info("Initialized Flyte passthrough auth")
50+
yield
51+
52+
53+
# ---------------------------------------------------------------------------
54+
# FastAPI app
55+
# ---------------------------------------------------------------------------
56+
app = FastAPI(
57+
title="Flyte n8n Webhook Runner",
58+
description="A webhook service that lets n8n trigger Flyte task runs",
59+
version="1.0.0",
60+
lifespan=lifespan,
61+
)
62+
63+
# Middleware: extract Authorization header and set Flyte auth context per-request
64+
app.add_middleware(FastAPIPassthroughAuthMiddleware, excluded_paths={"/health"})
65+
66+
67+
@app.get("/health")
68+
async def health_check():
69+
"""Health check endpoint (no auth required)."""
70+
return {"status": "healthy"}
71+
72+
73+
@app.get("/me")
74+
async def get_current_user():
75+
"""Verify passthrough auth by fetching the current user from the Flyte control plane."""
76+
try:
77+
user = await remote.User.get.aio()
78+
return {
79+
"name": user.name,
80+
"subject": user.subject,
81+
}
82+
except Exception:
83+
raise HTTPException(
84+
status_code=status.HTTP_401_UNAUTHORIZED,
85+
detail="Invalid credentials or unauthorized",
86+
)
87+
88+
89+
@app.post("/run-task/{project}/{domain}/{name}")
90+
async def run_task(
91+
project: str,
92+
domain: str,
93+
name: str,
94+
inputs: dict,
95+
version: str | None = None,
96+
wait: bool = True,
97+
) -> dict:
98+
"""
99+
Trigger a Flyte task run with the caller's credentials.
100+
101+
The task is executed with the permissions of the calling user (passthrough auth).
102+
103+
Path parameters:
104+
project: Flyte project name
105+
domain: Flyte domain (e.g. development, staging, production)
106+
name: Fully-qualified task name (e.g. "my_env.my_task")
107+
108+
Query parameters:
109+
version: Task version (optional — defaults to "latest")
110+
111+
Body (JSON):
112+
inputs: Dictionary of input parameters for the task
113+
"""
114+
logger.info(f"Running task: {project}/{domain}/{name} version={version}")
115+
try:
116+
auto_version = "latest" if version is None else None
117+
tk = remote.Task.get(
118+
project=project,
119+
domain=domain,
120+
name=name,
121+
version=version,
122+
auto_version=auto_version,
123+
)
124+
r = await flyte.run.aio(tk, **inputs)
125+
if wait:
126+
await r.wait.aio()
127+
return (await r.outputs.aio()).named_outputs
128+
return {"url": r.url, "name": r.name}
129+
130+
except flyte.errors.RemoteTaskNotFoundError:
131+
raise HTTPException(
132+
status_code=status.HTTP_404_NOT_FOUND,
133+
detail=f"Task {name} v{version} in {project}/{domain} not found",
134+
)
135+
except flyte.errors.RemoteTaskUsageError as e:
136+
raise HTTPException(
137+
status_code=status.HTTP_400_BAD_REQUEST,
138+
detail=str(e),
139+
)
140+
except Exception as e:
141+
raise HTTPException(
142+
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
143+
detail=str(e),
144+
)
145+
146+
147+
# ---------------------------------------------------------------------------
148+
# Task environment: an example task that can be triggered via the webhook
149+
# ---------------------------------------------------------------------------
150+
task_env = flyte.TaskEnvironment(
151+
name="flyte-n8n-webhook-task",
152+
image=image,
153+
resources=flyte.Resources(cpu=1, memory="512Mi"),
154+
)
155+
156+
157+
@task_env.task
158+
async def webhook_task(x: int, y: str) -> dict:
159+
"""Example Flyte task callable via the webhook."""
160+
return {"result": f"{x!s} {y}"}
161+
162+
163+
@task_env.task
164+
async def add_field(data: dict) -> dict:
165+
"""Example Flyte task callable via the webhook."""
166+
data["new_flyte_field"] = "hello from flyte"
167+
return data
168+
169+
170+
# ---------------------------------------------------------------------------
171+
# App environment: the webhook FastAPI service
172+
# ---------------------------------------------------------------------------
173+
flyte_n8n_webhook_app = FastAPIAppEnvironment(
174+
name="flyte-n8n-webhook-app",
175+
app=app,
176+
description="A webhook service that lets n8n trigger Flyte task runs with passthrough auth",
177+
image=image,
178+
resources=flyte.Resources(cpu=2, memory="2Gi"),
179+
requires_auth=True, # Platform handles auth at the gateway
180+
depends_on=[task_env],
181+
scaling=flyte.app.Scaling(replicas=(0, 1)),
182+
port=8080,
183+
)
184+
185+
186+
# ---------------------------------------------------------------------------
187+
# Deploy helper
188+
# ---------------------------------------------------------------------------
189+
if __name__ == "__main__":
190+
import argparse
191+
192+
import httpx
193+
194+
import flyte.remote
195+
196+
parser = argparse.ArgumentParser(description="Deploy the n8n app.")
197+
parser.add_argument("--deploy", action="store_true", help="Deploy the n8n app.")
198+
parser.add_argument("--test", action="store_true", help="Wait for the task to complete")
199+
args = parser.parse_args()
200+
201+
flyte.init_from_config()
202+
if args.deploy:
203+
flyte.deploy(flyte_n8n_webhook_app)
204+
205+
if args.test:
206+
app = flyte.remote.App.get(name="flyte-n8n-webhook-app")
207+
url = app.url
208+
endpoint = app.endpoint
209+
print(f"Deployed webhook app: {url}")
210+
print(f"Webhook is served on {endpoint}. you can check logs, status etc {endpoint}")
211+
212+
# --- Quick smoke test ---
213+
token = os.getenv("FLYTE_API_KEY")
214+
215+
headers = {"Authorization": f"Bearer {token}"}
216+
217+
# Test /run-task (triggers the example add_field)
218+
data = {"data": {"x": 42, "y": "hello from n8n"}}
219+
route = "/run-task/flytesnacks/development/flyte-n8n-webhook-task.add_field"
220+
full_endpoint = endpoint.rstrip("/") + route
221+
print(f"POST {full_endpoint}")
222+
223+
resp = httpx.post(
224+
full_endpoint,
225+
json=data,
226+
headers=headers,
227+
)
228+
if resp.is_success:
229+
print(f"Webhook response: {resp.text}")
230+
else:
231+
print(f"HTTP Error: {resp.status_code} - {resp.text}")
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
{
2+
"task-runners": [
3+
{
4+
"runner-type": "javascript",
5+
"workdir": "/home/runner",
6+
"command": "/usr/local/bin/node",
7+
"args": [
8+
"--disallow-code-generation-from-strings",
9+
"--disable-proto=delete",
10+
"/opt/runners/task-runner-javascript/dist/start.js"
11+
],
12+
"health-check-server-port": "5681",
13+
"allowed-env": [
14+
"PATH",
15+
"GENERIC_TIMEZONE",
16+
"NODE_OPTIONS",
17+
"N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT",
18+
"N8N_RUNNERS_TASK_TIMEOUT",
19+
"N8N_RUNNERS_MAX_CONCURRENCY",
20+
"N8N_SENTRY_DSN",
21+
"N8N_VERSION",
22+
"ENVIRONMENT",
23+
"DEPLOYMENT_NAME"
24+
],
25+
"env-overrides": {
26+
"NODE_FUNCTION_ALLOW_BUILTIN": "crypto",
27+
"NODE_FUNCTION_ALLOW_EXTERNAL": "moment,uuid",
28+
"N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST": "0.0.0.0"
29+
}
30+
},
31+
{
32+
"runner-type": "python",
33+
"workdir": "/home/runner",
34+
"command": "/opt/runners/task-runner-python/.venv/bin/python",
35+
"args": ["-m", "src.main"],
36+
"health-check-server-port": "5682",
37+
"allowed-env": [
38+
"PATH",
39+
"N8N_RUNNERS_LAUNCHER_LOG_LEVEL",
40+
"N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT",
41+
"N8N_RUNNERS_TASK_TIMEOUT",
42+
"N8N_RUNNERS_MAX_CONCURRENCY",
43+
"N8N_SENTRY_DSN",
44+
"N8N_VERSION",
45+
"ENVIRONMENT",
46+
"DEPLOYMENT_NAME"
47+
],
48+
"env-overrides": {
49+
"PYTHONPATH": "/opt/runners/task-runner-python",
50+
"N8N_RUNNERS_STDLIB_ALLOW": "json",
51+
"N8N_RUNNERS_EXTERNAL_ALLOW": "numpy,pandas"
52+
}
53+
}
54+
]
55+
}

0 commit comments

Comments
 (0)