Skip to content

Commit a91a65e

Browse files
committed
Add queue leaving
1 parent 4fc49c1 commit a91a65e

File tree

8 files changed

+172
-73
lines changed

8 files changed

+172
-73
lines changed

frontend/components/matching/find-match.tsx

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import { SelectionSummary } from "@/components/matching/selection-summary";
66
import { useToast } from "@/components/hooks/use-toast";
77
import { useAuth } from "@/app/auth/auth-context";
88
import { joinMatchQueue } from "@/lib/join-match-queue";
9+
import { leaveMatchQueue } from "@/lib/leave-match-queue";
10+
import { matchingServiceWebSockUri } from "@/lib/api-uri";
11+
import { subscribeMatch } from "@/lib/subscribe-match";
912

1013
export default function FindMatch() {
1114
const [selectedDifficulty, setSelectedDifficulty] = useState<string>("");
@@ -71,17 +74,20 @@ export default function FindMatch() {
7174
selectedDifficulty
7275
);
7376
switch (response.status) {
74-
case 200:
77+
case 201:
7578
toast({
7679
title: "Matched",
7780
description: "Successfully matched",
7881
variant: "success",
7982
});
8083
return;
8184
case 202:
85+
case 304:
8286
setIsSearching(true);
83-
const ws = new WebSocket(
84-
`ws://localhost:6969/match/subscribe/${auth?.user?.id}/${selectedTopic}/${selectedDifficulty}`
87+
const ws = await subscribeMatch(
88+
auth?.user.id,
89+
selectedTopic,
90+
selectedDifficulty
8591
);
8692
ws.onmessage = () => {
8793
setIsSearching(false);
@@ -90,7 +96,16 @@ export default function FindMatch() {
9096
description: "Successfully matched",
9197
variant: "success",
9298
});
99+
ws.onclose = () => null;
93100
};
101+
ws.onclose = () => {
102+
setIsSearching(false);
103+
toast({
104+
title: "Matching Stopped",
105+
description: "Matching has been stopped",
106+
variant: "destructive",
107+
});
108+
}
94109
setWebsocket(ws);
95110
return;
96111
default:
@@ -103,9 +118,54 @@ export default function FindMatch() {
103118
}
104119
};
105120

106-
const handleCancel = () => {
107-
setIsSearching(false);
108-
setWaitTime(0);
121+
const handleCancel = async () => {
122+
if (!selectedDifficulty || !selectedTopic) {
123+
toast({
124+
title: "Invalid Selection",
125+
description: "Please select both a difficulty level and a topic",
126+
variant: "destructive",
127+
});
128+
return;
129+
}
130+
131+
if (!auth || !auth.token) {
132+
toast({
133+
title: "Access denied",
134+
description: "No authentication token found",
135+
variant: "destructive",
136+
});
137+
return;
138+
}
139+
140+
if (!auth.user) {
141+
toast({
142+
title: "Access denied",
143+
description: "Not logged in",
144+
variant: "destructive",
145+
});
146+
return;
147+
}
148+
149+
const response = await leaveMatchQueue(auth.token, auth.user?.id, selectedTopic, selectedDifficulty);
150+
switch (response.status) {
151+
case 200:
152+
setIsSearching(false);
153+
setWaitTime(0);
154+
setWebsocket(undefined);
155+
toast({
156+
title: "Success",
157+
description: "Successfully left queue",
158+
variant: "success",
159+
});
160+
return;
161+
default:
162+
toast({
163+
title: "Unknown Error",
164+
description: "An unexpected error has occured",
165+
variant: "destructive",
166+
});
167+
return;
168+
}
109169
};
110170

111171
return (

frontend/lib/api-uri.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
const constructUri = (baseUri: string, port: string | undefined) =>
22
`http://${process.env.NEXT_PUBLIC_BASE_URI || baseUri}:${port}`;
33

4+
const constructWebSockUri = (baseUri: string, port: string | undefined) =>
5+
`ws://${process.env.NEXT_PUBLIC_BASE_URI || baseUri}:${port}`;
6+
47
export const userServiceUri: (baseUri: string) => string = (baseUri) =>
58
constructUri(baseUri, process.env.NEXT_PUBLIC_USER_SVC_PORT);
69
export const questionServiceUri: (baseUri: string) => string = (baseUri) =>
710
constructUri(baseUri, process.env.NEXT_PUBLIC_QUESTION_SVC_PORT);
811
export const matchingServiceUri: (baseUri: string) => string = (baseUri) =>
912
constructUri(baseUri, process.env.NEXT_PUBLIC_MATCHING_SVC_PORT);
13+
14+
export const matchingServiceWebSockUri: (baseUri: string) => string = (baseUri) =>
15+
constructWebSockUri(baseUri, process.env.NEXT_PUBLIC_MATCHING_SVC_PORT);

frontend/lib/leave-match-queue.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { matchingServiceUri } from "@/lib/api-uri";
2+
3+
export const leaveMatchQueue = async (
4+
jwtToken: string,
5+
userId: string,
6+
category: string,
7+
complexity: string
8+
) => {
9+
const params = new URLSearchParams({
10+
topic: category,
11+
difficulty: complexity,
12+
}).toString();
13+
const response = await fetch(
14+
`${matchingServiceUri(window.location.hostname)}/match/queue/${userId}?${params}`,
15+
{
16+
method: "DELETE",
17+
headers: {
18+
Authorization: `Bearer ${jwtToken}`,
19+
"Content-Type": "application/json",
20+
},
21+
}
22+
);
23+
return response;
24+
};

frontend/lib/subscribe-match.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { matchingServiceWebSockUri } from "@/lib/api-uri";
2+
3+
export const subscribeMatch = async (
4+
userId: string,
5+
category: string,
6+
complexity: string
7+
) => {
8+
const params = new URLSearchParams({
9+
topic: category,
10+
difficulty: complexity,
11+
})
12+
return new WebSocket(
13+
`${matchingServiceWebSockUri(window.location.hostname)}/match/subscribe/${userId}?${params}`
14+
);
15+
};

matching-service/app/logic/matching.py

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,62 @@
1+
from fastapi.responses import JSONResponse, Response
12
from typing import Union
23

34
from models.match import MatchModel, MessageModel
4-
from utils.redis import get_redis, acquire_lock, release_lock
5+
from utils.redis import acquire_lock, redis_client, release_lock
56
from utils.socketmanager import manager
67

78
async def find_match_else_enqueue(
89
user_id: str,
910
topic: str,
1011
difficulty: str
11-
) -> Union[MessageModel, MatchModel]:
12-
redis_client = await get_redis()
12+
) -> Union[Response, JSONResponse]:
1313
queue_key = _build_queue_key(topic, difficulty)
14-
15-
result = None
16-
17-
# ACQUIRE LOCK
1814
islocked = await acquire_lock(redis_client, queue_key)
1915

2016
if not islocked:
2117
raise Exception("Could not acquire lock")
2218

2319
# Check if the user is already in the queue
24-
user_in_queue = await redis_client.lrange(queue_key, 0, -1)
25-
if user_id in user_in_queue:
26-
result = MessageModel(
27-
message=f"User {user_id} is already in the queue, waiting for a match"
28-
)
29-
else:
30-
queue_length = await redis_client.llen(queue_key)
31-
if queue_length > 0:
32-
matched_user = await redis_client.rpop(queue_key)
33-
result = MatchModel(
34-
user1=user_id,
35-
user2=matched_user,
36-
topic=topic,
37-
difficulty=difficulty,
38-
)
39-
await manager.broadcast(matched_user, topic, difficulty, result.json())
40-
# await manager.disconnect(matched_user, topic, difficulty)
41-
else:
42-
await redis_client.lpush(queue_key, user_id)
43-
result = MessageModel(
44-
message=f"User {user_id} enqueued, waiting for a match"
45-
)
20+
if user_id in await redis_client.lrange(queue_key, 0, -1):
21+
await release_lock(redis_client, queue_key)
22+
return Response(status_code=304)
4623

47-
# RELEASE LOCK
24+
# Check if there are no other users in the queue
25+
if await redis_client.llen(queue_key) == 0:
26+
await redis_client.lpush(queue_key, user_id)
27+
await release_lock(redis_client, queue_key)
28+
return Response(status_code=202)
29+
30+
# There is a user in the queue
31+
matched_user = await redis_client.rpop(queue_key)
4832
await release_lock(redis_client, queue_key)
49-
return result
33+
response = MatchModel(
34+
user1=matched_user,
35+
user2=user_id,
36+
topic=topic,
37+
difficulty=difficulty,
38+
)
39+
await manager.broadcast(matched_user, topic, difficulty, response.json())
40+
await manager.disconnect_all(matched_user, topic, difficulty)
41+
return JSONResponse(status_code=201, content=response.json())
5042

5143
async def remove_user_from_queue(
5244
user_id: str,
5345
topic: str,
5446
difficulty: str
55-
) -> MessageModel:
56-
redis_client = await get_redis()
47+
) -> Response:
5748
queue_key = _build_queue_key(topic, difficulty)
58-
59-
# ACQUIRE LOCK
6049
islocked = await acquire_lock(redis_client, queue_key)
6150

6251
if not islocked:
6352
raise Exception("Could not acquire lock")
6453

65-
# Check if the user is already in the queue
66-
user_in_queue = await redis_client.lrange(queue_key, 0, -1)
67-
if user_id in user_in_queue:
54+
if user_id in await redis_client.lrange(queue_key, 0, -1):
6855
await redis_client.lrem(queue_key, 0, user_id)
69-
result = MessageModel(message=f"User {user_id} removed from the queue")
70-
else:
71-
result = MessageModel(message=f"User {user_id} is not in the queue")
7256

73-
# RELEASE LOCK
7457
await release_lock(redis_client, queue_key)
75-
return result
58+
await manager.disconnect_all(user_id, topic, difficulty)
59+
return Response(status_code=200)
7660

7761
'''
7862
Helper functions for matching.

matching-service/app/routers/match.py

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncio
22
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
3-
from fastapi.responses import JSONResponse
43
from typing import Union
54

65
from logic.matching import find_match_else_enqueue, remove_user_from_queue
@@ -16,11 +15,7 @@ async def enqueue_user(
1615
topic: str,
1716
difficulty: str
1817
):
19-
response = await find_match_else_enqueue(user_id, topic, difficulty)
20-
if isinstance(response, MatchModel):
21-
return JSONResponse(status_code=200, content=response.json())
22-
elif isinstance(response, MessageModel):
23-
return JSONResponse(status_code=202, content=response.json())
18+
return await find_match_else_enqueue(user_id, topic, difficulty)
2419

2520
# Remove a user from the queue
2621
@router.delete("/queue/{user_id}")
@@ -29,12 +24,9 @@ async def dequeue_user(
2924
topic: str,
3025
difficulty: str
3126
):
32-
return JSONResponse(
33-
status_code=200,
34-
content=await remove_user_from_queue(user_id, topic, difficulty)
35-
)
27+
return await remove_user_from_queue(user_id, topic, difficulty)
3628

37-
@router.websocket("/subscribe/{user_id}/{topic}/{difficulty}")
29+
@router.websocket("/subscribe/{user_id}")
3830
async def subscribe(
3931
websocket: WebSocket,
4032
user_id: str,
@@ -47,4 +39,4 @@ async def subscribe(
4739
while True:
4840
await websocket.receive_text()
4941
except WebSocketDisconnect:
50-
pass
42+
manager.disconnect(user_id, topic, difficulty, websocket)

matching-service/app/utils/redis.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,6 @@
22
import os
33
import redis.asyncio as redis
44

5-
# Initialize Redis client
6-
async def get_redis():
7-
redis_host = os.getenv("REDIS_HOST", "localhost")
8-
redis_port = os.getenv("REDIS_PORT", "6379")
9-
return redis.Redis(host=redis_host, port=int(redis_port), db=0, decode_responses=True)
10-
11-
125
async def acquire_lock(redis_client, key, lock_timeout_ms=30000, retry_interval_ms=100, max_retries=100) -> bool:
136
lock_key = f"{key}:lock"
147
retries = 0
@@ -27,3 +20,10 @@ async def acquire_lock(redis_client, key, lock_timeout_ms=30000, retry_interval_
2720
async def release_lock(redis_client, key) -> None:
2821
lock_key = f"{key}:lock"
2922
await redis_client.delete(lock_key)
23+
24+
redis_client = redis.Redis(
25+
host=os.environ.get("REDIS_HOST", "localhost"),
26+
port=int(os.environ.get("REDIS_PORT", "6379")),
27+
db=0,
28+
decode_responses=True
29+
)

matching-service/app/utils/socketmanager.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,40 @@ async def connect(
3737
'''
3838
Disconnects all connections associated with (user_id, topic, complexity)
3939
'''
40-
async def disconnect(
40+
async def disconnect_all(
4141
self,
4242
user_id: str,
4343
topic: str,
4444
complexity: str,
45-
websocket: WebSocket,
4645
) -> None:
4746
key: Tuple[str, str, str] = (user_id, topic, complexity)
48-
if not key in self.connection_map:
47+
if key not in self.connection_map:
4948
return
5049

5150
await asyncio.gather(
52-
*[websocket for websocket in self.connection_map[key]]
51+
*[websocket.close() for websocket in self.connection_map[key]]
5352
)
5453
del self.connection_map[key]
5554

55+
'''
56+
Disconnects the single connection.
57+
'''
58+
async def disconnect(
59+
self,
60+
user_id: str,
61+
topic: str,
62+
complexity: str,
63+
websocket: WebSocket,
64+
):
65+
key: Tuple[str, str, str] = (user_id, topic, complexity)
66+
if key not in self.connection_map:
67+
return
68+
69+
self.connection_map[key].remove(websocket)
70+
if len(self.connection_map[key]) == 0:
71+
del self.connections_map[key]
72+
websocket.close()
73+
5674
'''
5775
Data is sent to through all connections associated with
5876
(user_id, topic, complexity)
@@ -65,7 +83,7 @@ async def broadcast(
6583
data: str,
6684
):
6785
key: Tuple[str, str, str] = (user_id, topic, complexity)
68-
if not key in self.connection_map:
86+
if key not in self.connection_map:
6987
return
7088

7189
await asyncio.gather(

0 commit comments

Comments
 (0)