@@ -10,71 +10,10 @@ const { notifyUsers } = require('../websocket/websocket');
10
10
11
11
const dead_letter_queue_name = "dead_letter_queue" ;
12
12
const timeoutMap = { } ;
13
+
13
14
// Local dictionary to store waiting users
14
15
const waitingUsers = { } ;
15
16
16
- // manual rejection of message that is constantly being consumed. Not using TTL feature.
17
- // Race condition: upon matching user 2 with user 1, the flag for whether user 1 is matched is still false.
18
- // async function consumeQueue() {
19
- // try {
20
- // // Connect
21
- // const connection = await amqp.connect(process.env.RABBITMQ_URL);
22
- // const channel = await connection.createChannel();
23
-
24
- // console.log("Waiting for users...");
25
-
26
- // // Process + subscribe to each matchmaking queue
27
- // for (let queueName of queueNames) {
28
- // await channel.consume(queueName, (msg) => {
29
- // if (msg !== null) {
30
- // const userData = JSON.parse(msg.content.toString());
31
- // const { userId, language, difficulty } = userData;
32
-
33
- // // Perform the matching logic
34
- // console.log(`Received user ${userId} with ${language} and ${difficulty}`);
35
- // const matched = matchUsers(userId, language, difficulty);
36
-
37
- // // Flag to track if a match is found
38
- // // let isMatched = false;
39
- // if (timeoutMap[userId]) {
40
- // clearTimeout(timeoutMap[userId]);
41
- // delete timeoutMap[userId];
42
- // }
43
-
44
- // // Only acknowledge if a match was found
45
- // if (matched) {
46
- // // isMatched = true; // Set the flag
47
- // channel.ack(msg);
48
- // console.log(`Matched user ${userId}`);
49
-
50
- // // clearTimeout(timeoutMap[userId]);
51
- // // delete timeoutMap[userId];
52
- // } else {
53
- // console.log(`No match for ${userId}, waiting for TTL to expire.`);
54
-
55
- // // Set a timeout for rejection only if not matched
56
- // const timeoutId = setTimeout(() => {
57
- // // if (!isMatched) { // Check if matched after timeout
58
- // console.log(`is the user matched upon timeout? ${matched}`);
59
- // console.log(`Rejecting user ${userId} after 10 seconds.`);
60
- // channel.reject(msg, false); // Reject without requeuing
61
- // // }
62
- // }, 10000); // 10 seconds delay
63
-
64
- // timeoutMap[userId] = timeoutId;
65
- // }
66
- // }
67
- // });
68
- // }
69
-
70
- // console.log("Listening to matchmaking queues");
71
- // } catch (error) {
72
- // console.error('Error consuming RabbitMQ queue:', error);
73
- // }
74
- // }
75
-
76
-
77
-
78
17
// using promises to handle errors and ensure clearing of timer.
79
18
function matchUsers ( channel , msg , userId , language , difficulty ) {
80
19
const criteriaKey = `${ difficulty } .${ language } ` ;
@@ -93,7 +32,8 @@ function matchUsers(channel, msg, userId, language, difficulty) {
93
32
console . log ( `Matched users: ${ matchedUsers . map ( user => user . userId ) } ` ) ;
94
33
95
34
// Send match success (this could trigger WebSocket communication)
96
- notifyUsers ( matchedUsers . map ( user => user . userId ) ) ;
35
+ // notifyUsers(matchedUsers.map(user => user.userId));
36
+ notifyUsers ( matchedUsers . map ( user => user . userId ) , 'Match found!' , 'match' ) ;
97
37
98
38
// Acknowledge the messages for both matched users
99
39
matchedUsers . forEach ( ( { msg } ) => {
@@ -191,7 +131,7 @@ async function consumeDLQ() {
191
131
console . log ( `Received message from DLQ for user: ${ userId } ` ) ;
192
132
193
133
// Notify the user via WebSocket
194
- notifyUsers ( userId , `Match not found for ${ difficulty } ${ language } , please try again.` ) ;
134
+ notifyUsers ( userId , `Match not found for ${ difficulty } ${ language } , please try again.` , 'rejection' ) ;
195
135
196
136
// Acknowledge the message (so it's removed from the DLQ)
197
137
channel . ack ( msg ) ;
0 commit comments