Skip to content

Commit bc0521a

Browse files
committed
feat: implement WebSocket support and RabbitMQ integration for message handling
1 parent d875f25 commit bc0521a

File tree

6 files changed

+128
-26
lines changed

6 files changed

+128
-26
lines changed

src/modules/api/__init__.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1-
"""API module initialization."""
2-
print("API module loaded.")
3-
import src.modules.api.index
1+
import uvicorn
2+
from .app import app as api_app
3+
import logging
4+
5+
async def server():
6+
"""Main application entry point."""
7+
try:
8+
print("Start api...")
9+
config = uvicorn.Config(api_app, host="0.0.0.0", port=8080, log_level="info")
10+
server = uvicorn.Server(config)
11+
await server.serve()
12+
except Exception as e:
13+
logging.error(f"Application failed to start: {e}", exc_info=True)
14+
raise

src/modules/api/app.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,26 @@
11
"""FastAPI application factory."""
22

3-
from fastapi import FastAPI
3+
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
44
from fastapi.middleware.cors import CORSMiddleware
55

66
from .routes import v1_router
77

8+
class ConnectionManager:
9+
def __init__(self):
10+
self.active_connections: List[WebSocket] = []
11+
12+
async def connect(self, websocket: WebSocket):
13+
await websocket.accept()
14+
self.active_connections.append(websocket)
15+
16+
def disconnect(self, websocket: WebSocket):
17+
self.active_connections.remove(websocket)
18+
19+
async def broadcast(self, message: str):
20+
for connection in self.active_connections:
21+
await connection.send_text(message)
22+
23+
manager = ConnectionManager()
824

925
def create_app() -> FastAPI:
1026
"""Create and configure the FastAPI application."""
@@ -31,6 +47,18 @@ async def startup_event():
3147
async def shutdown_event():
3248
"""Run shutdown events."""
3349
pass
50+
51+
@app.websocket("/ws")
52+
async def websocket_endpoint(websocket: WebSocket):
53+
await manager.connect(websocket)
54+
try:
55+
while True:
56+
data = await websocket.receive_text()
57+
print(f"Received: {data}")
58+
await manager.broadcast(f"Message from client: {data}")
59+
except WebSocketDisconnect:
60+
manager.disconnect(websocket)
61+
await manager.broadcast("A client disconnected")
3462

3563
return app
3664

src/modules/api/index.py

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/modules/api/routes/v1.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22

33
from fastapi import APIRouter
44
from typing import Dict, List
5-
5+
from src.modules.transporter import publish_message
66

77
# Create v1 router
88
router = APIRouter(prefix='/v1', tags=['v1'])
99

1010
@router.get("/hello")
1111
async def hello_world() -> Dict[str, str]:
1212
"""Hello world endpoint."""
13+
publish_message("hello-python", "Hello from FastAPI!")
1314
return {"message": "Hello, World!"}
1415

1516
@router.get("/health")
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# app/utils/__init__.py
2+
3+
import os
4+
import importlib
5+
import pkgutil
6+
7+
# Lấy tên package hiện tại
8+
__all__ = []
9+
10+
# Duyệt tất cả module con trong thư mục hiện tại (không đệ quy)
11+
package_dir = os.path.dirname(__file__)
12+
for _, module_name, is_pkg in pkgutil.iter_modules([package_dir]):
13+
if not is_pkg and module_name != "__init__":
14+
module = importlib.import_module(f".{module_name}", package=__name__)
15+
16+
# Nếu module có biến __all__, import những gì nó khai báo
17+
if hasattr(module, "__all__"):
18+
for attr in module.__all__:
19+
globals()[attr] = getattr(module, attr)
20+
__all__.append(attr)
21+
else:
22+
# Import tất cả public attributes (không bắt đầu bằng _)
23+
for attr in dir(module):
24+
if not attr.startswith("_"):
25+
globals()[attr] = getattr(module, attr)
26+
__all__.append(attr)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import pika
2+
import threading
3+
4+
credentials = pika.PlainCredentials('guest', 'guest')
5+
parameters = pika.ConnectionParameters(
6+
host='ubuntu',
7+
port=5672, # default RabbitMQ port
8+
virtual_host='/', # default vhost, change if needed
9+
credentials=credentials
10+
)
11+
12+
def queue_listener(queue_name, host="localhost"):
13+
def decorator(func):
14+
def start_consumer():
15+
connection = pika.BlockingConnection(parameters)
16+
channel = connection.channel()
17+
channel.queue_declare(queue=queue_name, durable=True)
18+
19+
def callback(ch, method, properties, body):
20+
try:
21+
func(body.decode()) # Gọi hàm xử lý
22+
ch.basic_ack(delivery_tag=method.delivery_tag)
23+
except Exception as e:
24+
print(f"Error handling message: {e}")
25+
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
26+
27+
channel.basic_qos(prefetch_count=1)
28+
channel.basic_consume(queue=queue_name, on_message_callback=callback)
29+
print(f"[+] Listening on queue '{queue_name}'")
30+
channel.start_consuming()
31+
32+
# chạy consumer trên thread riêng (không chặn main thread)
33+
threading.Thread(target=start_consumer, daemon=True).start()
34+
return func
35+
return decorator
36+
37+
@queue_listener("hello-python")
38+
def process_message(msg):
39+
print(f"[x] Received: {msg}")
40+
41+
42+
# publish a message to the queue
43+
def publish_message(queue_name, message):
44+
connection = pika.BlockingConnection(parameters)
45+
channel = connection.channel()
46+
# channel.queue_declare(queue=queue_name, durable=True)
47+
channel.basic_publish(
48+
exchange='',
49+
routing_key=queue_name,
50+
body=message,
51+
properties=pika.BasicProperties(
52+
delivery_mode=2, # make message persistent
53+
)
54+
)
55+
print(f"[x] Sent: {message}")
56+
57+
publish_message("hello-python", "RabbitMQ is running!")

0 commit comments

Comments
 (0)