Skip to content

Commit 003d526

Browse files
committed
Add persistent store of rooms
Currently this does not handle proper reconnection and destruction yet, but the happy path will work. Reconnection currently ALWAYS works if you have the Room ID and are the correct users, only if you have previously connected to the room.
1 parent 1b02dd9 commit 003d526

File tree

5 files changed

+199
-25
lines changed

5 files changed

+199
-25
lines changed

collab/main.go

Lines changed: 92 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http"
99
"os"
1010
"sync"
11+
"strconv"
1112

1213
"github.com/gin-gonic/gin"
1314
"github.com/gorilla/websocket"
@@ -140,7 +141,11 @@ func (h *Hub) Run() {
140141
}
141142

142143
// ServeWs handles WebSocket requests
143-
func serveWs(hub *Hub, c *gin.Context, roomMappings *verify.RoomMappings) {
144+
func serveWs(
145+
hub *Hub, c *gin.Context,
146+
roomMappings *verify.RoomMappings,
147+
persistMappings *verify.PersistMappings,
148+
) {
144149
roomID := c.Query("roomID")
145150
if roomID == "" {
146151
http.Error(c.Writer, "roomID required", http.StatusBadRequest)
@@ -156,19 +161,40 @@ func serveWs(hub *Hub, c *gin.Context, roomMappings *verify.RoomMappings) {
156161
client := &Client{conn: conn, roomID: roomID}
157162
hub.register <- client
158163

159-
go handleMessages(client, hub, roomMappings)
164+
go handleMessages(client, hub, roomMappings, persistMappings)
160165
}
161166

162-
func authenticateClient(token string, client *Client, roomMappings *verify.RoomMappings) bool {
167+
func authenticateClient(
168+
token string, match string, client *Client,
169+
roomMappings *verify.RoomMappings,
170+
persistMappings *verify.PersistMappings,
171+
) bool {
163172
ok, userID := verifyToken(token)
164173
if !ok {
165-
log.Println("bruh")
174+
log.Println("bad token in request")
166175
return false
167176
}
168-
return verify.VerifyRoom(roomMappings, client.roomID, userID)
177+
return verify.VerifyRoomAndMoveToPersist(
178+
roomMappings, client.roomID, userID, match, persistMappings)
169179
}
170180

171-
func handleMessages(client *Client, hub *Hub, roomMappings *verify.RoomMappings) {
181+
func authenticateClientNoMatch(
182+
token string, client *Client,
183+
persistMappings *verify.PersistMappings,
184+
) bool {
185+
ok, userID := verifyToken(token)
186+
if !ok {
187+
log.Println("bad token in request")
188+
return false
189+
}
190+
return verify.VerifyPersist(persistMappings, client.roomID, userID)
191+
}
192+
193+
func handleMessages(
194+
client *Client, hub *Hub,
195+
roomMappings *verify.RoomMappings,
196+
persistMappings *verify.PersistMappings,
197+
) {
172198
defer func() {
173199
hub.unregister <- client
174200
}()
@@ -187,16 +213,41 @@ func handleMessages(client *Client, hub *Hub, roomMappings *verify.RoomMappings)
187213
}
188214
// Handle authentication message
189215
if msgData["type"] == "auth" {
190-
token, ok := msgData["token"].(string)
191-
if !ok || !authenticateClient(token, client, roomMappings) {
192-
log.Println("Authentication failed")
193-
client.conn.WriteMessage(websocket.TextMessage, []byte("Authentication failed"))
194-
client.conn.Close()
195-
break
196-
}
197-
client.authenticated = true
198-
log.Println("Client authenticated successfully")
199-
}
216+
token, tokenOk := msgData["token"].(string)
217+
if !tokenOk {
218+
log.Println("Authentication failed - no token attached")
219+
client.conn.WriteMessage(
220+
websocket.TextMessage,
221+
[]byte("Authentication failed - no JWT token"),
222+
)
223+
client.conn.Close()
224+
break
225+
}
226+
isSuccess := false
227+
match, matchOk := msgData["matchHash"].(string)
228+
if matchOk && !authenticateClient(token, match, client, roomMappings, persistMappings) {
229+
log.Println(
230+
"failed to find a matching room from match hash, proceeding with persistence check",
231+
)
232+
}
233+
// I will ping the persistent map even if I've found it in the original map
234+
if !authenticateClientNoMatch(token, client, persistMappings) {
235+
log.Println("failed to find a persistent room")
236+
isSuccess = false
237+
} else {
238+
isSuccess = true
239+
}
240+
if !isSuccess {
241+
client.conn.WriteMessage(
242+
websocket.TextMessage,
243+
[]byte("Authentication failed - failed to find a matching room"),
244+
)
245+
client.conn.Close()
246+
break
247+
}
248+
client.authenticated = true
249+
log.Println("Client authenticated successfully")
250+
}
200251

201252
if msgData["type"] == "close_session" {
202253
closeMessage := Message{
@@ -250,11 +301,34 @@ func main() {
250301
if REDIS_URI == "" {
251302
REDIS_URI = "localhost:9190"
252303
}
253-
roomMappings := verify.InitialiseRoomMappings(REDIS_URI, 1)
304+
305+
REDIS_ROOM_MAPPING := 1
306+
REDIS_ROOM_PERSIST := 2
307+
308+
if os.Getenv("REDIS_ROOM_MAPPING") != "" {
309+
num, err := strconv.Atoi(os.Getenv("REDIS_ROOM_MAPPING"))
310+
if err != nil {
311+
log.Fatal("DB no of room map is badly formatted" + err.Error())
312+
} else {
313+
REDIS_ROOM_MAPPING = num
314+
}
315+
}
316+
317+
if os.Getenv("REDIS_ROOM_PERSIST") != "" {
318+
num, err := strconv.Atoi(os.Getenv("REDIS_ROOM_PERSIST"))
319+
if err != nil {
320+
log.Fatal("DB no of room persistance store is badly formatted" + err.Error())
321+
} else {
322+
REDIS_ROOM_PERSIST = num
323+
}
324+
}
325+
326+
roomMappings := verify.InitialiseRoomMappings(REDIS_URI, REDIS_ROOM_MAPPING)
327+
persistMappings := verify.InitialisePersistMappings(REDIS_URI, REDIS_ROOM_PERSIST);
254328

255329
// WebSocket connection endpoint
256330
r.GET("/ws", func(c *gin.Context) {
257-
serveWs(hub, c, roomMappings)
331+
serveWs(hub, c, roomMappings, persistMappings)
258332
})
259333

260334
// Status endpoint

collab/verify/persistMappings.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package verify
2+
3+
import (
4+
"context"
5+
"log"
6+
7+
redis "github.com/go-redis/redis/v8"
8+
)
9+
10+
// same as room mappings, but separated for type safety
11+
type PersistMappings struct {
12+
Conn *redis.Client
13+
}
14+
15+
func InitialisePersistMappings(addr string, db_num int) *PersistMappings {
16+
conn := redis.NewClient(&redis.Options{
17+
Addr: addr,
18+
DB: db_num,
19+
})
20+
21+
return &PersistMappings{
22+
Conn: conn,
23+
}
24+
}
25+
26+
func VerifyPersist(persistMappings *PersistMappings, roomID string, userID string) bool {
27+
data, err := persistMappings.Conn.HGetAll(context.Background(), userID).Result()
28+
if err != nil {
29+
log.Printf("Error retrieving data for userID %s: %v", userID, err)
30+
return false
31+
}
32+
33+
return data["roomId"] == roomID
34+
}

collab/verify/roomMappings.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,44 @@ func InitialiseRoomMappings(addr string, db_num int) *RoomMappings {
2323
}
2424
}
2525

26-
func VerifyRoom(roomMappings *RoomMappings, roomID string, userID string) bool {
27-
data, err := roomMappings.Conn.HGetAll(context.Background(), userID).Result()
26+
func VerifyRoomAndMoveToPersist(
27+
roomMappings *RoomMappings,
28+
roomID string,
29+
userId string,
30+
matchHash string,
31+
persistMappings *PersistMappings,
32+
) bool {
33+
ctx := context.Background()
34+
data, err := roomMappings.Conn.HGetAll(ctx, matchHash).Result()
2835
if err != nil {
29-
log.Printf("Error retrieving data for userID %s: %v", userID, err)
36+
log.Printf("Error retrieving data for matchHash %s: %v", matchHash, err)
3037
return false
3138
}
3239

33-
return data["roomId"] == roomID
40+
if data["roomId"] != roomID || data["thisUser"] != userId {
41+
log.Printf("Mismatch in room data and user data")
42+
return false
43+
}
44+
45+
roomMappings.Conn.Del(ctx, matchHash);
46+
persistentRoom := map[string]interface{}{
47+
"roomId": roomID,
48+
"otherUser": data["otherUser"],
49+
"requestTime": data["requestTime"],
50+
51+
"title": data["title"],
52+
"titleSlug": data["titleSlug"],
53+
"difficulty": data["difficulty"],
54+
"topicTags": data["topicTags"],
55+
"content": data["content"],
56+
"schemas": data["schemas"],
57+
"id": data["id"],
58+
}
59+
60+
// this always overrides the persistent room
61+
if err := persistMappings.Conn.HSet(ctx, userId, persistentRoom).Err(); err != nil {
62+
log.Printf("error sending room to persistent storage: %s", err.Error())
63+
}
64+
65+
return true
3466
}

matching-service/main.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"log"
55
"os"
6+
"strconv"
67

78
"matching-service/consumer"
89
models "matching-service/models"
@@ -99,9 +100,30 @@ func main() {
99100
if REDIS_URI == "" {
100101
REDIS_URI = "localhost://9190"
101102
}
103+
104+
REDIS_CLIENT_MAPPING := 0
105+
REDIS_ROOM_MAPPING := 1
106+
107+
if os.Getenv("REDIS_CLIENT_MAPPING") != "" {
108+
num, err := strconv.Atoi(os.Getenv("REDIS_CLIENT_MAPPING"))
109+
if err != nil {
110+
log.Fatal("DB no of client map is badly formatted" + err.Error())
111+
} else {
112+
REDIS_CLIENT_MAPPING = num
113+
}
114+
}
115+
116+
if os.Getenv("REDIS_ROOM_MAPPING") != "" {
117+
num, err := strconv.Atoi(os.Getenv("REDIS_ROOM_MAPPING"))
118+
if err != nil {
119+
log.Fatal("DB no of room map is badly formatted" + err.Error())
120+
} else {
121+
REDIS_ROOM_MAPPING = num
122+
}
123+
}
102124

103-
clientMappings := storage.InitialiseClientMappings(REDIS_URI, 0)
104-
roomMappings := storage.InitialiseRoomMappings(REDIS_URI, 1)
125+
clientMappings := storage.InitialiseClientMappings(REDIS_URI, REDIS_CLIENT_MAPPING)
126+
roomMappings := storage.InitialiseRoomMappings(REDIS_URI, REDIS_ROOM_MAPPING)
105127

106128

107129
logger.Log.Info("Beginning consumption from message queue")

storage-blob-api/main.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"log"
55
"os"
6+
"strconv"
67

78
"github.com/gin-gonic/gin"
89
"github.com/joho/godotenv"
@@ -52,8 +53,19 @@ func main() {
5253
if REDIS_URI == "" {
5354
REDIS_URI = "localhost://9190"
5455
}
56+
57+
REDIS_ROOM_MAPPING := 1
58+
59+
if os.Getenv("REDIS_ROOM_MAPPING") != "" {
60+
num, err := strconv.Atoi(os.Getenv("REDIS_ROOM_MAPPING"))
61+
if err != nil {
62+
log.Fatal("DB no of room map is badly formatted" + err.Error())
63+
} else {
64+
REDIS_ROOM_MAPPING = num
65+
}
66+
}
5567

56-
roomMappings := storage.InitialiseRoomMappings(REDIS_URI, 1)
68+
roomMappings := storage.InitialiseRoomMappings(REDIS_URI, REDIS_ROOM_MAPPING)
5769

5870
router := gin.Default()
5971
transport.SetCors(router, ORIGIN)

0 commit comments

Comments
 (0)