Skip to content

Commit 1bede2f

Browse files
Feature/matching service/queue (#165)
* Set up basic queuing * Abstract logic * Update env sample * Unfug the race conditions * Made the lock a spin lock with multiple retries. Add test.py to test the correctness of the code. * Add delete user from queue feature * Externalise docker variables * Update env sample --------- Co-authored-by: Selwyn Ang <[email protected]>
1 parent dbbe5ac commit 1bede2f

File tree

12 files changed

+220
-7
lines changed

12 files changed

+220
-7
lines changed

.env.sample

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,12 @@ USER_SVC_DB_URI=
1818
JWT_SECRET=
1919
EMAIL_ADDRESS=
2020
EMAIL_PASSWORD=
21+
22+
## Matching service variables
23+
MATCHING_SVC_PORT=6969
24+
25+
## Redis variables
26+
REDIS_PORT=6379
27+
28+
## Redisinsight variables
29+
REDIS_INSIGHT_PORT=5540

docker-compose.yml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,23 @@ services:
4444
- $MATCHING_SVC_PORT:$MATCHING_SVC_PORT
4545
environment:
4646
- PORT=$MATCHING_SVC_PORT
47+
- REDIS_HOST=redis
48+
- REDIS_PORT=$REDIS_PORT
49+
depends_on:
50+
- redis
4751

4852
redis:
4953
image: redis:7.4-alpine
5054
restart: always
5155
ports:
52-
- 6379:6379
56+
- $REDIS_PORT:$REDIS_PORT
57+
58+
# access RedisInsight at http://localhost:5540
59+
# connect to redis on redis insight at redis:6379
60+
redisinsight:
61+
image: redis/redisinsight:latest
62+
restart: always
63+
ports:
64+
- $REDIS_INSIGHT_PORT:$REDIS_INSIGHT_PORT # Expose RedisInsight UI on port 5540
65+
depends_on:
66+
- redis

matching-service/.gitignore

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
.venv
2-
app/__pycache__
3-
app/routers/__pycache__
2+
**/__pycache__

matching-service/app/__init__.py

Whitespace-only changes.

matching-service/app/models/__init__.py

Whitespace-only changes.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from pydantic import BaseModel
2+
3+
# Define the MatchData model
4+
class MatchData(BaseModel):
5+
user1: str
6+
user2: str
7+
topic: str
8+
difficulty: str

matching-service/app/routers/__init__.py

Whitespace-only changes.
Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,38 @@
1+
from utils.redis_utils import get_redis, build_queue_key, listen_for_matches, find_match_else_enqueue, match_channel, remove_user_from_queue
2+
from models.match import MatchData
13
from fastapi import APIRouter
4+
import asyncio
25

36
router = APIRouter()
47

5-
# Make a dummy endpoint to test the connection
6-
@router.get("/")
7-
async def test_connection():
8-
return {"message": "Connection successful"}
8+
# List to store matched pairs
9+
matched_pairs = []
10+
11+
# Start the match listener in a background task
12+
13+
14+
@router.on_event("startup")
15+
async def startup_event():
16+
redis_client = await get_redis()
17+
asyncio.create_task(listen_for_matches(redis_client))
18+
19+
# Add a user to the queue
20+
21+
22+
@router.post("/queue/{user_id}")
23+
async def enqueue_user(user_id: str, topic: str, difficulty: str):
24+
# redis_client = await get_redis()
25+
return await find_match_else_enqueue(user_id, topic, difficulty)
26+
27+
# Get all matched pairs
28+
29+
30+
@router.get("/matches")
31+
async def get_matched_pairs():
32+
return {"matched_pairs": matched_pairs}
33+
34+
# Remove a user from the queue
35+
@router.delete("/queue/{user_id}")
36+
async def dequeue_user(user_id: str, topic: str, difficulty: str):
37+
print("removing user from queue")
38+
return await remove_user_from_queue(user_id, topic, difficulty)

matching-service/app/utils/__init__.py

Whitespace-only changes.
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import os
2+
import redis.asyncio as redis
3+
from models.match import MatchData
4+
import asyncio
5+
match_channel = 'match_channel'
6+
7+
# Initialize Redis client
8+
9+
10+
async def get_redis():
11+
redis_host = os.getenv("REDIS_HOST", "localhost")
12+
redis_port = os.getenv("REDIS_PORT", "6379")
13+
return redis.Redis(host=redis_host, port=int(redis_port), db=0, decode_responses=True)
14+
15+
16+
async def acquire_lock(redis_client, key, lock_timeout_ms=30000, retry_interval_ms=100, max_retries=100) -> bool:
17+
lock_key = f"{key}:lock"
18+
retries = 0
19+
20+
while retries < max_retries:
21+
locked = await redis_client.set(lock_key, "locked", nx=True, px=lock_timeout_ms)
22+
if locked:
23+
return True
24+
else:
25+
retries += 1
26+
# Convert ms to seconds
27+
await asyncio.sleep(retry_interval_ms / 1000)
28+
29+
return False
30+
31+
32+
async def release_lock(redis_client, key) -> None:
33+
lock_key = f"{key}:lock"
34+
await redis_client.delete(lock_key)
35+
36+
37+
# Helper function to build a unique queue key based on topic and difficulty
38+
def build_queue_key(topic, difficulty):
39+
return f"{topic}:{difficulty}"
40+
41+
# Asynchronous task to listen for matches
42+
43+
44+
async def listen_for_matches(redis_client):
45+
pubsub = redis_client.pubsub()
46+
await pubsub.subscribe(match_channel)
47+
async for message in pubsub.listen():
48+
if message["type"] == "message":
49+
print(f"Match notification: {message['data']}")
50+
51+
# Asynchronous matching logic
52+
53+
54+
async def find_match_else_enqueue(user_id, topic, difficulty):
55+
redis_client = await get_redis()
56+
queue_key = build_queue_key(topic, difficulty)
57+
58+
result = None
59+
60+
# ACQUIRE LOCK
61+
islocked = await acquire_lock(redis_client, queue_key)
62+
63+
if not islocked:
64+
raise Exception("Could not acquire lock")
65+
66+
# Check if the user is already in the queue
67+
user_in_queue = await redis_client.lrange(queue_key, 0, -1)
68+
if user_id in user_in_queue:
69+
result = {"message": f"User {
70+
user_id} is already in the queue, waiting for a match"}
71+
else:
72+
queue_length = await redis_client.llen(queue_key)
73+
if queue_length > 0:
74+
matched_user = await redis_client.rpop(queue_key)
75+
match_data = MatchData(
76+
user1=user_id, user2=matched_user, topic=topic, difficulty=difficulty)
77+
await redis_client.publish(match_channel, match_data.json())
78+
result = {"message": "Match found", "match": match_data}
79+
else:
80+
await redis_client.lpush(queue_key, user_id)
81+
result = {"message": f"User {
82+
user_id} enqueued, waiting for a match"}
83+
84+
# RELEASE LOCK
85+
await release_lock(redis_client, queue_key)
86+
return result
87+
88+
89+
async def remove_user_from_queue(user_id, topic, difficulty):
90+
redis_client = await get_redis()
91+
queue_key = build_queue_key(topic, difficulty)
92+
93+
# ACQUIRE LOCK
94+
islocked = await acquire_lock(redis_client, queue_key)
95+
96+
if not islocked:
97+
raise Exception("Could not acquire lock")
98+
99+
# Check if the user is already in the queue
100+
user_in_queue = await redis_client.lrange(queue_key, 0, -1)
101+
if user_id in user_in_queue:
102+
await redis_client.lrem(queue_key, 0, user_id)
103+
result = {"message": f"User {
104+
user_id} removed from the queue"}
105+
else:
106+
result = {"message": f"User {
107+
user_id} is not in the queue"}
108+
109+
# RELEASE LOCK
110+
await release_lock(redis_client, queue_key)
111+
return result

0 commit comments

Comments
 (0)