Skip to content

Commit 410adcf

Browse files
authored
Support local app serving (#628)
# Summary This PR implements local app serving for AppEnvironment, enabling developers to serve Flyte apps on localhost for rapid local development and testing. It introduces symmetry with the existing `flyte.with_runcontext(mode="local")` pattern by adding `flyte.with_servecontext(mode="local")` for apps. Key changes: - New `_LocalApp` class and local serving infrastructure in `_serve.py` (server-decorator, command/subprocess, and FastAPI-based apps) - `AppEnvironment.endpoint` now transparently resolves to localhost when the app is served locally - `ephemeral_ctx` / `ephemeral_ctx_sync` context managers on both `_LocalApp` and remote `App` for a scoped activate/deactivate lifecycle - `_LocalApp` `activate` and `deactivate` methods mirror the methods on the remote `App` object. - `activate`: if the server is not running locally, starts it, and optionally waits for the server to be active. - `deactivate`: stops the server locally - `--local` flag on the `flyte serve` CLI command - Signal handler and atexit cleanup to ensure child processes are torn down on `Ctrl-C` or process exit - 6 example scripts and comprehensive unit tests --------- Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
1 parent a70a78c commit 410adcf

File tree

13 files changed

+1926
-38
lines changed

13 files changed

+1926
-38
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""Local serving example where one FastAPI app calls another FastAPI app.
2+
3+
This example demonstrates how to serve two FastAPI apps locally and have one
4+
app proxy requests to the other. Both apps are started via
5+
``flyte.with_servecontext(mode="local")`` and communicate over HTTP.
6+
7+
Usage (SDK):
8+
python examples/apps/local_app_calling_app.py
9+
"""
10+
11+
import httpx
12+
from fastapi import FastAPI
13+
14+
import flyte
15+
from flyte.app.extras import FastAPIAppEnvironment
16+
17+
# Container image shared by both app environments in this file: a Debian base
# with Python 3.12 and the fastapi, uvicorn and httpx packages installed.
image = flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages("fastapi", "uvicorn", "httpx")

# ---------------------------------------------------------------------------
# App 1 - a simple "square" service
# ---------------------------------------------------------------------------

# FastAPI application object; the route handlers below are registered on it.
app1 = FastAPI(
    title="Square Service",
    description="A FastAPI app that squares a number",
    version="1.0.0",
)

# Flyte app environment wrapping ``app1``. This is the object that is served in
# the ``__main__`` block and whose ``endpoint`` the second app calls.
app1_env = FastAPIAppEnvironment(
    name="local-square-service",
    app=app1,
    description="Squares a number",
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    port=8091,
    requires_auth=False,
)
38+
39+
40+
@app1.get("/")
async def square(x: int) -> dict[str, int]:
    """Square ``x`` and return it under the ``result`` key."""
    squared = x * x
    return {"result": squared}
44+
45+
46+
@app1.get("/health")
async def app1_health() -> dict[str, str]:
    """Report that the square service is up."""
    status = "healthy"
    return {"status": status}
49+
50+
51+
# ---------------------------------------------------------------------------
52+
# App 2 - proxies to App 1 and adds its own logic
53+
# ---------------------------------------------------------------------------
54+
55+
# FastAPI application object for the second service; its route handlers are
# registered on it below.
app2 = FastAPI(
    title="Square-Plus-One Service",
    description="A FastAPI app that calls the square service and adds one",
    version="1.0.0",
)

# Flyte app environment wrapping ``app2``. ``depends_on`` lists ``app1_env``
# because the "/" handler of this app sends a request to ``app1_env.endpoint``.
app2_env = FastAPIAppEnvironment(
    name="local-square-plus-one",
    app=app2,
    description="Calls the square service and adds one",
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    port=8092,
    requires_auth=False,
    depends_on=[app1_env],
)
71+
72+
73+
@app2.get("/")
async def square_plus_one(x: int) -> dict[str, int]:
    """Ask the square service for ``x`` squared and return that value plus one.

    Raises whatever ``raise_for_status`` raises when the square service
    answers with an error status.
    """
    query = {"x": x}
    async with httpx.AsyncClient() as http:
        upstream = await http.get(app1_env.endpoint, params=query)
        upstream.raise_for_status()
        squared = upstream.json()["result"]
    return {"result": squared + 1}
81+
82+
83+
@app2.get("/health")
84+
async def app2_health() -> dict[str, str]:
85+
return {"status": "healthy"}
86+
87+
88+
if __name__ == "__main__":
    # Start both apps locally, exercise the app2 -> app1 call chain once, and
    # always stop the apps again, even when the request or the check fails.
    serve_ctx = flyte.with_servecontext(mode="local")

    # Serve app1 first (app2 depends on it)
    local_app1 = serve_ctx.serve(app1_env)
    local_app1.activate(wait=True)
    try:
        print(f"App 1 (square) is ready at {local_app1.endpoint}")

        # Serve app2
        local_app2 = serve_ctx.serve(app2_env)
        local_app2.activate(wait=True)
        try:
            print(f"App 2 (square+1) is ready at {local_app2.endpoint}")

            # Call app2 which internally calls app1
            response = httpx.get(local_app2.endpoint, params={"x": 5})
            response.raise_for_status()
            data = response.json()
            print(f"Response: {data}")
            assert data["result"] == 26  # 5^2 + 1 = 26
        finally:
            # Shut down in reverse start order: app2 calls app1, so it goes first.
            local_app2.deactivate(wait=True)
    finally:
        local_app1.deactivate(wait=True)
    print("Done!")
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
"""Local serving example where a FastAPI app calls a Flyte task.
2+
3+
This example demonstrates how to serve a FastAPI app locally that invokes a
4+
Flyte task from one of its endpoints. The task is executed via
5+
``flyte.with_runcontext(mode="local").run(...)`` so it runs in the same process.
6+
7+
Usage (SDK):
8+
python examples/apps/local_app_calling_task.py
9+
"""
10+
11+
import httpx
12+
from fastapi import FastAPI
13+
14+
import flyte
15+
from flyte.app.extras import FastAPIAppEnvironment
16+
17+
# FastAPI application object; the route handlers below are registered on it.
app = FastAPI(
    title="App Calling Task",
    description="A FastAPI app that delegates computation to a Flyte task",
    version="1.0.0",
)

# Flyte app environment wrapping ``app``; this is the object the ``__main__``
# block serves locally.
app_env = FastAPIAppEnvironment(
    name="local-app-calling-task",
    app=app,
    description="App that calls a Flyte task to double a number",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages("fastapi", "uvicorn", "httpx"),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
)

# Task environment whose ``task`` decorator is applied to ``double`` below.
task_env = flyte.TaskEnvironment(
    name="local-doubler-task-env",
    image=flyte.Image.from_debian_base(python_version=(3, 12)),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
)
37+
38+
39+
@task_env.task
async def double(x: int) -> int:
    """Return twice the input value."""
    doubled = x * 2
    return doubled
43+
44+
45+
@app.get("/")
async def double_endpoint(x: int) -> dict[str, int]:
    """Run the ``double`` task on ``x`` and return its output.

    Returns a dict with the task's first output under the ``result`` key.
    """
    # This handler is a coroutine, so await the async entry point (``run.aio``)
    # instead of the blocking ``run(...)`` call; a blocking call here would
    # stall the server's event loop while the task executes.
    run = await flyte.with_runcontext(mode="local").run.aio(double, x=x)
    return {"result": run.outputs()[0]}
50+
51+
52+
@app.get("/health")
async def health() -> dict[str, str]:
    """Report that the app is up."""
    status = "healthy"
    return {"status": status}
56+
57+
58+
if __name__ == "__main__":
    # Serve the app locally (non-blocking)
    local_app = flyte.with_servecontext(mode="local").serve(app_env)

    # Wait for the app to be ready
    local_app.activate(wait=True)
    try:
        print(f"App is ready at {local_app.endpoint}")

        # Call the app endpoint which internally runs the task
        response = httpx.get(local_app.endpoint, params={"x": 21})
        response.raise_for_status()
        data = response.json()
        print(f"Response: {data}")
        assert data["result"] == 42
    finally:
        # Shut down the local app even when the request or the check above fails
        local_app.deactivate(wait=True)
    print("Done!")
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
"""Local serving example using the ephemeral_ctx context manager.
2+
3+
This example demonstrates how to serve a FastAPI app locally using the
4+
ephemeral_ctx() context manager, which ensures the app is properly
5+
activated and deactivated. The FastAPI lifespan simulates loading a model
6+
on startup.
7+
8+
Usage (SDK):
9+
python examples/apps/local_app_ephemeral_server.py
10+
11+
Usage (CLI):
12+
flyte serve --local examples/apps/local_app_ephemeral_server.py app_env
13+
"""
14+
15+
import asyncio
16+
from contextlib import asynccontextmanager
17+
18+
import httpx
19+
from fastapi import FastAPI
20+
21+
import flyte
22+
from flyte.app.extras import FastAPIAppEnvironment
23+
24+
25+
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Simulate model loading before the app starts serving.

    Sleeps for three seconds, stores the linear-model coefficients on
    ``app.state.model``, yields control for the lifetime of the app, and
    prints a shutdown message afterwards.
    """
    print("Simulate loading model...")
    await asyncio.sleep(3)
    coefficients = {"m": 5, "b": 11}
    app.state.model = coefficients
    print("Model loaded!")
    yield
    print("Shutting down, releasing model resources...")
34+
35+
36+
# FastAPI application object; ``lifespan`` (defined above) populates
# ``app.state.model`` before the route handlers below are used.
app = FastAPI(
    title="Local Linear Regression",
    description="A local FastAPI app that performs linear regression",
    version="1.0.0",
    lifespan=lifespan,
)

# Flyte app environment wrapping ``app``; this is the object the ``__main__``
# block serves locally and whose ``endpoint`` ``predict_task`` calls.
app_env = FastAPIAppEnvironment(
    name="local-linear-regression",
    app=app,
    description="Performs linear regression",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages("fastapi", "uvicorn", "httpx"),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
)

# Task environment whose ``task`` decorator is applied to ``predict_task``
# below; its image installs httpx, which that task uses to call the app.
task_env = flyte.TaskEnvironment(
    name="local-linear-regression-task-env",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages("httpx"),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
)
57+
58+
59+
@app.post("/predict")
async def predict(x: int) -> dict[str, int]:
    """Evaluate the linear model ``m * x + b`` stored on ``app.state.model``."""
    slope = app.state.model["m"]
    intercept = app.state.model["b"]
    return {"result": slope * x + intercept}
64+
65+
66+
@app.get("/health")
async def health() -> dict[str, str]:
    """Report that the app is up."""
    status = "healthy"
    return {"status": status}
70+
71+
72+
@task_env.task
async def predict_task(data: list[int]) -> list[int]:
    """Post each value in ``data`` to the app's ``/predict`` route, in order.

    Returns the ``result`` field of every response, in input order. Raises
    whatever ``raise_for_status`` raises on the first error response.
    """
    print(f"Calling app at {app_env.endpoint}")
    predictions = []
    async with httpx.AsyncClient() as http:
        for value in data:
            reply = await http.post(f"{app_env.endpoint}/predict", params={"x": value})
            reply.raise_for_status()
            predictions.append(reply.json()["result"])
    return predictions
83+
84+
85+
if __name__ == "__main__":
    # Serve the app locally (non-blocking)
    local_app = flyte.with_servecontext(mode="local").serve(app_env)

    async def main():
        """Call the app directly and through a task inside ``ephemeral_ctx``."""
        # Use ephemeral_ctx to ensure the app is activated and deactivated
        async with local_app.ephemeral_ctx():
            print(f"App is ready at {local_app.endpoint}")

            # Call the app endpoint directly
            async with httpx.AsyncClient() as http:
                direct = await http.post(f"{local_app.endpoint}/predict", params={"x": 5})
                direct.raise_for_status()
                print(f"Direct call result: {direct.json()}")

            # Run a task that calls the local app
            run = await flyte.with_runcontext(mode="local").run.aio(predict_task, data=[5, 10, 15])
            print(f"Task result: {run.outputs()[0]}")
            assert run.outputs()[0] == [36, 61, 86]

        # Checked after leaving the context, which is expected to have deactivated the app
        assert local_app.is_deactivated()

    asyncio.run(main())
    # App is automatically deactivated after exiting the context
    print("Done!")

examples/apps/local_command_app.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
"""Local serving example using command-based AppEnvironment.
2+
3+
This example demonstrates how to serve an app locally using the ``command``
4+
specification pattern. The app is run as a subprocess.
5+
6+
Usage (CLI):
7+
flyte serve --local examples/apps/local_command_app.py app_env
8+
9+
Note: This example uses ``python -m http.server`` as the command, which only
10+
serves static files. For a real application, you would use a proper web server
11+
command like ``uvicorn main:app --host 0.0.0.0 --port 8082``.
12+
"""
13+
14+
import flyte
15+
from flyte.app import AppEnvironment
16+
17+
# Command-based app environment: instead of wrapping a Python app object it is
# given a shell command to run.
# NOTE(review): the port number appears both inside ``command`` and as
# ``port=``; presumably the two must agree - confirm before changing either.
app_env = AppEnvironment(
    name="local-static-server",
    image=flyte.Image.from_debian_base(python_version=(3, 12)),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    command="python -m http.server 8082",
    port=8082,
    requires_auth=False,
)
25+
26+
27+
if __name__ == "__main__":
    import httpx

    flyte.init()

    # Serve the app locally (non-blocking)
    local_app = flyte.with_servecontext(mode="local", health_check_path="/").serve(app_env)

    # Wait for the app to be ready
    local_app.activate(wait=True)
    try:
        print(f"App is ready at {local_app.endpoint}")

        # Test the endpoint
        response = httpx.get(local_app.endpoint)
        print(f"Response status: {response.status_code}")
    finally:
        # Shut down the local app even when the request above fails
        local_app.deactivate(wait=True)
    print("Done!")

0 commit comments

Comments
 (0)