Skip to content

Commit 8c373f2

Browse files
committed
feat: add keys to watch
1 parent 9834600 commit 8c373f2

File tree

4 files changed

+70
-53
lines changed

4 files changed

+70
-53
lines changed

apps/matching-service/databases/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/redis/go-redis/v9"
99
)
1010

11-
const matchmakingQueueRedisKey = "matchmaking_queue"
11+
const MatchmakingQueueRedisKey = "matchmaking_queue"
1212

1313
var redisClient *redis.Client
1414

apps/matching-service/databases/userqueue.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func PrintMatchingQueue(tx *redis.Tx, status string, ctx context.Context) {
3030
}
3131

3232
func IsQueueEmpty(tx *redis.Tx, ctx context.Context) (bool, error) {
33-
queueLength, err := tx.LLen(ctx, matchmakingQueueRedisKey).Result()
33+
queueLength, err := tx.LLen(ctx, MatchmakingQueueRedisKey).Result()
3434
if err != nil {
3535
log.Println("Error checking queue length:", err)
3636
return false, err
@@ -41,15 +41,15 @@ func IsQueueEmpty(tx *redis.Tx, ctx context.Context) (bool, error) {
4141

4242
// Enqueue a user into the matchmaking queue
4343
func EnqueueUser(tx *redis.Tx, username string, ctx context.Context) {
44-
err := tx.LPush(ctx, matchmakingQueueRedisKey, username).Err()
44+
err := tx.LPush(ctx, MatchmakingQueueRedisKey, username).Err()
4545
if err != nil {
4646
log.Println("Error enqueuing user:", err)
4747
}
4848
}
4949

5050
// Remove user from the matchmaking queue
5151
func DequeueUser(tx *redis.Tx, username string, ctx context.Context) {
52-
err := tx.LRem(ctx, matchmakingQueueRedisKey, 1, username).Err()
52+
err := tx.LRem(ctx, MatchmakingQueueRedisKey, 1, username).Err()
5353
if err != nil {
5454
log.Println("Error dequeuing user:", err)
5555
return
@@ -59,7 +59,7 @@ func DequeueUser(tx *redis.Tx, username string, ctx context.Context) {
5959
// Returns the first user's username from the queue.
6060
func GetFirstUser(tx *redis.Tx, ctx context.Context) (string, error) {
6161
// Peek at the user queue
62-
username, err := tx.LIndex(ctx, matchmakingQueueRedisKey, 0).Result()
62+
username, err := tx.LIndex(ctx, MatchmakingQueueRedisKey, 0).Result()
6363
if err != nil {
6464
log.Println("Error peeking user from queue:", err)
6565
return "", err
@@ -68,7 +68,7 @@ func GetFirstUser(tx *redis.Tx, ctx context.Context) (string, error) {
6868
}
6969

7070
func GetAllQueuedUsers(tx *redis.Tx, ctx context.Context) ([]string, error) {
71-
users, err := tx.LRange(ctx, matchmakingQueueRedisKey, 0, -1).Result()
71+
users, err := tx.LRange(ctx, MatchmakingQueueRedisKey, 0, -1).Result()
7272
if err != nil {
7373
log.Println("Error retrieving users from queue:", err)
7474
return nil, err
@@ -189,13 +189,13 @@ func FindMatchingUser(tx *redis.Tx, username string, ctx context.Context) (*mode
189189

190190
func PopAndInsertUser(tx *redis.Tx, username string, ctx context.Context) {
191191
// Pop user
192-
username, err := tx.LPop(ctx, matchmakingQueueRedisKey).Result()
192+
username, err := tx.LPop(ctx, MatchmakingQueueRedisKey).Result()
193193
if err != nil {
194194
log.Println("Error popping user from queue:", err)
195195
}
196196

197197
// Insert back in queue
198-
err = tx.LPush(ctx, matchmakingQueueRedisKey, username).Err()
198+
err = tx.LPush(ctx, MatchmakingQueueRedisKey, username).Err()
199199
if err != nil {
200200
log.Println("Error enqueuing user:", err)
201201
}

apps/matching-service/processes/performmatches.go

Lines changed: 62 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -8,70 +8,87 @@ import (
88
"matching-service/databases"
99
"matching-service/models"
1010
"matching-service/utils"
11+
"time"
1112

1213
"github.com/redis/go-redis/v9"
1314
)
1415

1516
// Performs the matching algorithm at most once starting from the front of the queue of users,
1617
// until a match is found or no match and the user is enqueued to the queue.
1718
func PerformMatching(rdb *redis.Client, matchRequest models.MatchRequest, ctx context.Context, errorChan chan error) {
18-
err := rdb.Watch(ctx, func(tx *redis.Tx) error {
19-
// Log queue before and after matchmaking
20-
databases.PrintMatchingQueue(tx, "Before Matchmaking", ctx)
21-
defer databases.PrintMatchingQueue(tx, "After Matchmaking", ctx)
19+
currentUsername := matchRequest.Username
20+
keys := []string{databases.MatchmakingQueueRedisKey, currentUsername}
21+
for _, topic := range matchRequest.Topics {
22+
keys = append(keys, topic)
23+
}
24+
for true {
2225

23-
currentUsername := matchRequest.Username
24-
queuedUsernames, err := databases.GetAllQueuedUsers(tx, ctx)
25-
if err != nil {
26-
return err
27-
}
26+
err := rdb.Watch(ctx, func(tx *redis.Tx) error {
27+
// Log queue before and after matchmaking
28+
databases.PrintMatchingQueue(tx, "Before Matchmaking", ctx)
29+
defer databases.PrintMatchingQueue(tx, "After Matchmaking", ctx)
2830

29-
// Check that user is not part of the existing queue
30-
for _, username := range queuedUsernames {
31-
if username == currentUsername {
32-
return models.ExistingUserError
31+
queuedUsernames, err := databases.GetAllQueuedUsers(tx, ctx)
32+
if err != nil {
33+
return err
3334
}
34-
}
3535

36-
databases.AddUser(tx, matchRequest, ctx)
37-
38-
// Find a matching user if any
39-
matchFound, err := databases.FindMatchingUser(tx, currentUsername, ctx)
40-
if err != nil {
41-
log.Println("Error finding matching user:", err)
42-
return err
43-
}
36+
// Check that user is not part of the existing queue
37+
for _, username := range queuedUsernames {
38+
if username == currentUsername {
39+
return models.ExistingUserError
40+
}
41+
}
4442

45-
if matchFound != nil {
46-
matchedUsername := matchFound.MatchedUser
47-
matchedTopic := matchFound.Topic
48-
matchedDifficulty := matchFound.Difficulty
43+
databases.AddUser(tx, matchRequest, ctx)
4944

50-
// Generate a random match ID
51-
matchId, err := utils.GenerateMatchID()
45+
// Find a matching user if any
46+
matchFound, err := databases.FindMatchingUser(tx, currentUsername, ctx)
5247
if err != nil {
53-
log.Println("Unable to randomly generate matchID")
48+
log.Println("Error finding matching user:", err)
49+
return err
5450
}
5551

56-
// Log down which users got matched
57-
matchFound.MatchID = matchId
58-
log.Printf("Users %s and %s matched on the topic: %s with difficulty: %s", currentUsername, matchedUsername, matchedTopic, matchedDifficulty)
52+
if matchFound != nil {
53+
matchedUsername := matchFound.MatchedUser
54+
matchedTopic := matchFound.Topic
55+
matchedDifficulty := matchFound.Difficulty
5956

60-
// Clean up redis for this match
61-
databases.CleanUpUser(tx, currentUsername, ctx)
62-
databases.CleanUpUser(tx, matchedUsername, ctx)
57+
// Generate a random match ID
58+
matchId, err := utils.GenerateMatchID()
59+
if err != nil {
60+
log.Println("Unable to randomly generate matchID")
61+
}
6362

64-
publishMatch(tx, ctx, currentUsername, matchedUsername, matchFound)
65-
publishMatch(tx, ctx, matchedUsername, currentUsername, matchFound)
66-
}
63+
// Log down which users got matched
64+
matchFound.MatchID = matchId
65+
log.Printf("Users %s and %s matched on the topic: %s with difficulty: %s", currentUsername, matchedUsername, matchedTopic, matchedDifficulty)
6766

68-
return nil
69-
})
70-
if err != nil {
71-
// Handle error (like retry logic could be added here)
72-
// return fmt.Errorf("transaction execution failed: %v", err)
73-
if errors.Is(err, models.ExistingUserError) {
74-
errorChan <- err
67+
// Clean up redis for this match
68+
databases.CleanUpUser(tx, currentUsername, ctx)
69+
databases.CleanUpUser(tx, matchedUsername, ctx)
70+
71+
publishMatch(tx, ctx, currentUsername, matchedUsername, matchFound)
72+
publishMatch(tx, ctx, matchedUsername, currentUsername, matchFound)
73+
}
74+
75+
time.Sleep(time.Duration(time.Second * 1))
76+
77+
return nil
78+
}, keys...)
79+
if err != nil {
80+
// transaction failed
81+
println(err)
82+
// Handle error (like retry logic could be added here)
83+
// return fmt.Errorf("transaction execution failed: %v", err)
84+
if errors.Is(err, models.ExistingUserError) {
85+
errorChan <- err
86+
break
87+
}
88+
}
89+
if err == nil {
90+
// transaction succeeded
91+
break
7592
}
7693
}
7794
}

0 commit comments

Comments
 (0)