Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions examples/genai/n8n/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Flyte-hosted n8n app

In this example, we'll deploy a production-ready n8n app using Flyte.

## Prerequisites

In this example, we'll use a postgres database hosted on Supabase. You'll need to create a Supabase project and get the
database credentials: https://supabase.com

Thne create postgres database secrets in Flyte:

```bash
flyte create secret n8n_postgres_password --value <password>
flyte create secret n8n_encryption_key --value <encryption_key>
```

Install the example requirements:

```bash
uv pip install kubernetes
```

Then deploy the app:

```bash
python n8n_app.py
```

This will deploy two apps and one task environment:
1. `n8n-app` - The main n8n app. This contains a sidecar container that runs the n8n javascript and python task runners.
2. `flyte-n8n-webhook-app` - A webhook app that allows you to trigger Flyte tasks from n8n workflows.
3. `flyte-n8n-webhook-task` - A task environment that contains some toy Flyte task that can be triggered via the webhook.
231 changes: 231 additions & 0 deletions examples/genai/n8n/flyte_webhook_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
"""
Flyte Webhook App for n8n Integration

A FastAPI-based webhook service that allows n8n workflows to trigger Flyte task
runs via HTTP. Uses passthrough authentication so that the caller's credentials
are forwarded to the Flyte control plane when launching tasks.

Usage:
# Deploy
python flyte_webhook_app.py

# n8n HTTP Request node configuration:
# - Method: POST
# - URL: https://<subdomain>.apps.<endpoint>/run-task/{project}/{domain}/{task_name}
# - Headers: Authorization: Bearer <token>, Content-Type: application/json
# - Body (JSON): {"input_key": "input_value", ...}
"""

import logging
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from starlette import status

import flyte
import flyte.app
import flyte.errors
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment, FastAPIPassthroughAuthMiddleware

logger = logging.getLogger(__name__)

image = flyte.Image.from_debian_base().with_pip_packages("fastapi", "uvicorn")


# ---------------------------------------------------------------------------
# FastAPI lifespan: initialize Flyte passthrough auth on startup
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
PROJECT_NAME_ENV_VAR = "FLYTE_INTERNAL_EXECUTION_PROJECT"
DOMAIN_NAME_ENV_VAR = "FLYTE_INTERNAL_EXECUTION_DOMAIN"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have flyte.current_domain() we should use that i dont want to use these env vars. Can i also add current_project and you can use it?


await flyte.init_passthrough.aio(
project=os.getenv(PROJECT_NAME_ENV_VAR, None),
domain=os.getenv(DOMAIN_NAME_ENV_VAR, None),
)
logger.info("Initialized Flyte passthrough auth")
yield


# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(
title="Flyte n8n Webhook Runner",
description="A webhook service that lets n8n trigger Flyte task runs",
version="1.0.0",
lifespan=lifespan,
)

# Middleware: extract Authorization header and set Flyte auth context per-request
app.add_middleware(FastAPIPassthroughAuthMiddleware, excluded_paths={"/health"})


@app.get("/health")
async def health_check():
"""Health check endpoint (no auth required)."""
return {"status": "healthy"}


@app.get("/me")
async def get_current_user():
"""Verify passthrough auth by fetching the current user from the Flyte control plane."""
try:
user = await remote.User.get.aio()
return {
"name": user.name,
"subject": user.subject,
}
except Exception:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials or unauthorized",
)


@app.post("/run-task/{project}/{domain}/{name}")
async def run_task(
project: str,
domain: str,
name: str,
inputs: dict,
version: str | None = None,
wait: bool = True,
) -> dict:
"""
Trigger a Flyte task run with the caller's credentials.

The task is executed with the permissions of the calling user (passthrough auth).

Path parameters:
project: Flyte project name
domain: Flyte domain (e.g. development, staging, production)
name: Fully-qualified task name (e.g. "my_env.my_task")

Query parameters:
version: Task version (optional — defaults to "latest")

Body (JSON):
inputs: Dictionary of input parameters for the task
"""
logger.info(f"Running task: {project}/{domain}/{name} version={version}")
try:
auto_version = "latest" if version is None else None
tk = remote.Task.get(
project=project,
domain=domain,
name=name,
version=version,
auto_version=auto_version,
)
r = await flyte.run.aio(tk, **inputs)
if wait:
await r.wait.aio()
return (await r.outputs.aio()).named_outputs
return {"url": r.url, "name": r.name}

except flyte.errors.RemoteTaskNotFoundError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task {name} v{version} in {project}/{domain} not found",
)
except flyte.errors.RemoteTaskUsageError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e),
)


# ---------------------------------------------------------------------------
# Task environment: an example task that can be triggered via the webhook
# ---------------------------------------------------------------------------
task_env = flyte.TaskEnvironment(
name="flyte-n8n-webhook-task",
image=image,
resources=flyte.Resources(cpu=1, memory="512Mi"),
)


@task_env.task
async def webhook_task(x: int, y: str) -> dict:
"""Example Flyte task callable via the webhook."""
return {"result": f"{x!s} {y}"}


@task_env.task
async def add_field(data: dict) -> dict:
"""Example Flyte task callable via the webhook."""
data["new_flyte_field"] = "hello from flyte"
return data


# ---------------------------------------------------------------------------
# App environment: the webhook FastAPI service
# ---------------------------------------------------------------------------
flyte_n8n_webhook_app = FastAPIAppEnvironment(
name="flyte-n8n-webhook-app",
app=app,
description="A webhook service that lets n8n trigger Flyte task runs with passthrough auth",
image=image,
resources=flyte.Resources(cpu=2, memory="2Gi"),
requires_auth=True, # Platform handles auth at the gateway
depends_on=[task_env],
scaling=flyte.app.Scaling(replicas=(0, 1)),
port=8080,
)


# ---------------------------------------------------------------------------
# Deploy helper
# ---------------------------------------------------------------------------
if __name__ == "__main__":
import argparse

import httpx

import flyte.remote

parser = argparse.ArgumentParser(description="Deploy the n8n app.")
parser.add_argument("--deploy", action="store_true", help="Deploy the n8n app.")
parser.add_argument("--test", action="store_true", help="Wait for the task to complete")
args = parser.parse_args()

flyte.init_from_config()
if args.deploy:
flyte.deploy(flyte_n8n_webhook_app)

if args.test:
app = flyte.remote.App.get(name="flyte-n8n-webhook-app")
url = app.url
endpoint = app.endpoint
print(f"Deployed webhook app: {url}")
print(f"Webhook is served on {endpoint}. you can check logs, status etc {endpoint}")

# --- Quick smoke test ---
token = os.getenv("FLYTE_API_KEY")

headers = {"Authorization": f"Bearer {token}"}

# Test /run-task (triggers the example add_field)
data = {"data": {"x": 42, "y": "hello from n8n"}}
route = "/run-task/flytesnacks/development/flyte-n8n-webhook-task.add_field"
full_endpoint = endpoint.rstrip("/") + route
print(f"POST {full_endpoint}")

resp = httpx.post(
full_endpoint,
json=data,
headers=headers,
)
if resp.is_success:
print(f"Webhook response: {resp.text}")
else:
print(f"HTTP Error: {resp.status_code} - {resp.text}")
55 changes: 55 additions & 0 deletions examples/genai/n8n/n8n-task-runners.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"task-runners": [
{
"runner-type": "javascript",
"workdir": "/home/runner",
"command": "/usr/local/bin/node",
"args": [
"--disallow-code-generation-from-strings",
"--disable-proto=delete",
"/opt/runners/task-runner-javascript/dist/start.js"
],
"health-check-server-port": "5681",
"allowed-env": [
"PATH",
"GENERIC_TIMEZONE",
"NODE_OPTIONS",
"N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT",
"N8N_RUNNERS_TASK_TIMEOUT",
"N8N_RUNNERS_MAX_CONCURRENCY",
"N8N_SENTRY_DSN",
"N8N_VERSION",
"ENVIRONMENT",
"DEPLOYMENT_NAME"
],
"env-overrides": {
"NODE_FUNCTION_ALLOW_BUILTIN": "crypto",
"NODE_FUNCTION_ALLOW_EXTERNAL": "moment,uuid",
"N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST": "0.0.0.0"
}
},
{
"runner-type": "python",
"workdir": "/home/runner",
"command": "/opt/runners/task-runner-python/.venv/bin/python",
"args": ["-m", "src.main"],
"health-check-server-port": "5682",
"allowed-env": [
"PATH",
"N8N_RUNNERS_LAUNCHER_LOG_LEVEL",
"N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT",
"N8N_RUNNERS_TASK_TIMEOUT",
"N8N_RUNNERS_MAX_CONCURRENCY",
"N8N_SENTRY_DSN",
"N8N_VERSION",
"ENVIRONMENT",
"DEPLOYMENT_NAME"
],
"env-overrides": {
"PYTHONPATH": "/opt/runners/task-runner-python",
"N8N_RUNNERS_STDLIB_ALLOW": "json",
"N8N_RUNNERS_EXTERNAL_ALLOW": "numpy,pandas"
}
}
]
}
Loading
Loading