@@ -7,16 +7,27 @@ import (
7
7
"matching-service/processes"
8
8
"matching-service/utils"
9
9
"net/http"
10
+ "strings"
11
+ "sync"
10
12
11
13
"github.com/gorilla/websocket"
12
14
)
13
15
14
- var upgrader = websocket.Upgrader {
15
- CheckOrigin : func (r * http.Request ) bool {
16
- // Allow all connections by skipping the origin check (set more restrictions in production)
17
- return true
18
- },
19
- }
16
+ var (
17
+ upgrader = websocket.Upgrader {
18
+ CheckOrigin : func (r * http.Request ) bool {
19
+ // Allow all connections by skipping the origin check (set more restrictions in production)
20
+ return true
21
+ },
22
+ }
23
+ // A map to hold active WebSocket connections per username
24
+ activeConnections = make (map [string ]* websocket.Conn )
25
+ // A map to hold user's match ctx cancel function
26
+ matchContexts = make (map [string ]context.CancelFunc )
27
+ // A map to hold user's match channels
28
+ matchFoundChannels = make (map [string ]chan models.MatchFound )
29
+ mu sync.Mutex // Mutex for thread-safe access to activeConnections
30
+ )
20
31
21
32
// handleConnections manages WebSocket connections and matching logic.
22
33
func HandleWebSocketConnections (w http.ResponseWriter , r * http.Request ) {
@@ -42,25 +53,48 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) {
42
53
return
43
54
}
44
55
56
+ // Store WebSocket connection in the activeConnections map.
57
+ mu .Lock ()
58
+ // Checks if user is already an existing websocket connection
59
+ if _ , exists := activeConnections [matchRequest .Username ]; exists {
60
+ mu .Unlock ()
61
+ log .Printf ("User %s is already connected, rejecting new connection." , matchRequest .Username )
62
+ ws .WriteJSON (models.MatchRejected {
63
+ Type : "match_rejected" ,
64
+ Message : "You are already in a matchmaking queue. Please disconnect before reconnecting." ,
65
+ })
66
+ ws .Close ()
67
+ return
68
+ }
69
+ activeConnections [matchRequest .Username ] = ws
70
+ matchCtx , matchCancel := context .WithCancel (context .Background ())
71
+ matchContexts [matchRequest .Username ] = matchCancel
72
+
73
+ matchFoundChan := make (chan models.MatchFound )
74
+ matchFoundChannels [matchRequest .Username ] = matchFoundChan
75
+ mu .Unlock ()
76
+
45
77
// Create a context for cancellation
46
78
ctx , cancel := context .WithCancel (context .Background ())
47
79
defer cancel () // Ensure cancel is called to release resources
48
80
81
+ processes .EnqueueUser (processes .GetRedisClient (), matchRequest .Username , ctx )
82
+ processes .AddUserToTopicSets (processes .GetRedisClient (), matchRequest , ctx )
83
+ processes .StoreUserDetails (processes .GetRedisClient (), matchRequest , ctx )
84
+
49
85
timeoutCtx , timeoutCancel , err := createTimeoutContext ()
50
86
if err != nil {
51
87
log .Printf ("Error creating timeout context: %v" , err )
52
88
return
53
89
}
54
90
defer timeoutCancel ()
55
91
56
- matchFoundChan := make (chan models.MatchFound )
57
-
58
92
// Start goroutines for handling messages and performing matching.
59
93
go processes .ReadMessages (ws , ctx , cancel )
60
- go processes .PerformMatching (matchRequest , ctx , matchFoundChan ) // Perform matching
94
+ go processes .PerformMatching (matchRequest , context . Background (), matchFoundChannels ) // Perform matching
61
95
62
96
// Wait for a match, timeout, or cancellation.
63
- waitForResult (ws , ctx , timeoutCtx , matchFoundChan )
97
+ waitForResult (ws , ctx , timeoutCtx , matchCtx , matchFoundChan , matchRequest . Username )
64
98
}
65
99
66
100
// readMatchRequest reads the initial match request from the WebSocket connection.
@@ -69,7 +103,13 @@ func readMatchRequest(ws *websocket.Conn) (models.MatchRequest, error) {
69
103
if err := ws .ReadJSON (& matchRequest ); err != nil {
70
104
return matchRequest , err
71
105
}
72
- log .Printf ("Received match request: %v" , matchRequest )
106
+ // Get the remote address (client's IP and port)
107
+ clientAddr := ws .RemoteAddr ().String ()
108
+
109
+ // Extract the port (after the last ':')
110
+ clientPort := clientAddr [strings .LastIndex (clientAddr , ":" )+ 1 :]
111
+
112
+ log .Printf ("Received match request: %v from client port: %s" , matchRequest , clientPort )
73
113
return matchRequest , nil
74
114
}
75
115
@@ -84,25 +124,53 @@ func createTimeoutContext() (context.Context, context.CancelFunc, error) {
84
124
}
85
125
86
126
// waitForResult waits for a match result, timeout, or cancellation.
87
- func waitForResult (ws * websocket.Conn , ctx , timeoutCtx context.Context , matchFoundChan chan models.MatchFound ) {
127
+ func waitForResult (ws * websocket.Conn , ctx , timeoutCtx , matchCtx context.Context , matchFoundChan chan models.MatchFound , username string ) {
88
128
select {
89
129
case <- ctx .Done ():
90
130
log .Println ("Matching cancelled" )
131
+ // Cleanup Redis
132
+ processes .CleanUpUser (processes .GetRedisClient (), username , context .Background ())
133
+ // Remove the match context and active
134
+ if _ , exists := matchContexts [username ]; exists {
135
+ delete (matchContexts , username )
136
+ }
137
+ if _ , exists := activeConnections [username ]; exists {
138
+ delete (activeConnections , username )
139
+ }
140
+ if _ , exists := matchFoundChannels [username ]; exists {
141
+ delete (matchFoundChannels , username )
142
+ }
143
+
91
144
return
92
145
case <- timeoutCtx .Done ():
93
146
log .Println ("Connection timed out" )
147
+ // Cleanup Redis
148
+ processes .CleanUpUser (processes .GetRedisClient (), username , context .Background ())
149
+ // Remove the match context and active
150
+ if _ , exists := matchContexts [username ]; exists {
151
+ delete (matchContexts , username )
152
+ }
153
+ if _ , exists := activeConnections [username ]; exists {
154
+ delete (activeConnections , username )
155
+ }
156
+ if _ , exists := matchFoundChannels [username ]; exists {
157
+ delete (matchFoundChannels , username )
158
+ }
159
+
94
160
sendTimeoutResponse (ws )
95
161
return
162
+ case <- matchCtx .Done ():
163
+ log .Println ("Match found for user: " + username )
164
+ return
96
165
case result , ok := <- matchFoundChan :
97
166
if ! ok {
98
167
// Channel closed without a match, possibly due to context cancellation
99
168
log .Println ("Match channel closed without finding a match" )
100
169
return
101
170
}
102
- log .Println ("Match found" )
103
- if err := ws .WriteJSON (result ); err != nil {
104
- log .Printf ("write error: %v" , err )
105
- }
171
+ log .Println ("Match found for user: " + username )
172
+ // Notify the users about the match
173
+ notifyMatch (result .User , result .MatchedUser , result )
106
174
return
107
175
}
108
176
}
@@ -117,3 +185,46 @@ func sendTimeoutResponse(ws *websocket.Conn) {
117
185
log .Printf ("write error: %v" , err )
118
186
}
119
187
}
188
+
189
+ func notifyMatch (username , matchedUsername string , result models.MatchFound ) {
190
+ mu .Lock ()
191
+ defer mu .Unlock ()
192
+
193
+ // Send message to the first user
194
+ if userConn , userExists := activeConnections [username ]; userExists {
195
+ if err := userConn .WriteJSON (result ); err != nil {
196
+ log .Printf ("Error sending message to user %s: %v\n " , username , err )
197
+ }
198
+ }
199
+
200
+ // Send message to the matched user
201
+ if matchedUserConn , matchedUserExists := activeConnections [matchedUsername ]; matchedUserExists {
202
+ result .User , result .MatchedUser = result .MatchedUser , result .User // Swap User and MatchedUser values
203
+ if err := matchedUserConn .WriteJSON (result ); err != nil {
204
+ log .Printf ("Error sending message to user %s: %v\n " , username , err )
205
+ }
206
+ }
207
+
208
+ // Remove the match context for both users and cancel for matched user
209
+ if cancelFunc , exists := matchContexts [username ]; exists {
210
+ cancelFunc ()
211
+ delete (matchContexts , username )
212
+ }
213
+
214
+ if cancelFunc2 , exists := matchContexts [matchedUsername ]; exists {
215
+ cancelFunc2 ()
216
+ delete (matchContexts , matchedUsername )
217
+ }
218
+
219
+ // Remove the match channels
220
+ if _ , exists := matchFoundChannels [username ]; exists {
221
+ delete (matchFoundChannels , username )
222
+ }
223
+ if _ , exists := matchFoundChannels [matchedUsername ]; exists {
224
+ delete (matchFoundChannels , matchedUsername )
225
+ }
226
+
227
+ // Remove users from the activeConnections map
228
+ delete (activeConnections , username )
229
+ delete (activeConnections , matchedUsername )
230
+ }
0 commit comments