Skip to content

Commit bb50c9b

Browse files
authored
Merge pull request #49 from CS3219-AY2425S1/titus/matching-service-improvements
feat: use redis pubsub, transactions, locks
2 parents 74baa54 + 512ad31 commit bb50c9b

File tree

18 files changed

+574
-535
lines changed

18 files changed

+574
-535
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package databases
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os"
8+
"time"
9+
10+
"github.com/bsm/redislock"
11+
"github.com/redis/go-redis/v9"
12+
)
13+
14+
const MatchmakingQueueRedisKey = "matchmaking_queue"
15+
const matchmakingRedisLock = "matchmaking_lock"
16+
17+
var redisClient *redis.Client
18+
var redisLock *redislock.Client
19+
20+
// SetupRedisClient sets-up the Redis client, and assigns it to a global variable
21+
func SetupRedisClient() *redis.Client {
22+
// Retrieve redis url env variable and setup the redis client
23+
redisAddr := os.Getenv("REDIS_URL")
24+
redisClient = redis.NewClient(&redis.Options{
25+
Addr: redisAddr,
26+
Password: "", // no password set
27+
DB: 0, // use default DB
28+
})
29+
30+
// Ping the redis server
31+
_, err := redisClient.Ping(context.Background()).Result()
32+
if err != nil {
33+
log.Fatalf("Could not connect to Redis: %v", err)
34+
} else {
35+
log.Println("Connected to Redis at the following address: " + redisAddr)
36+
}
37+
38+
// Create a new lock client.
39+
redisLock = redislock.New(redisClient)
40+
41+
return redisClient
42+
}
43+
44+
func GetRedisClient() *redis.Client {
45+
return redisClient
46+
}
47+
48+
func GetRedisLock() *redislock.Client {
49+
return redisLock
50+
}
51+
52+
func ObtainRedisLock(ctx context.Context) (*redislock.Lock, error) {
53+
// Retry every 100ms, for up-to 100x
54+
backoff := redislock.LimitRetry(redislock.LinearBackoff(100*time.Millisecond), 100)
55+
56+
// Obtain lock with retry
57+
lock, err := redisLock.Obtain(ctx, matchmakingRedisLock, time.Second, &redislock.Options{
58+
RetryStrategy: backoff,
59+
})
60+
if err == redislock.ErrNotObtained {
61+
fmt.Println("Could not obtain lock!")
62+
return nil, err
63+
} else if err != nil {
64+
log.Fatalln(err)
65+
return nil, err
66+
}
67+
return lock, err
68+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package databases
2+
3+
import (
4+
"context"
5+
"log"
6+
"matching-service/models"
7+
"strings"
8+
9+
"github.com/redis/go-redis/v9"
10+
)
11+
12+
// Add user into each specified topic set based on the topics selected by users
13+
func AddUserToTopicSets(tx *redis.Tx, request models.MatchRequest, ctx context.Context) {
14+
for _, topic := range request.Topics {
15+
err := tx.SAdd(ctx, strings.ToLower(topic), request.Username).Err()
16+
if err != nil {
17+
log.Println("Error adding user to topic set:", err)
18+
}
19+
}
20+
}
21+
22+
// Remove user from each specified topic set based on the topics selected by users
23+
func RemoveUserFromTopicSets(tx *redis.Tx, username string, ctx context.Context) {
24+
request, err := GetUserDetails(tx, username, ctx)
25+
if err != nil {
26+
log.Println("Error retrieving user from hashset:", err)
27+
return
28+
}
29+
30+
for _, topic := range request.Topics {
31+
err := tx.SRem(ctx, strings.ToLower(topic), request.Username).Err()
32+
if err != nil {
33+
log.Println("Error removing user from topic set:", err)
34+
}
35+
}
36+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package databases
2+
3+
import (
4+
"context"
5+
"matching-service/models"
6+
7+
"github.com/redis/go-redis/v9"
8+
)
9+
10+
// Clean up queue, sets and hashset in Redis
11+
func CleanUpUser(tx *redis.Tx, username string, ctx context.Context) {
12+
DequeueUser(tx, username, ctx)
13+
RemoveUserFromTopicSets(tx, username, ctx)
14+
RemoveUserDetails(tx, username, ctx)
15+
}
16+
17+
// Adds the user to the queue, sets and hashsets in Redis
18+
func AddUser(tx *redis.Tx, matchRequest models.MatchRequest, ctx context.Context) {
19+
EnqueueUser(tx, matchRequest.Username, ctx)
20+
AddUserToTopicSets(tx, matchRequest, ctx)
21+
StoreUserDetails(tx, matchRequest, ctx)
22+
}
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package databases
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log"
8+
"matching-service/models"
9+
"strings"
10+
11+
"github.com/redis/go-redis/v9"
12+
)
13+
14+
// Print existing users in the matching queue
15+
func PrintMatchingQueue(tx *redis.Tx, status string, ctx context.Context) {
16+
users, err := GetAllQueuedUsers(tx, ctx)
17+
if err != nil {
18+
return
19+
}
20+
21+
var concatenatedUsers strings.Builder
22+
for i, user := range users {
23+
concatenatedUsers.WriteString(user)
24+
if i != len(users)-1 {
25+
concatenatedUsers.WriteString(", ")
26+
}
27+
}
28+
29+
log.Println("Redis Queue (" + status + "): " + concatenatedUsers.String())
30+
}
31+
32+
func IsQueueEmpty(tx *redis.Tx, ctx context.Context) (bool, error) {
33+
queueLength, err := tx.LLen(ctx, MatchmakingQueueRedisKey).Result()
34+
if err != nil {
35+
log.Println("Error checking queue length:", err)
36+
return false, err
37+
}
38+
// No users in the queue, so no need to perform matching
39+
return queueLength == 0, nil
40+
}
41+
42+
// Enqueue a user into the matchmaking queue
43+
func EnqueueUser(tx *redis.Tx, username string, ctx context.Context) {
44+
err := tx.LPush(ctx, MatchmakingQueueRedisKey, username).Err()
45+
if err != nil {
46+
log.Println("Error enqueuing user:", err)
47+
}
48+
}
49+
50+
// Remove user from the matchmaking queue
51+
func DequeueUser(tx *redis.Tx, username string, ctx context.Context) {
52+
err := tx.LRem(ctx, MatchmakingQueueRedisKey, 1, username).Err()
53+
if err != nil {
54+
log.Println("Error dequeuing user:", err)
55+
return
56+
}
57+
}
58+
59+
// Returns the first user's username from the queue.
60+
func GetFirstUser(tx *redis.Tx, ctx context.Context) (string, error) {
61+
// Peek at the user queue
62+
username, err := tx.LIndex(ctx, MatchmakingQueueRedisKey, 0).Result()
63+
if err != nil {
64+
log.Println("Error peeking user from queue:", err)
65+
return "", err
66+
}
67+
return username, nil
68+
}
69+
70+
func GetAllQueuedUsers(tx *redis.Tx, ctx context.Context) ([]string, error) {
71+
users, err := tx.LRange(ctx, MatchmakingQueueRedisKey, 0, -1).Result()
72+
if err != nil {
73+
log.Println("Error retrieving users from queue:", err)
74+
return nil, err
75+
}
76+
return users, nil
77+
}
78+
79+
// Add user details into hashset in Redis
80+
func StoreUserDetails(tx *redis.Tx, request models.MatchRequest, ctx context.Context) {
81+
topicsJSON, err := json.Marshal(request.Topics)
82+
if err != nil {
83+
log.Println("Error marshalling topics:", err)
84+
return
85+
}
86+
87+
difficultiesJSON, err := json.Marshal(request.Difficulties)
88+
if err != nil {
89+
log.Println("Error marshalling difficulties:", err)
90+
return
91+
}
92+
93+
err = tx.HSet(ctx, request.Username, map[string]interface{}{
94+
"topics": topicsJSON,
95+
"difficulty": difficultiesJSON,
96+
"username": request.Username,
97+
}).Err()
98+
if err != nil {
99+
log.Println("Error storing user details:", err)
100+
}
101+
}
102+
103+
// Retrieve user details from hashset in Redis
104+
func GetUserDetails(tx *redis.Tx, username string, ctx context.Context) (models.MatchRequest, error) {
105+
userDetails, err := tx.HGetAll(ctx, username).Result()
106+
if err != nil {
107+
return models.MatchRequest{}, err
108+
}
109+
110+
if len(userDetails) == 0 {
111+
return models.MatchRequest{}, fmt.Errorf("user not found in hashset: %s", username)
112+
}
113+
114+
topicsJSON, topicsExist := userDetails["topics"]
115+
difficultiesJSON, difficultiesExist := userDetails["difficulty"]
116+
117+
if !topicsExist || !difficultiesExist {
118+
return models.MatchRequest{}, fmt.Errorf("incomplete user details for: %s", username)
119+
}
120+
121+
var topics []string
122+
err = json.Unmarshal([]byte(topicsJSON), &topics)
123+
if err != nil {
124+
return models.MatchRequest{}, fmt.Errorf("error unmarshalling topics: %v", err)
125+
}
126+
127+
var difficulties []string
128+
err = json.Unmarshal([]byte(difficultiesJSON), &difficulties)
129+
if err != nil {
130+
return models.MatchRequest{}, fmt.Errorf("error unmarshalling difficulties: %v", err)
131+
}
132+
133+
matchRequest := models.MatchRequest{
134+
Topics: topics,
135+
Difficulties: difficulties,
136+
Username: username,
137+
}
138+
139+
return matchRequest, nil
140+
}
141+
142+
// Remove user details from HashSet
143+
func RemoveUserDetails(tx *redis.Tx, username string, ctx context.Context) {
144+
err := tx.Del(ctx, username).Err()
145+
if err != nil {
146+
log.Println("Error removing user details:", err)
147+
}
148+
}
149+
150+
// Find the first matching user based on topics
151+
// TODO: match based on available questions
152+
func FindMatchingUser(tx *redis.Tx, username string, ctx context.Context) (*models.MatchFound, error) {
153+
user, err := GetUserDetails(tx, username, ctx)
154+
if err != nil {
155+
return nil, err
156+
}
157+
158+
for _, topic := range user.Topics {
159+
users, err := tx.SMembers(ctx, strings.ToLower(topic)).Result()
160+
if err != nil {
161+
return nil, err
162+
}
163+
164+
for _, potentialMatch := range users {
165+
if potentialMatch == username {
166+
continue
167+
}
168+
169+
matchedUser, err := GetUserDetails(tx, potentialMatch, ctx)
170+
if err != nil {
171+
return nil, err
172+
}
173+
174+
commonDifficulty := models.GetCommonDifficulty(user.Difficulties, matchedUser.Difficulties)
175+
176+
matchFound := models.MatchFound{
177+
Type: "match_found",
178+
MatchedUser: potentialMatch,
179+
Topic: topic,
180+
Difficulty: commonDifficulty,
181+
}
182+
183+
return &matchFound, nil
184+
}
185+
}
186+
187+
return nil, nil
188+
}
189+
190+
func PopAndInsertUser(tx *redis.Tx, username string, ctx context.Context) {
191+
// Pop user
192+
username, err := tx.LPop(ctx, MatchmakingQueueRedisKey).Result()
193+
if err != nil {
194+
log.Println("Error popping user from queue:", err)
195+
}
196+
197+
// Insert back in queue
198+
err = tx.LPush(ctx, MatchmakingQueueRedisKey, username).Err()
199+
if err != nil {
200+
log.Println("Error enqueuing user:", err)
201+
}
202+
}

apps/matching-service/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module matching-service
33
go 1.23.1
44

55
require (
6+
github.com/bsm/redislock v0.9.4
67
github.com/gorilla/websocket v1.5.3
78
github.com/joho/godotenv v1.5.1
89
github.com/redis/go-redis/v9 v9.6.2

apps/matching-service/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
22
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
33
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
44
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
5+
github.com/bsm/redislock v0.9.4 h1:X/Wse1DPpiQgHbVYRE9zv6m070UcKoOGekgvpNhiSvw=
6+
github.com/bsm/redislock v0.9.4/go.mod h1:Epf7AJLiSFwLCiZcfi6pWFO/8eAYrYpQXFxEDPoDeAk=
57
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
68
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
79
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package handlers
2+
3+
import (
4+
"log"
5+
"matching-service/models"
6+
7+
"github.com/gorilla/websocket"
8+
)
9+
10+
// sendTimeoutResponse sends a timeout message to the WebSocket client.
11+
func sendTimeoutResponse(ws *websocket.Conn) {
12+
result := models.Timeout{
13+
Type: "timeout",
14+
Message: "No match found. Please try again later.",
15+
}
16+
if err := ws.WriteJSON(result); err != nil {
17+
log.Printf("write error: %v", err)
18+
}
19+
}
20+
21+
func sendRejectionResponse(ws *websocket.Conn) {
22+
if err := ws.WriteJSON(models.MatchRejected{
23+
Type: "match_rejected",
24+
Message: "You are already in a matchmaking queue. Please disconnect before reconnecting.",
25+
}); err != nil {
26+
log.Printf("write error: %v", err)
27+
}
28+
}
29+
30+
// Send message to matched user
31+
func sendMatchFoundResponse(ws *websocket.Conn, username string, result models.MatchFound) {
32+
if err := ws.WriteJSON(result); err != nil {
33+
log.Printf("Error sending message to user %s: %v\n", username, err)
34+
}
35+
}

0 commit comments

Comments
 (0)