Skip to content

Commit 1f07dc6

Browse files
committed
feat: use redis locks
1 parent 444eb80 commit 1f07dc6

File tree

6 files changed

+100
-67
lines changed

6 files changed

+100
-67
lines changed

apps/matching-service/databases/redis.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,67 @@ package databases
22

33
import (
44
"context"
5+
"fmt"
56
"log"
67
"os"
8+
"time"
79

10+
"github.com/bsm/redislock"
811
"github.com/redis/go-redis/v9"
912
)
1013

1114
const MatchmakingQueueRedisKey = "matchmaking_queue"
15+
const matchmakingRedisLock = "matchmaking_lock"
1216

1317
var redisClient *redis.Client
18+
var redisLock *redislock.Client
1419

1520
// SetupRedisClient sets-up the Redis client, and assigns it to a global variable
16-
func SetupRedisClient() {
21+
func SetupRedisClient() *redis.Client {
1722
// Retrieve redis url env variable and setup the redis client
1823
redisAddr := os.Getenv("REDIS_URL")
19-
client := redis.NewClient(&redis.Options{
24+
redisClient = redis.NewClient(&redis.Options{
2025
Addr: redisAddr,
2126
Password: "", // no password set
2227
DB: 0, // use default DB
2328
})
2429

2530
// Ping the redis server
26-
_, err := client.Ping(context.Background()).Result()
31+
_, err := redisClient.Ping(context.Background()).Result()
2732
if err != nil {
2833
log.Fatalf("Could not connect to Redis: %v", err)
2934
} else {
3035
log.Println("Connected to Redis at the following address: " + redisAddr)
3136
}
3237

33-
redisClient = client
38+
// Create a new lock client.
39+
redisLock = redislock.New(redisClient)
40+
41+
return redisClient
3442
}
3543

36-
// Get redisclient
3744
func GetRedisClient() *redis.Client {
3845
return redisClient
3946
}
47+
48+
func GetRedisLock() *redislock.Client {
49+
return redisLock
50+
}
51+
52+
func ObtainRedisLock(ctx context.Context) (*redislock.Lock, error) {
53+
// Retry every 100ms, for up-to 10x
54+
backoff := redislock.LimitRetry(redislock.LinearBackoff(100*time.Millisecond), 10)
55+
56+
// Obtain lock with retry
57+
lock, err := redisLock.Obtain(ctx, matchmakingRedisLock, time.Second, &redislock.Options{
58+
RetryStrategy: backoff,
59+
})
60+
if err == redislock.ErrNotObtained {
61+
fmt.Println("Could not obtain lock!")
62+
return nil, err
63+
} else if err != nil {
64+
log.Fatalln(err)
65+
return nil, err
66+
}
67+
return lock, err
68+
}

apps/matching-service/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module matching-service
33
go 1.23.1
44

55
require (
6+
github.com/bsm/redislock v0.9.4
67
github.com/gorilla/websocket v1.5.3
78
github.com/joho/godotenv v1.5.1
89
github.com/redis/go-redis/v9 v9.6.2

apps/matching-service/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
22
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
33
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
44
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
5+
github.com/bsm/redislock v0.9.4 h1:X/Wse1DPpiQgHbVYRE9zv6m070UcKoOGekgvpNhiSvw=
6+
github.com/bsm/redislock v0.9.4/go.mod h1:Epf7AJLiSFwLCiZcfi6pWFO/8eAYrYpQXFxEDPoDeAk=
57
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
68
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
79
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=

apps/matching-service/handlers/websocket.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,18 @@ func cleanUpUser(username string) {
9696
redisClient := databases.GetRedisClient()
9797
ctx := context.Background()
9898

99-
err := redisClient.Watch(ctx, func(tx *redis.Tx) error {
99+
// Obtain lock with retry
100+
lock, err := databases.ObtainRedisLock(ctx)
101+
if err != nil {
102+
return
103+
}
104+
defer lock.Release(ctx)
105+
106+
if err := redisClient.Watch(ctx, func(tx *redis.Tx) error {
100107
// Cleanup Redis
101108
databases.CleanUpUser(tx, username, ctx)
102109
return nil
103-
})
104-
if err != nil {
110+
}); err != nil {
105111
return
106112
}
107113
}

apps/matching-service/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313

1414
func main() {
1515
setUpEnvironment()
16-
databases.SetupRedisClient()
16+
client := databases.SetupRedisClient()
17+
defer client.Close()
1718
setupRoutes()
1819
startServer()
1920
}

apps/matching-service/processes/performmatches.go

Lines changed: 52 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -18,78 +18,72 @@ import (
1818
// until a match is found or no match and the user is enqueued to the queue.
1919
func PerformMatching(rdb *redis.Client, matchRequest models.MatchRequest, ctx context.Context, errorChan chan error) {
2020
currentUsername := matchRequest.Username
21-
keys := []string{databases.MatchmakingQueueRedisKey, currentUsername}
22-
for _, topic := range matchRequest.Topics {
23-
keys = append(keys, topic)
21+
22+
// Obtain lock with retry
23+
lock, err := databases.ObtainRedisLock(ctx)
24+
if err != nil {
25+
return
2426
}
25-
for true {
27+
defer lock.Release(ctx)
2628

27-
err := rdb.Watch(ctx, func(tx *redis.Tx) error {
28-
queuedUsernames, err := databases.GetAllQueuedUsers(tx, ctx)
29-
if err != nil {
30-
return err
31-
}
29+
if err := rdb.Watch(ctx, func(tx *redis.Tx) error {
30+
queuedUsernames, err := databases.GetAllQueuedUsers(tx, ctx)
31+
if err != nil {
32+
return err
33+
}
3234

33-
// Check that user is not part of the existing queue
34-
for _, username := range queuedUsernames {
35-
if username == currentUsername {
36-
return models.ExistingUserError
37-
}
35+
// Check that user is not part of the existing queue
36+
for _, username := range queuedUsernames {
37+
if username == currentUsername {
38+
return models.ExistingUserError
3839
}
40+
}
3941

40-
databases.AddUser(tx, matchRequest, ctx)
42+
databases.AddUser(tx, matchRequest, ctx)
4143

42-
// Log queue before and after matchmaking
43-
databases.PrintMatchingQueue(tx, "Before Matchmaking", ctx)
44-
defer databases.PrintMatchingQueue(tx, "After Matchmaking", ctx)
44+
// Log queue before and after matchmaking
45+
databases.PrintMatchingQueue(tx, "Before Matchmaking", ctx)
46+
defer databases.PrintMatchingQueue(tx, "After Matchmaking", ctx)
47+
// Find a matching user if any
48+
matchFound, err := databases.FindMatchingUser(tx, currentUsername, ctx)
49+
if err != nil {
50+
log.Println("Error finding matching user:", err)
51+
return err
52+
}
4553

46-
// Find a matching user if any
47-
matchFound, err := databases.FindMatchingUser(tx, currentUsername, ctx)
54+
if matchFound != nil {
55+
matchedUsername := matchFound.MatchedUser
56+
matchedTopic := matchFound.Topic
57+
matchedDifficulty := matchFound.Difficulty
58+
59+
// Generate a random match ID
60+
matchId, err := utils.GenerateMatchID()
4861
if err != nil {
49-
log.Println("Error finding matching user:", err)
50-
return err
62+
log.Println("Unable to randomly generate matchID")
5163
}
5264

53-
if matchFound != nil {
54-
matchedUsername := matchFound.MatchedUser
55-
matchedTopic := matchFound.Topic
56-
matchedDifficulty := matchFound.Difficulty
57-
58-
// Generate a random match ID
59-
matchId, err := utils.GenerateMatchID()
60-
if err != nil {
61-
log.Println("Unable to randomly generate matchID")
62-
}
65+
// Log down which users got matched
66+
matchFound.MatchID = matchId
67+
log.Printf("Users %s and %s matched on the topic: %s with difficulty: %s", currentUsername, matchedUsername, matchedTopic, matchedDifficulty)
6368

64-
// Log down which users got matched
65-
matchFound.MatchID = matchId
66-
log.Printf("Users %s and %s matched on the topic: %s with difficulty: %s", currentUsername, matchedUsername, matchedTopic, matchedDifficulty)
69+
// Clean up redis for this match
70+
databases.CleanUpUser(tx, currentUsername, ctx)
71+
databases.CleanUpUser(tx, matchedUsername, ctx)
6772

68-
// Clean up redis for this match
69-
databases.CleanUpUser(tx, currentUsername, ctx)
70-
databases.CleanUpUser(tx, matchedUsername, ctx)
71-
72-
publishMatch(tx, ctx, currentUsername, matchedUsername, matchFound)
73-
publishMatch(tx, ctx, matchedUsername, currentUsername, matchFound)
74-
}
73+
publishMatch(tx, ctx, currentUsername, matchedUsername, matchFound)
74+
publishMatch(tx, ctx, matchedUsername, currentUsername, matchFound)
75+
}
7576

76-
time.Sleep(time.Duration(time.Second * 1))
77+
time.Sleep(time.Duration(time.Second * 1))
7778

78-
return nil
79-
}, keys...)
80-
if err != nil {
81-
// return
82-
if errors.Is(err, models.ExistingUserError) {
83-
errorChan <- err
84-
break
85-
} else {
86-
// transaction failed, so will retry
87-
println(fmt.Errorf("transaction execution failed: %v", err))
88-
}
89-
}
90-
if err == nil {
91-
// transaction succeeded
92-
break
79+
return nil
80+
}); err != nil {
81+
// return
82+
if errors.Is(err, models.ExistingUserError) {
83+
errorChan <- err
84+
} else {
85+
// transaction failed, no retry
86+
println(fmt.Errorf("transaction execution failed: %v", err))
9387
}
9488
}
9589
}

0 commit comments

Comments
 (0)