diff --git a/examples/genai/n8n/README.md b/examples/genai/n8n/README.md new file mode 100644 index 000000000..8d49f32e7 --- /dev/null +++ b/examples/genai/n8n/README.md @@ -0,0 +1,32 @@ +# Flyte-hosted n8n app + +In this example, we'll deploy a production-ready n8n app using Flyte. + +## Prerequisites + +In this example, we'll use a postgres database hosted on Supabase. You'll need to create a Supabase project and get the +database credentials: https://supabase.com + +Thne create postgres database secrets in Flyte: + +```bash +flyte create secret n8n_postgres_password --value +flyte create secret n8n_encryption_key --value +``` + +Install the example requirements: + +```bash +uv pip install kubernetes +``` + +Then deploy the app: + +```bash +python n8n_app.py +``` + +This will deploy two apps and one task environment: +1. `n8n-app` - The main n8n app. This contains a sidecar container that runs the n8n javascript and python task runners. +2. `flyte-n8n-webhook-app` - A webhook app that allows you to trigger Flyte tasks from n8n workflows. +3. `flyte-n8n-webhook-task` - A task environment that contains some toy Flyte task that can be triggered via the webhook. diff --git a/examples/genai/n8n/flyte_webhook_app.py b/examples/genai/n8n/flyte_webhook_app.py new file mode 100644 index 000000000..dd708cbc7 --- /dev/null +++ b/examples/genai/n8n/flyte_webhook_app.py @@ -0,0 +1,231 @@ +""" +Flyte Webhook App for n8n Integration + +A FastAPI-based webhook service that allows n8n workflows to trigger Flyte task +runs via HTTP. Uses passthrough authentication so that the caller's credentials +are forwarded to the Flyte control plane when launching tasks. + +Usage: + # Deploy + python flyte_webhook_app.py + + # n8n HTTP Request node configuration: + # - Method: POST + # - URL: https://.apps./run-task/{project}/{domain}/{task_name} + # - Headers: Authorization: Bearer , Content-Type: application/json + # - Body (JSON): {"input_key": "input_value", ...} +""" + +import logging +import os +from contextlib import asynccontextmanager + +from fastapi import FastAPI, HTTPException +from starlette import status + +import flyte +import flyte.app +import flyte.errors +import flyte.remote as remote +from flyte.app.extras import FastAPIAppEnvironment, FastAPIPassthroughAuthMiddleware + +logger = logging.getLogger(__name__) + +image = flyte.Image.from_debian_base().with_pip_packages("fastapi", "uvicorn") + + +# --------------------------------------------------------------------------- +# FastAPI lifespan: initialize Flyte passthrough auth on startup +# --------------------------------------------------------------------------- +@asynccontextmanager +async def lifespan(app: FastAPI): + PROJECT_NAME_ENV_VAR = "FLYTE_INTERNAL_EXECUTION_PROJECT" + DOMAIN_NAME_ENV_VAR = "FLYTE_INTERNAL_EXECUTION_DOMAIN" + + await flyte.init_passthrough.aio( + project=os.getenv(PROJECT_NAME_ENV_VAR, None), + domain=os.getenv(DOMAIN_NAME_ENV_VAR, None), + ) + logger.info("Initialized Flyte passthrough auth") + yield + + +# --------------------------------------------------------------------------- +# FastAPI app +# --------------------------------------------------------------------------- +app = FastAPI( + title="Flyte n8n Webhook Runner", + description="A webhook service that lets n8n trigger Flyte task runs", + version="1.0.0", + lifespan=lifespan, +) + +# Middleware: extract Authorization header and set Flyte auth context per-request +app.add_middleware(FastAPIPassthroughAuthMiddleware, excluded_paths={"/health"}) + + +@app.get("/health") +async def health_check(): + """Health check endpoint (no auth required).""" + return {"status": "healthy"} + + +@app.get("/me") +async def get_current_user(): + """Verify passthrough auth by fetching the current user from the Flyte control plane.""" + try: + user = await remote.User.get.aio() + return { + "name": user.name, + "subject": user.subject, + } + except Exception: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid credentials or unauthorized", + ) + + +@app.post("/run-task/{project}/{domain}/{name}") +async def run_task( + project: str, + domain: str, + name: str, + inputs: dict, + version: str | None = None, + wait: bool = True, +) -> dict: + """ + Trigger a Flyte task run with the caller's credentials. + + The task is executed with the permissions of the calling user (passthrough auth). + + Path parameters: + project: Flyte project name + domain: Flyte domain (e.g. development, staging, production) + name: Fully-qualified task name (e.g. "my_env.my_task") + + Query parameters: + version: Task version (optional — defaults to "latest") + + Body (JSON): + inputs: Dictionary of input parameters for the task + """ + logger.info(f"Running task: {project}/{domain}/{name} version={version}") + try: + auto_version = "latest" if version is None else None + tk = remote.Task.get( + project=project, + domain=domain, + name=name, + version=version, + auto_version=auto_version, + ) + r = await flyte.run.aio(tk, **inputs) + if wait: + await r.wait.aio() + return (await r.outputs.aio()).named_outputs + return {"url": r.url, "name": r.name} + + except flyte.errors.RemoteTaskNotFoundError: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Task {name} v{version} in {project}/{domain} not found", + ) + except flyte.errors.RemoteTaskUsageError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=str(e), + ) + + +# --------------------------------------------------------------------------- +# Task environment: an example task that can be triggered via the webhook +# --------------------------------------------------------------------------- +task_env = flyte.TaskEnvironment( + name="flyte-n8n-webhook-task", + image=image, + resources=flyte.Resources(cpu=1, memory="512Mi"), +) + + +@task_env.task +async def webhook_task(x: int, y: str) -> dict: + """Example Flyte task callable via the webhook.""" + return {"result": f"{x!s} {y}"} + + +@task_env.task +async def add_field(data: dict) -> dict: + """Example Flyte task callable via the webhook.""" + data["new_flyte_field"] = "hello from flyte" + return data + + +# --------------------------------------------------------------------------- +# App environment: the webhook FastAPI service +# --------------------------------------------------------------------------- +flyte_n8n_webhook_app = FastAPIAppEnvironment( + name="flyte-n8n-webhook-app", + app=app, + description="A webhook service that lets n8n trigger Flyte task runs with passthrough auth", + image=image, + resources=flyte.Resources(cpu=2, memory="2Gi"), + requires_auth=True, # Platform handles auth at the gateway + depends_on=[task_env], + scaling=flyte.app.Scaling(replicas=(0, 1)), + port=8080, +) + + +# --------------------------------------------------------------------------- +# Deploy helper +# --------------------------------------------------------------------------- +if __name__ == "__main__": + import argparse + + import httpx + + import flyte.remote + + parser = argparse.ArgumentParser(description="Deploy the n8n app.") + parser.add_argument("--deploy", action="store_true", help="Deploy the n8n app.") + parser.add_argument("--test", action="store_true", help="Wait for the task to complete") + args = parser.parse_args() + + flyte.init_from_config() + if args.deploy: + flyte.deploy(flyte_n8n_webhook_app) + + if args.test: + app = flyte.remote.App.get(name="flyte-n8n-webhook-app") + url = app.url + endpoint = app.endpoint + print(f"Deployed webhook app: {url}") + print(f"Webhook is served on {endpoint}. you can check logs, status etc {endpoint}") + + # --- Quick smoke test --- + token = os.getenv("FLYTE_API_KEY") + + headers = {"Authorization": f"Bearer {token}"} + + # Test /run-task (triggers the example add_field) + data = {"data": {"x": 42, "y": "hello from n8n"}} + route = "/run-task/flytesnacks/development/flyte-n8n-webhook-task.add_field" + full_endpoint = endpoint.rstrip("/") + route + print(f"POST {full_endpoint}") + + resp = httpx.post( + full_endpoint, + json=data, + headers=headers, + ) + if resp.is_success: + print(f"Webhook response: {resp.text}") + else: + print(f"HTTP Error: {resp.status_code} - {resp.text}") diff --git a/examples/genai/n8n/n8n-task-runners.json b/examples/genai/n8n/n8n-task-runners.json new file mode 100644 index 000000000..f648eb06c --- /dev/null +++ b/examples/genai/n8n/n8n-task-runners.json @@ -0,0 +1,55 @@ +{ + "task-runners": [ + { + "runner-type": "javascript", + "workdir": "/home/runner", + "command": "/usr/local/bin/node", + "args": [ + "--disallow-code-generation-from-strings", + "--disable-proto=delete", + "/opt/runners/task-runner-javascript/dist/start.js" + ], + "health-check-server-port": "5681", + "allowed-env": [ + "PATH", + "GENERIC_TIMEZONE", + "NODE_OPTIONS", + "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT", + "N8N_RUNNERS_TASK_TIMEOUT", + "N8N_RUNNERS_MAX_CONCURRENCY", + "N8N_SENTRY_DSN", + "N8N_VERSION", + "ENVIRONMENT", + "DEPLOYMENT_NAME" + ], + "env-overrides": { + "NODE_FUNCTION_ALLOW_BUILTIN": "crypto", + "NODE_FUNCTION_ALLOW_EXTERNAL": "moment,uuid", + "N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST": "0.0.0.0" + } + }, + { + "runner-type": "python", + "workdir": "/home/runner", + "command": "/opt/runners/task-runner-python/.venv/bin/python", + "args": ["-m", "src.main"], + "health-check-server-port": "5682", + "allowed-env": [ + "PATH", + "N8N_RUNNERS_LAUNCHER_LOG_LEVEL", + "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT", + "N8N_RUNNERS_TASK_TIMEOUT", + "N8N_RUNNERS_MAX_CONCURRENCY", + "N8N_SENTRY_DSN", + "N8N_VERSION", + "ENVIRONMENT", + "DEPLOYMENT_NAME" + ], + "env-overrides": { + "PYTHONPATH": "/opt/runners/task-runner-python", + "N8N_RUNNERS_STDLIB_ALLOW": "json", + "N8N_RUNNERS_EXTERNAL_ALLOW": "numpy,pandas" + } + } + ] +} \ No newline at end of file diff --git a/examples/genai/n8n/n8n_app.py b/examples/genai/n8n/n8n_app.py new file mode 100644 index 000000000..e0fc0f85e --- /dev/null +++ b/examples/genai/n8n/n8n_app.py @@ -0,0 +1,116 @@ +import pathlib + +import kubernetes +from flyte_webhook_app import flyte_n8n_webhook_app + +import flyte +import flyte.app + + +def n8n_pod_template(version: str, runner_auth_token: str, runner_image_uri: str | None = None) -> flyte.PodTemplate: + return flyte.PodTemplate( + primary_container_name="app", + pod_spec=kubernetes.client.V1PodSpec( + containers=[ + # Primary container: n8n main server + kubernetes.client.V1Container(name="app", image=f"n8nio/n8n:{version}"), + # Sidecar container: task runners + kubernetes.client.V1Container( + name="task-runners", + image=runner_image_uri or f"n8nio/runners:{version}", + env=[ + # Connect to n8n broker via localhost since they're in the same pod + kubernetes.client.V1EnvVar(name="N8N_RUNNERS_TASK_BROKER_URI", value="http://127.0.0.1:5679"), + kubernetes.client.V1EnvVar(name="N8N_RUNNERS_AUTH_TOKEN", value=runner_auth_token), + ], + ), + ], + ), + ) + + +n8n_app = flyte.app.AppEnvironment( + name="n8n-app", + resources=flyte.Resources(cpu=4, memory="4Gi"), + scaling=flyte.app.Scaling(replicas=(1, 2)), + port=5678, + command=["n8n", "start"], + secrets=[ + flyte.Secret("n8n_postgres_password", as_env_var="DB_POSTGRESDB_PASSWORD"), + flyte.Secret("n8n_encryption_key", as_env_var="N8N_ENCRYPTION_KEY"), + ], + requires_auth=False, + env_vars={ + "N8N_RUNNERS_ENABLED": "true", + "N8N_RUNNERS_MODE": "external", + "N8N_RUNNERS_BROKER_LISTEN_ADDRESS": "0.0.0.0", + "N8N_NATIVE_PYTHON_RUNNER": "true", + # db config: https://docs.n8n.io/hosting/installation/docker/#using-with-postgresql + "DB_TYPE": "postgresdb", + "DB_POSTGRESDB_HOST": "aws-0-us-west-2.pooler.supabase.com", + "DB_POSTGRESDB_DATABASE": "postgres", + "DB_POSTGRESDB_USER": "postgres.qcfcidgymclxvslgphyb", + "DB_POSTGRESDB_PORT": "6543", + "DB_POSTGRESDB_SCHEMA": "public", + }, + depends_on=[flyte_n8n_webhook_app], +) + + +def build_runner_image() -> flyte.Image: + flyte.init_from_config(image_builder="local") + + image = flyte.Image.from_dockerfile( + pathlib.Path(__file__).parent / "task_runner.dockerfile", + registry="ghcr.io/flyteorg", + name="n8n-task-runner-image", + ) + return flyte.build(image, wait=True) + + +def get_webhook_url(subdomain: str) -> str: + cfg = get_init_config() + return f"https://{subdomain}.apps.{cfg.client.endpoint.replace('dns:///', '').rstrip('/')}/" + + +if __name__ == "__main__": + import argparse + import random + import string + + from flyte._initialize import get_init_config + + parser = argparse.ArgumentParser(description="Deploy the n8n app.") + parser.add_argument("--subdomain", type=str, default="n8n-app", help="The subdomain to use for the n8n app.") + + n8n_version = "2.6.3" + # Create a random 32-character alphanumeric string for the n8n runners auth token. it's okay + # to regenerate this every time the app is deployed, since only the main n8n app and the runner + # sidecar container use this token. + n8n_runners_auth_token = "".join(random.choices(string.ascii_letters + string.digits, k=32)) + + image = build_runner_image() + + flyte.init_from_config(image_builder="remote") + pod_template = n8n_pod_template( + version=n8n_version, + runner_auth_token=n8n_runners_auth_token, + runner_image_uri=image.uri, + ) + + subdomain = "n8n-app" + webhook_url = get_webhook_url(subdomain) + + app = flyte.serve( + n8n_app.clone_with( + name="n8n-app-with-runners", + pod_template=pod_template, + domain=flyte.app.Domain(subdomain=subdomain), + env_vars=n8n_app.env_vars + | { + "WEBHOOK_URL": webhook_url, + "N8N_RUNNERS_AUTH_TOKEN": n8n_runners_auth_token, + }, + ) + ) + print(app.url) diff --git a/examples/genai/n8n/task_runner.dockerfile b/examples/genai/n8n/task_runner.dockerfile new file mode 100644 index 000000000..6ea5961e1 --- /dev/null +++ b/examples/genai/n8n/task_runner.dockerfile @@ -0,0 +1,6 @@ +FROM n8nio/runners:2.6.3 +USER root +COPY n8n-task-runners.json /etc/n8n-task-runners.json +RUN cd /opt/runners/task-runner-javascript && pnpm add moment uuid +RUN cd /opt/runners/task-runner-python && uv pip install numpy pandas +USER runner diff --git a/examples/genai/n8n/task_runner_image.py b/examples/genai/n8n/task_runner_image.py new file mode 100644 index 000000000..3c390751a --- /dev/null +++ b/examples/genai/n8n/task_runner_image.py @@ -0,0 +1,18 @@ +import pathlib + +import flyte + + +def build_runner_image() -> flyte.Image: + image = flyte.Image.from_dockerfile( + pathlib.Path(__file__).parent / "task_runner.dockerfile", + registry="ghcr.io/flyteorg", + name="n8n-task-runner-image", + ) + return image + + +if __name__ == "__main__": + flyte.init_from_config(image_builder="local") + image = flyte.build(build_runner_image(), wait=True) + print(image.uri) diff --git a/examples/genai/vllm/vllm_app.py b/examples/genai/vllm/vllm_app.py index c84d1aa9e..e63b5bd49 100644 --- a/examples/genai/vllm/vllm_app.py +++ b/examples/genai/vllm/vllm_app.py @@ -49,7 +49,7 @@ name="qwen3-0-6b-vllm", model_hf_path="Qwen/Qwen3-0.6B", model_id="qwen3-0.6b", - resources=flyte.Resources(cpu="4", memory="16Gi", gpu="V100:1", disk="10Gi"), + resources=flyte.Resources(cpu="4", memory="16Gi", gpu="L40s:1", disk="10Gi"), image=( flyte.Image.from_debian_base( name="vllm-app-image", @@ -57,6 +57,7 @@ ) .with_pip_packages("flashinfer-python", "flashinfer-cubin") .with_pip_packages("flashinfer-jit-cache", index_url="https://flashinfer.ai/whl/cu129") + .with_pip_packages("vllm==0.11.0", "transformers==4.57.6") .with_pip_packages("flyteplugins-vllm", pre=True) ), stream_model=True, # Stream model directly from blob store to GPU diff --git a/src/flyte/app/_app_environment.py b/src/flyte/app/_app_environment.py index 6c32749a7..106ba96b4 100644 --- a/src/flyte/app/_app_environment.py +++ b/src/flyte/app/_app_environment.py @@ -319,6 +319,7 @@ def clone_with( include = kwargs.pop("include", None) parameters = kwargs.pop("parameters", None) cluster_pool = kwargs.pop("cluster_pool", None) + pod_template = kwargs.pop("pod_template", None) if kwargs: raise TypeError(f"Unexpected keyword arguments: {list(kwargs.keys())}") @@ -327,6 +328,8 @@ def clone_with( kwargs["name"] = name if image is not None: kwargs["image"] = image + if pod_template is not None: + kwargs["pod_template"] = pod_template if resources is not None: kwargs["resources"] = resources if env_vars is not None: