Skip to content

Commit 011b606

Browse files
committed
Fix unacknowledged msg after cancelling match
1 parent b0f62eb commit 011b606

File tree

2 files changed

+38
-24
lines changed

2 files changed

+38
-24
lines changed

Backend/MatchingService/rabbitmq/setup.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const queueNames = [
1313
'medium.cplusplus',
1414
'hard.python',
1515
'hard.java',
16-
'hard.cplusplus'
16+
'hard.cplusplus',
1717
];
1818

1919
async function setupRabbitMQ() {
@@ -27,17 +27,17 @@ async function setupRabbitMQ() {
2727
const channel = await connection.createChannel();
2828

2929
// Declare the matching exchange (topic)
30-
await channel.assertExchange(matching_exchange_name, "topic", { durable: false });
30+
await channel.assertExchange(matching_exchange_name, "topic", { durable: true });
3131

3232
// Declare the dead-letter exchange (fanout)
33-
await channel.assertExchange(dead_letter_exchange_name, "fanout", { durable: false });
33+
await channel.assertExchange(dead_letter_exchange_name, "fanout", { durable: true });
3434

3535
// Declare and bind all main queues with TTL and DLQ bindings
3636
for (let queueName of queueNames) {
3737
await channel.deleteQueue(queueName); // Ensure we start fresh for each setup
3838

3939
await channel.assertQueue(queueName, {
40-
durable: false,
40+
durable: true,
4141
arguments: {
4242
'x-message-ttl': 10000, // 60 seconds TTL
4343
'x-dead-letter-exchange': dead_letter_exchange_name // Bind to dead-letter exchange
@@ -51,12 +51,12 @@ async function setupRabbitMQ() {
5151
await channel.deleteQueue(dead_letter_queue_name);
5252

5353
// Declare the dead-letter queue and bind it to the dead-letter exchange
54-
await channel.assertQueue(dead_letter_queue_name, { durable: false });
54+
await channel.assertQueue(dead_letter_queue_name, { durable: true });
5555
await channel.bindQueue(dead_letter_queue_name, dead_letter_exchange_name, ''); // Bind with no routing key
5656

5757
// Declare and bind the cancel queue
5858
await channel.deleteQueue(cancel_queue_name); // Delete any existing cancel queue
59-
await channel.assertQueue(cancel_queue_name, { durable: false }); // Declare the cancel queue
59+
await channel.assertQueue(cancel_queue_name, { durable: true }); // Declare the cancel queue
6060
await channel.bindQueue(cancel_queue_name, matching_exchange_name, 'cancel'); // Bind with the "cancel" routing key
6161

6262
console.log("RabbitMQ setup complete with queues, DLQ, and bindings.");

Backend/MatchingService/rabbitmq/subscriber.js

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,20 @@ function matchUsers(channel, msg, userId, language, difficulty) {
2323
waitingUsers[criteriaKey] = [];
2424
}
2525

26-
waitingUsers[criteriaKey].push({ userId, msg }); // Store both userId and the message
26+
// Store both the userId, message, and the channel in waitingUsers
27+
waitingUsers[criteriaKey].push({ userId, msg, channel });
2728
console.log(`User ${userId} added to ${criteriaKey}. Waiting list: ${waitingUsers[criteriaKey].length}`);
2829

2930
// Check if there are 2 or more users waiting for this criteria
3031
if (waitingUsers[criteriaKey].length >= 2) {
3132
const matchedUsers = waitingUsers[criteriaKey].splice(0, 2); // Match the first two users
3233
console.log(`Matched users: ${matchedUsers.map(user => user.userId)}`);
3334

34-
// Send match success (this could trigger WebSocket communication)
35-
// notifyUsers(matchedUsers.map(user => user.userId));
35+
// Notify users of the match
3636
notifyUsers(matchedUsers.map(user => user.userId), 'Match found!', 'match');
3737

3838
// Acknowledge the messages for both matched users
39-
matchedUsers.forEach(({ msg }) => {
39+
matchedUsers.forEach(({ msg, channel }) => {
4040
acknowledgeMessage(channel, msg);
4141
});
4242

@@ -46,6 +46,7 @@ function matchUsers(channel, msg, userId, language, difficulty) {
4646
return false;
4747
}
4848

49+
4950
async function acknowledgeMessage(channel, msg) {
5051
return new Promise((resolve, reject) => {
5152
try {
@@ -104,7 +105,6 @@ async function rejectMessage(channel, msg, userId) {
104105

105106
async function consumeQueue() {
106107
try {
107-
// Connect
108108
const connection = await amqp.connect(process.env.RABBITMQ_URL);
109109
const channel = await connection.createChannel();
110110

@@ -123,26 +123,23 @@ async function consumeQueue() {
123123
// Call matchUsers with channel, message, and user details
124124
const matched = matchUsers(channel, msg, userId, language, difficulty);
125125

126-
// If not matched, set a timeout for rejection
127126
if (!matched) {
128127
console.log(`No match for ${userId}, waiting for rejection timeout.`);
129128

130-
// Set a timeout for rejection after 10 seconds
131129
const timeoutId = setTimeout(async () => {
132130
await rejectMessage(channel, msg, userId);
133131
}, 10000); // 10 seconds delay
134132

135-
// Store the timeout ID
136133
timeoutMap[userId] = timeoutId;
137134
}
138135
}
139-
});
136+
}, { noAck: false }); // Ensure manual acknowledgment
140137
}
141138

142139
console.log("Listening to matchmaking queues");
143140

144141
await consumeCancelQueue();
145-
console.log("Listening to Cancel Queue")
142+
console.log("Listening to Cancel Queue");
146143
} catch (error) {
147144
console.error('Error consuming RabbitMQ queue:', error);
148145
}
@@ -184,45 +181,62 @@ async function consumeCancelQueue() {
184181
await channel.consume('cancel_queue', async (msg) => {
185182
if (msg !== null) {
186183
const { userId } = JSON.parse(msg.content.toString());
187-
188184
console.log(`Received cancel request for user: ${userId}`);
189185

190186
// Process the cancel request
191187
await cancelMatching(channel, msg, userId);
192188
}
193-
});
189+
}, { noAck: false }); // Ensure manual acknowledgment
194190

195191
console.log("Listening for cancel requests");
196192
} catch (error) {
197193
console.error('Error consuming cancel queue:', error);
198194
}
199195
}
200196

201-
async function cancelMatching(channel, msg, userId) {
197+
async function cancelMatching(cancelChannel, cancelMsg, userId) {
202198
try {
203-
// Loop through waitingUsers to find the user
199+
let foundOriginalMsg = false;
200+
201+
// Loop through waitingUsers to find the original message for the user
204202
Object.keys(waitingUsers).forEach(criteriaKey => {
205203
const userIndex = waitingUsers[criteriaKey].findIndex(user => user.userId === userId);
206204

207205
if (userIndex !== -1) {
206+
const { msg, channel } = waitingUsers[criteriaKey][userIndex]; // Get original msg and its channel
207+
208+
// Acknowledge the original matchmaking message from the queue (e.g., easy.python)
209+
if (msg && channel) {
210+
console.log(`Acknowledging original message for user ${userId} in queue ${criteriaKey}`);
211+
channel.ack(msg); // Use the same channel that consumed the message to acknowledge it
212+
foundOriginalMsg = true;
213+
}
214+
215+
// Remove the user from the waiting list
208216
waitingUsers[criteriaKey].splice(userIndex, 1);
209217
console.log(`User ${userId} removed from waiting list for ${criteriaKey}`);
210218
}
211219
});
212220

213-
// Clean up the timeout
221+
// If original message not found, log a warning
222+
if (!foundOriginalMsg) {
223+
console.warn(`Original message for user ${userId} not found in matchmaking queues.`);
224+
}
225+
226+
// Clear any timeouts for the user
214227
if (timeoutMap[userId]) {
215228
clearTimeout(timeoutMap[userId]);
216229
delete timeoutMap[userId];
217230
}
218231

219-
// Acknowledge the cancel message
220-
channel.ack(msg);
221-
232+
// Acknowledge the cancel message from the cancel queue
233+
cancelChannel.ack(cancelMsg);
222234
console.log(`Cancel processed for user ${userId}`);
235+
223236
} catch (error) {
224237
console.error(`Failed to process cancel for user ${userId}:`, error);
225238
}
226239
}
227240

241+
228242
module.exports = { consumeQueue, consumeDLQ };

0 commit comments

Comments
 (0)