Skip to content

Commit 0207b5b

Browse files
committed
Fix bugs in subscriber.js
1 parent 7bc532a commit 0207b5b

File tree

3 files changed

+102
-96
lines changed

3 files changed

+102
-96
lines changed

Backend/MatchingService/rabbitmq/publisher.js

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,25 +37,26 @@ async function publishToQueue({ userId, difficulty, category }) {
3737
}
3838
}
3939

40-
async function publishCancelRequest({ userId }) {
41-
try {
42-
const channel = await connectToRabbitMQ();
43-
const routingKey = 'cancel';
44-
45-
const messageSent = channel.publish(
46-
matching_exchange_name,
47-
routingKey,
48-
Buffer.from(JSON.stringify({ userId }))
49-
);
50-
51-
if (messageSent) {
52-
console.log(`Cancel request sent: ${userId}`);
53-
} else {
54-
console.error(`Cancel request NOT sent: ${userId}`);
55-
}
56-
} catch (error) {
57-
console.error('Error publishing cancel request to RabbitMQ:', error);
58-
}
59-
}
60-
61-
module.exports = { publishToQueue, publishCancelRequest };
40+
// async function publishCancelRequest({ userId }) {
41+
// try {
42+
// const channel = await connectToRabbitMQ();
43+
// const routingKey = 'cancel';
44+
45+
// const messageSent = channel.publish(
46+
// matching_exchange_name,
47+
// routingKey,
48+
// Buffer.from(JSON.stringify({ userId }))
49+
// );
50+
51+
// if (messageSent) {
52+
// console.log(`Cancel request sent: ${userId}`);
53+
// } else {
54+
// console.error(`Cancel request NOT sent: ${userId}`);
55+
// }
56+
// } catch (error) {
57+
// console.error('Error publishing cancel request to RabbitMQ:', error);
58+
// }
59+
// }
60+
61+
// module.exports = { publishToQueue, publishCancelRequest };
62+
module.exports = { publishToQueue };

Backend/MatchingService/rabbitmq/setup.js

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const generator = require("../utils/generateQueues");
44
const matching_exchange_name = "matching_exchange";
55
const dead_letter_exchange_name = "dead_letter_exchange";
66
const dead_letter_queue_name = "dead_letter_queue";
7-
const cancel_queue_name = "cancel_queue";
7+
// const cancel_queue_name = "cancel_queue";
88
const difficulties = ["easy", "medium", "hard", "any"];
99
const axios = require('axios');
1010
const categoryAPIUrl = 'http://localhost:3001/api/categories';
@@ -57,10 +57,10 @@ async function setupRabbitMQ() {
5757
await channel.assertQueue(dead_letter_queue_name, { durable: true });
5858
await channel.bindQueue(dead_letter_queue_name, dead_letter_exchange_name, ''); // Bind with no routing key
5959

60-
// Delete and recreate the cancel queue
61-
await channel.deleteQueue(cancel_queue_name);
62-
await channel.assertQueue(cancel_queue_name, { durable: true });
63-
await channel.bindQueue(cancel_queue_name, matching_exchange_name, 'cancel'); // Bind with the "cancel" routing key
60+
// // Delete and recreate the cancel queue
61+
// await channel.deleteQueue(cancel_queue_name);
62+
// await channel.assertQueue(cancel_queue_name, { durable: true });
63+
// await channel.bindQueue(cancel_queue_name, matching_exchange_name, 'cancel'); // Bind with the "cancel" routing key
6464

6565
console.log("RabbitMQ setup complete with queues, DLQ, and bindings.");
6666

@@ -71,4 +71,5 @@ async function setupRabbitMQ() {
7171
}
7272
}
7373

74-
module.exports = { setupRabbitMQ, matching_exchange_name, queueNamesPromise, dead_letter_queue_name, cancel_queue_name };
74+
// module.exports = { setupRabbitMQ, matching_exchange_name, queueNamesPromise, dead_letter_queue_name, cancel_queue_name };
75+
module.exports = { setupRabbitMQ, matching_exchange_name, queueNamesPromise, dead_letter_queue_name };

Backend/MatchingService/rabbitmq/subscriber.js

Lines changed: 72 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ function matchUsers(channel, msg, userId, difficulty, category) {
6969

7070
if (waitingUsers[criteriaKey].length >= 2) {
7171
const matchedUsers = waitingUsers[criteriaKey].splice(0, 2);
72-
removeMatchedUsersFromOtherLists(matchedUsers, categoryKey);
72+
removeMatchedUsersFromOtherLists(matchedUsers, criteriaKey);
7373
console.log("waitingusers after strict matching: ", waitingUsers)
7474
notifyMatch(channel, matchedUsers, category);
7575
return true;
@@ -81,7 +81,7 @@ function matchUsers(channel, msg, userId, difficulty, category) {
8181
console.log(`Fallback: User ${userId} added to ${categoryKey}. Waiting list: ${waitingUsers[categoryKey].length}`);
8282
if (waitingUsers[categoryKey].length >= 2) {
8383
const matchedUsers = waitingUsers[categoryKey].splice(0, 2);
84-
removeMatchedUsersFromOtherLists(matchedUsers, criteriaKey);
84+
removeMatchedUsersFromOtherLists(matchedUsers, categoryKey);
8585
console.log("waitingusers after lenient matching: ", waitingUsers)
8686
notifyMatch(channel, matchedUsers, category);
8787
return true;
@@ -99,6 +99,7 @@ function removeMatchedUsersFromOtherLists(matchedUsers, keyToSkip) {
9999
);
100100
}
101101
}
102+
console.log("waiting users after removing: ", waitingUsers);
102103
}
103104

104105
async function notifyMatch(channel, matchedUsers, category) {
@@ -191,19 +192,21 @@ async function acknowledgeMessage(channel, msg) {
191192
// });
192193
// }
193194

194-
async function rejectMessage(channel, msg, userId) {
195+
async function rejectMessage(channel, msg, user) {
195196
return new Promise((resolve, reject) => {
196197
try {
197198
const userData = JSON.parse(msg.content.toString());
198199
channel.reject(msg, false); // Reject without requeuing
199-
console.log(`Rejected message for user: ${userId}`);
200-
if (timeoutMap[userId]) {
201-
clearTimeout(timeoutMap[userId]);
202-
delete timeoutMap[userId];
200+
console.log(`Rejected message for user: ${user.userId}`);
201+
if (timeoutMap[user.userId]) {
202+
clearTimeout(timeoutMap[user.userId]);
203+
delete timeoutMap[user.userId];
203204
}
205+
removeMatchedUsersFromOtherLists([user], 'all')
206+
204207
resolve();
205208
} catch (error) {
206-
console.error(`Failed to reject message for user ${userId}:`, error);
209+
console.error(`Failed to reject message for user ${user.userId}:`, error);
207210
reject(error);
208211
}
209212
});
@@ -275,7 +278,8 @@ async function consumeQueue() {
275278
if (!matched) {
276279
console.log(`No match for ${userId}, waiting for rejection timeout.`);
277280
const timeoutId = setTimeout(async () => {
278-
await rejectMessage(channel, msg, userId);
281+
const categorykey = "any." + category
282+
await rejectMessage(channel, msg, userData);
279283
}, 10000); // 10 seconds delay
280284

281285
timeoutMap[userId] = timeoutId;
@@ -286,8 +290,8 @@ async function consumeQueue() {
286290

287291
console.log("Listening to matchmaking queues");
288292

289-
await consumeCancelQueue();
290-
console.log("Listening to Cancel Queue");
293+
//await consumeCancelQueue();
294+
//console.log("Listening to Cancel Queue");
291295
} catch (error) {
292296
console.error('Error consuming RabbitMQ queue:', error);
293297
}
@@ -320,72 +324,72 @@ async function consumeDLQ() {
320324
}
321325
}
322326

323-
async function consumeCancelQueue() {
324-
try {
325-
const connection = await amqp.connect(process.env.RABBITMQ_URL);
326-
const channel = await connection.createChannel();
327+
// async function consumeCancelQueue() {
328+
// try {
329+
// const connection = await amqp.connect(process.env.RABBITMQ_URL);
330+
// const channel = await connection.createChannel();
327331

328-
// Subscribe to the cancel queue
329-
await channel.consume('cancel_queue', async (msg) => {
330-
if (msg !== null) {
331-
const { userId } = JSON.parse(msg.content.toString());
332-
console.log(`Received cancel request for user: ${userId}`);
332+
// // Subscribe to the cancel queue
333+
// await channel.consume('cancel_queue', async (msg) => {
334+
// if (msg !== null) {
335+
// const { userId } = JSON.parse(msg.content.toString());
336+
// console.log(`Received cancel request for user: ${userId}`);
333337

334-
// Process the cancel request
335-
await cancelMatching(channel, msg, userId);
336-
}
337-
}, { noAck: false }); // Ensure manual acknowledgment
338+
// // Process the cancel request
339+
// await cancelMatching(channel, msg, userId);
340+
// }
341+
// }, { noAck: false }); // Ensure manual acknowledgment
338342

339-
console.log("Listening for cancel requests");
340-
} catch (error) {
341-
console.error('Error consuming cancel queue:', error);
342-
}
343-
}
343+
// console.log("Listening for cancel requests");
344+
// } catch (error) {
345+
// console.error('Error consuming cancel queue:', error);
346+
// }
347+
// }
344348

345-
async function cancelMatching(cancelChannel, cancelMsg, userId) {
346-
try {
347-
let foundOriginalMsg = false;
348-
349-
// Loop through waitingUsers to find the original message for the user
350-
Object.keys(waitingUsers).forEach(criteriaKey => {
351-
const userIndex = waitingUsers[criteriaKey].findIndex(user => user.userId === userId);
352-
// const userIndexCat = waitingUsers[categoryKey].findIndex(user => user.userId === userId);
353-
if (userIndex !== -1) {
354-
const { msg, channel } = waitingUsers[criteriaKey][userIndex]; // Get original msg and its channel
355-
356-
// Acknowledge the original matchmaking message from the queue (e.g., easy.python)
357-
if (msg && channel) {
358-
console.log(`Acknowledging original message for user ${userId} in queue ${criteriaKey}`);
359-
channel.ack(msg); // Use the same channel that consumed the message to acknowledge it
360-
foundOriginalMsg = true;
361-
}
349+
// async function cancelMatching(cancelChannel, cancelMsg, userId) {
350+
// try {
351+
// let foundOriginalMsg = false;
352+
353+
// // Loop through waitingUsers to find the original message for the user
354+
// Object.keys(waitingUsers).forEach(criteriaKey => {
355+
// const userIndex = waitingUsers[criteriaKey].findIndex(user => user.userId === userId);
356+
// // const userIndexCat = waitingUsers[categoryKey].findIndex(user => user.userId === userId);
357+
// if (userIndex !== -1) {
358+
// const { msg, channel } = waitingUsers[criteriaKey][userIndex]; // Get original msg and its channel
359+
360+
// // Acknowledge the original matchmaking message from the queue (e.g., easy.python)
361+
// if (msg && channel) {
362+
// console.log(`Acknowledging original message for user ${userId} in queue ${criteriaKey}`);
363+
// channel.ack(msg); // Use the same channel that consumed the message to acknowledge it
364+
// foundOriginalMsg = true;
365+
// }
362366

363-
// Remove the user from the waiting list
364-
waitingUsers[criteriaKey].splice(userIndex, 1);
365-
// waitingUsers[categoryKey].splice(userIndex, 1);
366-
console.log(`User ${userId} removed from waiting list for ${criteriaKey}`);
367-
}
368-
});
367+
// // Remove the user from the waiting list
368+
// waitingUsers[criteriaKey].splice(userIndex, 1);
369+
// // waitingUsers[categoryKey].splice(userIndex, 1);
370+
// console.log(`User ${userId} removed from waiting list for ${criteriaKey}`);
371+
// }
372+
// });
369373

370-
// If original message not found, log a warning
371-
if (!foundOriginalMsg) {
372-
console.warn(`Original message for user ${userId} not found in matchmaking queues.`);
373-
}
374+
// // If original message not found, log a warning
375+
// if (!foundOriginalMsg) {
376+
// console.warn(`Original message for user ${userId} not found in matchmaking queues.`);
377+
// }
374378

375-
// Clear any timeouts for the user
376-
if (timeoutMap[userId]) {
377-
clearTimeout(timeoutMap[userId]);
378-
delete timeoutMap[userId];
379-
}
379+
// // Clear any timeouts for the user
380+
// if (timeoutMap[userId]) {
381+
// clearTimeout(timeoutMap[userId]);
382+
// delete timeoutMap[userId];
383+
// }
380384

381-
// Acknowledge the cancel message from the cancel queue
382-
cancelChannel.ack(cancelMsg);
383-
console.log(`Cancel processed for user ${userId}`);
385+
// // Acknowledge the cancel message from the cancel queue
386+
// cancelChannel.ack(cancelMsg);
387+
// console.log(`Cancel processed for user ${userId}`);
384388

385-
} catch (error) {
386-
console.error(`Failed to process cancel for user ${userId}:`, error);
387-
}
388-
}
389+
// } catch (error) {
390+
// console.error(`Failed to process cancel for user ${userId}:`, error);
391+
// }
392+
// }
389393

390394

391395
module.exports = { consumeQueue, consumeDLQ };

0 commit comments

Comments
 (0)