Skip to content

Commit b78ba0b

Browse files
committed
working fastapi postgres pubsub
1 parent f92b2f1 commit b78ba0b

File tree

11 files changed

+631
-0
lines changed

11 files changed

+631
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
FROM mcr.microsoft.com/devcontainers/python:3.12-bookworm
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"build": {
3+
"dockerfile": "Dockerfile",
4+
"context": ".."
5+
},
6+
"features": {
7+
"ghcr.io/defanglabs/devcontainer-feature/defang-cli:1.0.4": {},
8+
"ghcr.io/devcontainers/features/docker-in-docker:2": {},
9+
"ghcr.io/devcontainers/features/aws-cli:1": {}
10+
}
11+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
name: Deploy
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
8+
jobs:
9+
deploy:
10+
environment: playground
11+
runs-on: ubuntu-latest
12+
permissions:
13+
contents: read
14+
id-token: write
15+
16+
steps:
17+
- name: Checkout Repo
18+
uses: actions/checkout@v4
19+
20+
- name: Deploy
21+
uses: DefangLabs/[email protected]
22+
with:
23+
config-env-vars: POSTGRES_PASSWORD SSL_MODE
24+
env:
25+
POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}
26+
SSL_MODE: ${{ secrets.SSL_MODE }}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# FastAPI Postgres Pub/Sub Chat
2+
3+
[![1-click-deploy](https://raw.githubusercontent.com/DefangLabs/defang-assets/main/Logos/Buttons/SVG/deploy-with-defang.svg)](https://portal.defang.dev/redirect?url=https%3A%2F%2Fgithub.com%2Fnew%3Ftemplate_name%3Dsample-fastapi-postgres-pubsub-template%26template_owner%3DDefangSamples)
4+
5+
This sample pairs FastAPI with PostgreSQL `LISTEN/NOTIFY` to demonstrate real-time updates between two application containers. A minimal chat UI sends messages with a REST request, and both FastAPI instances broadcast the new message over WebSockets after Postgres notifies them.
6+
7+
## Prerequisites
8+
9+
1. Download [Defang CLI](https://github.com/DefangLabs/defang)
10+
2. (Optional) If you are using [Defang BYOC](https://docs.defang.io/docs/concepts/defang-byoc) authenticate with your cloud provider account
11+
3. (Optional for local development) [Docker CLI](https://docs.docker.com/engine/install/)
12+
13+
## Development
14+
15+
To run the application locally, you can use the following command:
16+
17+
```bash
18+
docker compose -f compose.dev.yaml up --build
19+
```
20+
21+
Once everything is running:
22+
- Visit [http://localhost:8000](http://localhost:8000) for the first FastAPI service.
23+
- Visit [http://localhost:8001](http://localhost:8001) for the second service.
24+
- Send a chat message in either window. Both pages should update immediately, proving Postgres `LISTEN/NOTIFY` fans the event across containers.
25+
26+
Stop the stack with `Ctrl+C`, then run `docker compose -f compose.dev.yaml down`.
27+
28+
## Configuration
29+
30+
For this sample, you can rely on the defaults. Override them with environment variables if needed:
31+
32+
> Note that if you are using the 1-click deploy option, you can set these values as secrets in your GitHub repository and the action will automatically deploy them for you.
33+
34+
### `POSTGRES_PASSWORD`
35+
Database password (default `chat_password`).
36+
```bash
37+
defang config set POSTGRES_PASSWORD --random
38+
```
39+
40+
### `SSL_MODE`
41+
42+
Postgres SSL mode (default `disable`, should set to `require` in production).
43+
```bash
44+
defang config set SSL_MODE=require
45+
```
46+
47+
## Deployment
48+
49+
> [!NOTE]
50+
> Download [Defang CLI](https://github.com/DefangLabs/defang)
51+
52+
### Defang Playground
53+
54+
Deploy your application to the Defang Playground by opening up your terminal and typing:
55+
```bash
56+
defang compose up
57+
```
58+
59+
### BYOC
60+
61+
If you want to deploy to your own cloud account, you can [use Defang BYOC](https://docs.defang.io/docs/tutorials/deploy-to-your-cloud).
62+
63+
---
64+
65+
Title: FastAPI Postgres Pub/Sub
66+
67+
Short Description: FastAPI sample that stores messages in Postgres and streams them to two app instances via LISTEN/NOTIFY.
68+
69+
Tags: FastAPI, PostgreSQL, WebSockets, PubSub
70+
71+
Languages: Python, SQL
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.venv
2+
.env
3+
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
FROM python:3.11-slim
2+
3+
ENV PYTHONUNBUFFERED=1
4+
ENV PYTHONDONTWRITEBYTECODE=1
5+
6+
WORKDIR /app
7+
8+
RUN apt-get update \
9+
&& apt-get install -y --no-install-recommends build-essential libpq-dev curl \
10+
&& rm -rf /var/lib/apt/lists/*
11+
12+
COPY requirements.txt ./
13+
RUN pip install --no-cache-dir --upgrade pip \
14+
&& pip install --no-cache-dir -r requirements.txt
15+
16+
COPY . .
17+
18+
EXPOSE 8000
19+
20+
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
"""Simplest possible FastAPI chat demo using PostgreSQL LISTEN/NOTIFY.
2+
3+
The goal of this file is to show the basic flow end to end:
4+
5+
1. A user submits a chat message with ``POST /messages``.
6+
2. The API stores the message in PostgreSQL and issues ``pg_notify``.
7+
3. Every running API instance listens for that notification and forwards it
8+
to connected WebSocket clients.
9+
4. The browser keeps a WebSocket open so it can display new messages instantly.
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import asyncio
15+
import json
16+
import logging
17+
import os
18+
from datetime import datetime
19+
from pathlib import Path
20+
from typing import Any
21+
22+
import asyncpg
23+
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
24+
from fastapi.responses import HTMLResponse
25+
from pydantic import BaseModel, Field
26+
27+
logger = logging.getLogger("chat")
28+
logging.basicConfig(level=logging.INFO)
29+
30+
CHAT_CHANNEL = "chat_messages"
31+
MIGRATION_LOCK_ID = 101
32+
TEMPLATE_PATH = Path(__file__).resolve().parent / "templates" / "index.html"
33+
34+
35+
class MessageCreate(BaseModel):
36+
"""Request body for ``POST /messages``."""
37+
38+
message: str = Field(..., min_length=1, max_length=500)
39+
40+
41+
class WebSocketRegistry:
42+
"""Keeps track of the browsers that are connected via WebSocket."""
43+
44+
def __init__(self) -> None:
45+
self._sockets: set[WebSocket] = set()
46+
self._lock = asyncio.Lock()
47+
48+
async def add(self, socket: WebSocket) -> None:
49+
async with self._lock:
50+
self._sockets.add(socket)
51+
52+
async def remove(self, socket: WebSocket) -> None:
53+
async with self._lock:
54+
self._sockets.discard(socket)
55+
56+
async def broadcast(self, payload: dict[str, Any]) -> None:
57+
async with self._lock:
58+
sockets = list(self._sockets)
59+
for socket in sockets:
60+
try:
61+
await socket.send_json(payload)
62+
except Exception:
63+
await self.remove(socket)
64+
65+
66+
def build_database_dsn() -> str:
67+
"""Build a PostgreSQL connection string from environment variables."""
68+
69+
url = os.getenv("DATABASE_URL")
70+
if url:
71+
return url
72+
raise ValueError("DATABASE_URL environment variable is not set")
73+
74+
async def create_tables(pool: asyncpg.Pool) -> None:
75+
"""Make sure the ``messages`` table exists."""
76+
77+
async with pool.acquire() as conn:
78+
await conn.execute("SELECT pg_advisory_lock($1)", MIGRATION_LOCK_ID)
79+
try:
80+
await conn.execute(
81+
"""
82+
CREATE TABLE IF NOT EXISTS messages (
83+
id BIGSERIAL PRIMARY KEY,
84+
message TEXT NOT NULL,
85+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
86+
);
87+
"""
88+
)
89+
finally:
90+
await conn.execute("SELECT pg_advisory_unlock($1)", MIGRATION_LOCK_ID)
91+
92+
93+
async def fetch_recent_messages(pool: asyncpg.Pool, limit: int = 50) -> list[dict[str, Any]]:
94+
"""Return the most recent chat messages."""
95+
96+
async with pool.acquire() as conn:
97+
rows = await conn.fetch(
98+
"""
99+
SELECT id, message, created_at
100+
FROM messages
101+
ORDER BY created_at ASC
102+
LIMIT $1
103+
""",
104+
limit,
105+
)
106+
return [
107+
{
108+
"id": row["id"],
109+
"message": row["message"],
110+
"created_at": row["created_at"].isoformat(),
111+
}
112+
for row in rows
113+
]
114+
115+
116+
async def save_message(pool: asyncpg.Pool, text: str) -> dict[str, Any]:
117+
"""Insert a message and notify other app instances about it."""
118+
119+
message = text.strip()
120+
if not message:
121+
raise HTTPException(status_code=400, detail="Message must not be empty")
122+
123+
async with pool.acquire() as conn:
124+
async with conn.transaction():
125+
record = await conn.fetchrow(
126+
"""
127+
INSERT INTO messages (message)
128+
VALUES ($1)
129+
RETURNING id, message, created_at
130+
""",
131+
message,
132+
)
133+
body = {
134+
"type": "message",
135+
"data": {
136+
"id": record["id"],
137+
"message": record["message"],
138+
"created_at": record["created_at"].isoformat(),
139+
},
140+
}
141+
await conn.execute("SELECT pg_notify($1, $2)", CHAT_CHANNEL, json.dumps(body))
142+
return body["data"]
143+
144+
145+
async def forward_notification(payload: str) -> None:
146+
"""Handle the JSON payload coming from PostgreSQL."""
147+
148+
try:
149+
data = json.loads(payload)
150+
except json.JSONDecodeError:
151+
logger.warning("Ignoring unexpected payload: %s", payload)
152+
return
153+
await app.state.connections.broadcast(data)
154+
155+
156+
async def notification_listener(stop_event: asyncio.Event) -> None:
157+
"""Listen for ``pg_notify`` events until the app shuts down."""
158+
159+
loop = asyncio.get_running_loop()
160+
conn = await asyncpg.connect(dsn=app.state.database_dsn)
161+
162+
def _listener(_connection: asyncpg.Connection, _pid: int, _channel: str, payload: str) -> None:
163+
loop.create_task(forward_notification(payload))
164+
165+
await conn.add_listener(CHAT_CHANNEL, _listener)
166+
try:
167+
await stop_event.wait()
168+
finally:
169+
await conn.remove_listener(CHAT_CHANNEL, _listener)
170+
await conn.close()
171+
172+
173+
app = FastAPI(title="FastAPI Postgres Pub/Sub Chat")
174+
app.state.connections = WebSocketRegistry()
175+
app.state.listener_stop: asyncio.Event | None = None
176+
app.state.listener_task: asyncio.Task | None = None
177+
app.state.db_pool: asyncpg.Pool | None = None
178+
app.state.database_dsn = build_database_dsn()
179+
180+
181+
def get_pool() -> asyncpg.Pool:
182+
pool = app.state.db_pool
183+
if pool is None:
184+
raise HTTPException(status_code=500, detail="Database connection not ready")
185+
return pool
186+
187+
188+
@app.on_event("startup")
189+
async def startup() -> None:
190+
pool = await asyncpg.create_pool(dsn=app.state.database_dsn, min_size=1, max_size=5)
191+
app.state.db_pool = pool
192+
await create_tables(pool)
193+
194+
stop_event = asyncio.Event()
195+
app.state.listener_stop = stop_event
196+
app.state.listener_task = asyncio.create_task(notification_listener(stop_event))
197+
198+
199+
@app.on_event("shutdown")
200+
async def shutdown() -> None:
201+
stop_event = app.state.listener_stop
202+
if stop_event:
203+
stop_event.set()
204+
listener_task = app.state.listener_task
205+
if listener_task:
206+
await listener_task
207+
208+
pool = app.state.db_pool
209+
if pool:
210+
await pool.close()
211+
212+
213+
@app.get("/", response_class=HTMLResponse)
214+
async def index() -> HTMLResponse:
215+
if not TEMPLATE_PATH.exists():
216+
raise HTTPException(status_code=500, detail="Template not found")
217+
return HTMLResponse(TEMPLATE_PATH.read_text(encoding="utf-8"))
218+
219+
220+
@app.post("/messages")
221+
async def create_message(payload: MessageCreate) -> dict[str, Any]:
222+
pool = get_pool()
223+
message = await save_message(pool, payload.message)
224+
return message
225+
226+
227+
@app.websocket("/ws")
228+
async def websocket_endpoint(websocket: WebSocket) -> None:
229+
pool = get_pool()
230+
await websocket.accept()
231+
await app.state.connections.add(websocket)
232+
233+
try:
234+
history = await fetch_recent_messages(pool)
235+
await websocket.send_json({"type": "history", "messages": history})
236+
237+
while True:
238+
await websocket.receive_text()
239+
except WebSocketDisconnect:
240+
pass
241+
finally:
242+
await app.state.connections.remove(websocket)
243+
244+
245+
@app.get("/healthz")
246+
async def healthcheck() -> dict[str, str]:
247+
pool = get_pool()
248+
async with pool.acquire() as conn:
249+
await conn.execute("SELECT 1")
250+
return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
fastapi==0.110.1
2+
uvicorn[standard]==0.29.0
3+
asyncpg==0.29.0
4+
jinja2==3.1.3

0 commit comments

Comments
 (0)