Skip to content

Commit 804b0ec

Browse files
authored
Merge pull request #15 from leonh/claude/test-and-update-project-011CV4pcoMg9pBh6Di4WuPUV
Claude/test and update project 011 cv4pco mg9p bh6 di4 wu puv
2 parents 05cf498 + a8c8c0e commit 804b0ec

File tree

5 files changed

+117
-77
lines changed

5 files changed

+117
-77
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM python:3.8-alpine
1+
FROM python:3.11-alpine
22
WORKDIR /code
33
# ENV FLASK_APP app.py
44
RUN apk add --no-cache gcc musl-dev linux-headers make python3-dev openssl-dev libffi-dev git

README.md

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,48 @@
11
# redis-streams-fastapi-chat
22
A simple demo of Redis Streams backed Chat app using Websockets, Python Asyncio and FastAPI/Starlette.
33

4-
Requires Python version >= 3.6 and Redis
4+
Requires Python version >= 3.11 and Redis 7+
55

66
# Overview
77
This project has been created to help understand some related concepts. Python standard library asyncio, websockets (which are often cited as a classic use case for async python code), also Redis Streams. It is very much inteded to be an intentionally simple starting point rather than a usable product as is.
88

99
# Installation
1010

11+
## Local Development
12+
1113
```shell
1214
$ pip install -r requirements.txt
1315
```
1416

17+
Make sure you have Redis running locally:
18+
```shell
19+
$ redis-server
20+
```
21+
1522
# Usage
1623

24+
## Local Development
25+
1726
```shell
1827
$ python chat.py
1928
```
2029

21-
# Docker compose
22-
If you don't have redis installed you can use the docker-compose.yml file to set up a
23-
working environment.
30+
Then open http://localhost:9080 in your browser.
31+
32+
## Docker Compose
33+
34+
The easiest way to run the application with all dependencies:
35+
36+
```shell
37+
$ docker-compose up
38+
```
39+
40+
This will start both the chat application and Redis in containers. The app will be available at http://localhost:9080
41+
42+
## Environment Variables
43+
44+
The following environment variables can be configured:
45+
46+
- `REDIS_HOST` - Redis server hostname (default: `localhost`, set to `redis` in Docker)
47+
- `REDIS_PORT` - Redis server port (default: `6379`)
2448

chat.py

Lines changed: 71 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
import os
22
import asyncio
3-
import aioredis
3+
from redis import asyncio as aioredis
44
import uvloop
55
import socket
66
import uuid
77
import contextvars
8+
from contextlib import asynccontextmanager
89
from fastapi import FastAPI, Depends, Request
910
from starlette.staticfiles import StaticFiles
1011
from starlette.templating import Jinja2Templates
1112
from starlette.middleware.base import BaseHTTPMiddleware
1213
from starlette.websockets import WebSocket, WebSocketDisconnect
1314

1415
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
15-
from aioredis.errors import ConnectionClosedError as ServerConnectionClosedError
16+
from redis.exceptions import ConnectionError as ServerConnectionClosedError
1617

17-
REDIS_HOST = 'localhost'
18-
REDIS_PORT = 6379
18+
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
19+
REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
1920
XREAD_TIMEOUT = 0
2021
XREAD_COUNT = 100
2122
NUM_PREVIOUS = 30
@@ -43,7 +44,32 @@ async def dispatch(self, request, call_next):
4344

4445

4546
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
46-
app = FastAPI()
47+
48+
@asynccontextmanager
49+
async def lifespan(app: FastAPI):
50+
# Startup
51+
try:
52+
redis_url = f"redis://{REDIS_HOST}:{REDIS_PORT}"
53+
pool = await aioredis.from_url(
54+
redis_url,
55+
encoding='utf-8',
56+
decode_responses=True,
57+
max_connections=20
58+
)
59+
cvar_redis.set(pool)
60+
print("Connected to Redis on ", REDIS_HOST, REDIS_PORT)
61+
except ConnectionRefusedError as e:
62+
print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT)
63+
64+
yield
65+
66+
# Shutdown
67+
redis = cvar_redis.get()
68+
if redis:
69+
await redis.aclose()
70+
print("closed connection Redis on ", REDIS_HOST, REDIS_PORT)
71+
72+
app = FastAPI(lifespan=lifespan)
4773
app.add_middleware(CustomHeaderMiddleware)
4874
templates = Jinja2Templates(directory="templates")
4975

@@ -73,8 +99,12 @@ def get_local_ip():
7399

74100
async def get_redis_pool():
75101
try:
76-
pool = await aioredis.create_redis_pool(
77-
(REDIS_HOST, REDIS_PORT), encoding='utf-8')
102+
redis_url = f"redis://{REDIS_HOST}:{REDIS_PORT}"
103+
pool = await aioredis.from_url(
104+
redis_url,
105+
encoding='utf-8',
106+
decode_responses=True
107+
)
78108
return pool
79109
except ConnectionRefusedError as e:
80110
print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT)
@@ -97,21 +127,20 @@ async def ws_send_moderator(websocket: WebSocket, chat_info: dict):
97127
"""
98128
pool = await get_redis_pool()
99129
streams = chat_info['room'].split(',')
100-
latest_ids = ['$' for i in streams]
130+
latest_ids = {stream: '$' for stream in streams}
101131
ws_connected = True
102132
print(streams, latest_ids)
103133
while pool and ws_connected:
104134
try:
105135
events = await pool.xread(
106-
streams=streams,
136+
streams=latest_ids,
107137
count=XREAD_COUNT,
108-
timeout=XREAD_TIMEOUT,
109-
latest_ids=latest_ids
138+
block=5000 # Block for 5 seconds waiting for new messages
110139
)
111-
for _, e_id, e in events:
112-
e['e_id'] = e_id
113-
await websocket.send_json(e)
114-
#latest_ids = [e_id]
140+
for stream, messages in events:
141+
for e_id, e in messages:
142+
e['e_id'] = e_id
143+
await websocket.send_json(e)
115144
except ConnectionClosedError:
116145
ws_connected = False
117146

@@ -130,18 +159,19 @@ async def ws_send(websocket: WebSocket, chat_info: dict):
130159
:type chat_info:
131160
"""
132161
pool = await get_redis_pool()
133-
latest_ids = ['$']
162+
stream_key = cvar_tenant.get() + ":stream"
163+
latest_ids = {stream_key: '$'}
134164
ws_connected = True
135165
first_run = True
136166
while pool and ws_connected:
137167
try:
138168
if first_run:
139169
# fetch some previous chat history
140170
events = await pool.xrevrange(
141-
stream=cvar_tenant.get() + ":stream",
171+
name=stream_key,
142172
count=NUM_PREVIOUS,
143-
start='+',
144-
stop='-'
173+
min='-',
174+
max='+'
145175
)
146176
first_run = False
147177
events.reverse()
@@ -150,15 +180,15 @@ async def ws_send(websocket: WebSocket, chat_info: dict):
150180
await websocket.send_json(e)
151181
else:
152182
events = await pool.xread(
153-
streams=[cvar_tenant.get() + ":stream"],
183+
streams=latest_ids,
154184
count=XREAD_COUNT,
155-
timeout=XREAD_TIMEOUT,
156-
latest_ids=latest_ids
185+
block=5000 # Block for 5 seconds waiting for new messages
157186
)
158-
for _, e_id, e in events:
159-
e['e_id'] = e_id
160-
await websocket.send_json(e)
161-
latest_ids = [e_id]
187+
for stream, messages in events:
188+
for e_id, e in messages:
189+
e['e_id'] = e_id
190+
await websocket.send_json(e)
191+
latest_ids = {stream_key: e_id}
162192
#print('################contextvar ', cvar_tenant.get())
163193
except ConnectionClosedError:
164194
ws_connected = False
@@ -169,7 +199,7 @@ async def ws_send(websocket: WebSocket, chat_info: dict):
169199
except ServerConnectionClosedError:
170200
print('redis server connection closed')
171201
return
172-
pool.close()
202+
await pool.aclose()
173203

174204

175205
async def ws_recieve(websocket: WebSocket, chat_info: dict):
@@ -205,10 +235,12 @@ async def ws_recieve(websocket: WebSocket, chat_info: dict):
205235
'type': 'comment',
206236
'room': chat_info['room']
207237
}
208-
await pool.xadd(stream=cvar_tenant.get() + ":stream",
209-
fields=fields,
210-
message_id=b'*',
211-
max_len=STREAM_MAX_LEN)
238+
await pool.xadd(
239+
name=cvar_tenant.get() + ":stream",
240+
fields=fields,
241+
id='*',
242+
maxlen=STREAM_MAX_LEN
243+
)
212244
#print('################contextvar ', cvar_tenant.get())
213245
except WebSocketDisconnect:
214246
await remove_room_user(chat_info, pool)
@@ -223,7 +255,7 @@ async def ws_recieve(websocket: WebSocket, chat_info: dict):
223255
print('redis server connection closed')
224256
return
225257

226-
pool.close()
258+
await pool.aclose()
227259

228260

229261
async def add_room_user(chat_info: dict, pool):
@@ -259,10 +291,12 @@ async def announce(pool, chat_info: dict, action: str):
259291
}
260292
#print(fields)
261293

262-
await pool.xadd(stream=cvar_tenant.get() + ":stream",
263-
fields=fields,
264-
message_id=b'*',
265-
max_len=STREAM_MAX_LEN)
294+
await pool.xadd(
295+
name=cvar_tenant.get() + ":stream",
296+
fields=fields,
297+
id='*',
298+
maxlen=STREAM_MAX_LEN
299+
)
266300

267301

268302
async def chat_info_vars(username: str = None, room: str = None):
@@ -355,30 +389,10 @@ async def verify_user_for_room(chat_info):
355389
# whitelist rooms
356390
if not chat_info['room'] in ALLOWED_ROOMS:
357391
verified = False
358-
pool.close()
392+
await pool.aclose()
359393
return verified
360394

361395

362-
@app.on_event("startup")
363-
async def handle_startup():
364-
try:
365-
pool = await aioredis.create_redis_pool(
366-
(REDIS_HOST, REDIS_PORT), encoding='utf-8', maxsize=20)
367-
cvar_redis.set(pool)
368-
print("Connected to Redis on ", REDIS_HOST, REDIS_PORT)
369-
except ConnectionRefusedError as e:
370-
print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT)
371-
return
372-
373-
374-
@app.on_event("shutdown")
375-
async def handle_shutdown():
376-
redis = cvar_redis.get()
377-
redis.close()
378-
await redis.wait_closed()
379-
print("closed connection Redis on ", REDIS_HOST, REDIS_PORT)
380-
381-
382396
if __name__ == "__main__":
383397
import uvicorn
384398
print(dir(app))

docker-compose.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ services:
88
- "8082:8082"
99
volumes:
1010
- .:/code
11-
links:
12-
- "redis"
11+
environment:
12+
- REDIS_HOST=redis
13+
- REDIS_PORT=6379
14+
depends_on:
15+
- redis
1316
redis:
14-
image: "redis:6.0-rc2-alpine3.11"
17+
image: "redis:7-alpine"
1518
ports:
1619
- 6379:6379

requirements.txt

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
uvicorn==0.12.1
2-
websockets==9.1
3-
fastapi==0.65.2
4-
aioredis==1.3.1
5-
redis==4.5.4
6-
uvloop==0.15.2
7-
jinja2==2.11.3
8-
aiofiles==0.6.0
9-
httpx==0.23.0
10-
itsdangerous==1.1.0
11-
databases[sqlite]==0.4.3
12-
sqlalchemy==1.3.0
1+
uvicorn==0.30.6
2+
websockets==12.0
3+
fastapi==0.115.0
4+
redis==5.0.8
5+
uvloop==0.20.0
6+
jinja2==3.1.4
7+
aiofiles==24.1.0
8+
httpx==0.27.2
9+
itsdangerous==2.2.0
10+
databases[sqlite]==0.9.0
11+
sqlalchemy==2.0.35

0 commit comments

Comments
 (0)