@@ -140,6 +140,9 @@ async function consumeQueue() {
140
140
}
141
141
142
142
console . log ( "Listening to matchmaking queues" ) ;
143
+
144
+ await consumeCancelQueue ( ) ;
145
+ console . log ( "Listening to Cancel Queue" )
143
146
} catch ( error ) {
144
147
console . error ( 'Error consuming RabbitMQ queue:' , error ) ;
145
148
}
@@ -172,4 +175,54 @@ async function consumeDLQ() {
172
175
}
173
176
}
174
177
178
+ async function consumeCancelQueue ( ) {
179
+ try {
180
+ const connection = await amqp . connect ( process . env . RABBITMQ_URL ) ;
181
+ const channel = await connection . createChannel ( ) ;
182
+
183
+ // Subscribe to the cancel queue
184
+ await channel . consume ( 'cancel_queue' , async ( msg ) => {
185
+ if ( msg !== null ) {
186
+ const { userId } = JSON . parse ( msg . content . toString ( ) ) ;
187
+
188
+ console . log ( `Received cancel request for user: ${ userId } ` ) ;
189
+
190
+ // Process the cancel request
191
+ await cancelMatching ( channel , msg , userId ) ;
192
+ }
193
+ } ) ;
194
+
195
+ console . log ( "Listening for cancel requests" ) ;
196
+ } catch ( error ) {
197
+ console . error ( 'Error consuming cancel queue:' , error ) ;
198
+ }
199
+ }
200
+
201
+ async function cancelMatching ( channel , msg , userId ) {
202
+ try {
203
+ // Loop through waitingUsers to find the user
204
+ Object . keys ( waitingUsers ) . forEach ( criteriaKey => {
205
+ const userIndex = waitingUsers [ criteriaKey ] . findIndex ( user => user . userId === userId ) ;
206
+
207
+ if ( userIndex !== - 1 ) {
208
+ waitingUsers [ criteriaKey ] . splice ( userIndex , 1 ) ;
209
+ console . log ( `User ${ userId } removed from waiting list for ${ criteriaKey } ` ) ;
210
+ }
211
+ } ) ;
212
+
213
+ // Clean up the timeout
214
+ if ( timeoutMap [ userId ] ) {
215
+ clearTimeout ( timeoutMap [ userId ] ) ;
216
+ delete timeoutMap [ userId ] ;
217
+ }
218
+
219
+ // Acknowledge the cancel message
220
+ channel . ack ( msg ) ;
221
+
222
+ console . log ( `Cancel processed for user ${ userId } ` ) ;
223
+ } catch ( error ) {
224
+ console . error ( `Failed to process cancel for user ${ userId } :` , error ) ;
225
+ }
226
+ }
227
+
175
228
module . exports = { consumeQueue, consumeDLQ } ;
0 commit comments