Skip to content

Commit a0da90d

Browse files
authored
Merge pull request #34 from CS3219-AY2526Sem1/collab/basic-realtime
Basic Implementation of Collaborative Service
2 parents a596acd + 7b5e57a commit a0da90d

File tree

14 files changed

+261
-3
lines changed

14 files changed

+261
-3
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# Virtual environments
2+
**/venv/
3+
4+
.pytest_cache/
15
**/__pycache__/
26

37
# Ignore environment variables file
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
## To run locally:
2+
`uvicorn app.main:app --reload`
3+
## Planning
4+
5+
## APIs to implement:
6+
7+
From Matching (REST)
8+
[ ] /sessions
9+
-
10+
11+
From UI (Websocket)
12+
[ ] /
13+
! make session_id non-determined from user ids (for security purpose; hacker cannot hijack session solely from user data)
File renamed without changes.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
from typing import Dict
2+
import uuid
3+
import asyncio
4+
from app.schemas.messages import CollaboratorConnectMessage, CollaboratorDisconnectMessage, DisplayMessage
5+
from app.core.errors import SessionNotFoundError, UserNotFoundError, InvalidUserIDsError
6+
7+
class ConnectionManager:
8+
def __init__(self):
9+
self.active_connections: Dict[str, Dict[str, asyncio.Queue]] = {}
10+
self.condition = asyncio.Condition()
11+
12+
def init_session(self, user_ids: list[str]) -> str:
13+
if len(user_ids) != 2:
14+
raise InvalidUserIDsError()
15+
session_id = self.generate_uuid(user_ids[0], user_ids[1])
16+
17+
self.active_connections[session_id] = {}
18+
for user_id in user_ids:
19+
self.active_connections[session_id][user_id] = None
20+
return session_id
21+
22+
def _get_collaborator_q(self, session_id, user_id):
23+
session_data = self.active_connections[session_id]
24+
id = next((uid for uid in session_data if uid != user_id), None)
25+
if id is None:
26+
return None
27+
return session_data[id]
28+
29+
async def on_connect(self, session_id: str, user_id: str, out_q: asyncio.Queue):
30+
if session_id not in self.active_connections:
31+
raise SessionNotFoundError()
32+
33+
if user_id not in self.active_connections[session_id]:
34+
raise UserNotFoundError()
35+
36+
self.active_connections[session_id][user_id] = out_q
37+
38+
collaborator_q = self._get_collaborator_q(session_id, user_id)
39+
40+
if collaborator_q is None:
41+
return None
42+
43+
## Notify collaborator that this user has connected
44+
await collaborator_q.put(CollaboratorConnectMessage())
45+
46+
## Notify this user that the other user has already connected
47+
await out_q.put(CollaboratorConnectMessage())
48+
49+
async def on_disconnect(self, session_id: str, user_id: str):
50+
if session_id not in self.active_connections:
51+
raise SessionNotFoundError()
52+
53+
if user_id not in self.active_connections[session_id]:
54+
raise UserNotFoundError()
55+
56+
self.active_connections[session_id][user_id] = None
57+
58+
collaborator_q = self._get_collaborator_q(session_id, user_id)
59+
60+
if collaborator_q is None:
61+
return None
62+
63+
await collaborator_q.put(CollaboratorDisconnectMessage())
64+
65+
async def on_message(self, session_id: str, user_id: str, msg: str):
66+
collaborator_q = self._get_collaborator_q(session_id, user_id)
67+
68+
if collaborator_q is None:
69+
return None
70+
71+
await collaborator_q.put(DisplayMessage(msg=msg))
72+
73+
def generate_uuid(self, user1: str, user2: str) -> str:
74+
namespace = uuid.NAMESPACE_DNS
75+
seed = ''.join(sorted([user1, user2]))
76+
return str(uuid.uuid5(namespace, seed))
77+
78+
79+
connection_manager = ConnectionManager()
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
2+
3+
class InvalidUserIDsError(Exception):
4+
"""Exception raised when the provided user IDs are invalid."""
5+
msg = "user_ids must contain exactly two user IDs."
6+
pass
7+
8+
class UserNotFoundError(Exception):
9+
"""Exception raised when a user is not found in the system."""
10+
msg = "user_id not available in this session"
11+
pass
12+
13+
class SessionNotFoundError(Exception):
14+
"""Exception raised when a session is not found."""
15+
msg = "Session not initialized"
16+
pass
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from enum import Enum
2+
3+
class UserState(Enum):
4+
AWAIT_CONNECT = "awaiting_connection"
5+
AWAIT_POLLING = "awaiting_polling"
6+
7+
8+
9+
Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
from fastapi import FastAPI
2+
from app.ws import sessions as ws_sessions
3+
from app.rest import sessions
24

35
app = FastAPI()
6+
app.include_router(sessions.router, prefix="/sessions")
7+
app.include_router(ws_sessions.router, prefix="/ws/sessions")
48

59
## API for testing connection
6-
@app.get("/ping")
7-
def ping():
8-
return {"message": "Pong"}
10+
@app.get("/health")
11+
def health_check():
12+
return {"status": "Healthy"}

services/collaboration-service/app/rest/__init__.py

Whitespace-only changes.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
2+
from fastapi import APIRouter
3+
from app.schemas.collaboration import CreateSessionRequest, CreateSessionResponse
4+
from app.core.connection_manager import connection_manager
5+
6+
router = APIRouter()
7+
8+
@router.post("/", response_model=CreateSessionResponse)
9+
def create_session(request: CreateSessionRequest):
10+
user_ids = request.user_ids
11+
session_id = connection_manager.init_session(user_ids)
12+
return CreateSessionResponse(session_id=session_id)
13+
14+
15+

services/collaboration-service/app/schemas/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)