Skip to content

Commit fa3d6c9

Browse files
committed
Update consumer logic with timeOut
1 parent 4f133b3 commit fa3d6c9

File tree

5 files changed

+284
-75
lines changed

5 files changed

+284
-75
lines changed

Backend/MatchingService/app.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ const express = require('express');
22
const cors = require("cors");
33
const dotenv = require("dotenv");
44
const matchmakingRouter = require("./controllers/matchmaking");
5-
const { consumeQueue } = require('./rabbitmq/subscriber');
5+
const { consumeQueue, consumeDLQ } = require('./rabbitmq/subscriber');
66
const { setupRabbitMQ } = require('./rabbitmq/setup');
77
const { publishToQueue } = require('./rabbitmq/publisher')
88

@@ -21,8 +21,11 @@ app.use('/api/match', matchmakingRouter);
2121
setupRabbitMQ().then(() => {
2222

2323
consumeQueue().catch(console.error);
24-
publishToQueue({userId: "user_234", difficulty: "easy", language: "python"})
25-
publishToQueue({userId: "user_234", difficulty: "easy", language: "java"})
24+
consumeDLQ().catch(console.error);
25+
26+
publishToQueue({userId: "user_1", difficulty: "easy", language: "java"})
27+
publishToQueue({userId: "user_2", difficulty: "easy", language: "python"})
28+
publishToQueue({userId: "user_3", difficulty: "easy", language: "java"})
2629

2730
})
2831

Lines changed: 24 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
const amqp = require("amqplib");
2+
23
const matching_exchange_name = "matching_exchange";
4+
const dead_letter_exchange_name = "dead_letter_exchange";
5+
const dead_letter_queue_name = "dead_letter_queue";
36
const queueNames = [
47
'easy.python',
58
'easy.java',
@@ -10,71 +13,50 @@ const queueNames = [
1013
'hard.python',
1114
'hard.java',
1215
'hard.cplusplus'
13-
]
16+
];
1417

1518
async function setupRabbitMQ() {
1619
try {
17-
const connection = await amqp.connect(process.env.RABBITMQ_URL)
20+
const connection = await amqp.connect(process.env.RABBITMQ_URL);
1821

1922
if (!connection) {
2023
return;
2124
}
2225

2326
const channel = await connection.createChannel();
2427

25-
// Declare matching exchange to be bind to queues
26-
28+
// Declare the matching exchange (topic)
2729
await channel.assertExchange(matching_exchange_name, "topic", { durable: false });
2830

29-
// Declare dead letter exchange
30-
const dead_letter_exchange_name = "dead_letter_exchange";
31+
// Declare the dead-letter exchange (fanout)
3132
await channel.assertExchange(dead_letter_exchange_name, "fanout", { durable: false });
3233

34+
// Declare and bind all main queues with TTL and DLQ bindings
35+
for (let queueName of queueNames) {
36+
await channel.deleteQueue(queueName); // Ensure we start fresh for each setup
3337

38+
await channel.assertQueue(queueName, {
39+
durable: false,
40+
arguments: {
41+
'x-message-ttl': 10000, // 60 seconds TTL
42+
'x-dead-letter-exchange': dead_letter_exchange_name // Bind to dead-letter exchange
43+
}
44+
});
3445

35-
// Create and bind queues to exchange with the routing keys
36-
for (let name of queueNames) {
37-
/*
38-
try {
39-
await channel.deleteQueue(name);
40-
} catch (err) {
41-
console.log(`Queue ${name} does not exist or could not be deleted: ${err.message}`);
42-
}
43-
*/
44-
// this is required to add TTL and dead letter exchange to the queue
45-
await channel.deleteQueue(name);
46-
47-
await channel.assertQueue(name,
48-
{ durable: false, // durable=false ensures queue will survive broker restarts
49-
arguments: {
50-
'x-message-ttl': 60000, // set message time to live to 60 seconds
51-
'x-dead-letter-exchange': dead_letter_exchange_name // set dead letter exchange
52-
}
53-
54-
});
55-
56-
await channel.bindQueue(name, matching_exchange_name, name); // e.g. messages with routing key easy.python goes to easy.python queue
46+
await channel.bindQueue(queueName, matching_exchange_name, queueName); // Bind to exchange
5747
}
5848

59-
// Create and bind queue to exchange (if we want only 1 queue)
60-
// await channel.assertQueue(name, { durable: false })
61-
// await channel.bindQueue(name, matching_exchange_name, '#') // all messages go to this queue because of a wildcard pattern
49+
// Declare the dead-letter queue and bind it to the dead-letter exchange
50+
await channel.assertQueue(dead_letter_queue_name, { durable: false });
51+
await channel.bindQueue(dead_letter_queue_name, dead_letter_exchange_name, ''); // Bind with no routing key
6252

63-
// Create and bind dead letter queue
64-
// const dead_letter_queue_name = "dead_letter_queue";
65-
// await channel.assertQueue(deadLetterQueueName, { durable: false });
66-
// await channel.bindQueue(deadLetterQueueName, deadLetterExchangeName, ''); // Bind all messages to this queue
67-
68-
69-
console.log("RabbitMQ setup complete with queues and bindings.")
53+
console.log("RabbitMQ setup complete with queues, DLQ, and bindings.");
7054

7155
await channel.close();
7256
await connection.close();
7357
} catch (error) {
74-
console.log('Error setting up RabbitMQ:', error);
58+
console.error("Error setting up RabbitMQ:", error);
7559
}
7660
}
7761

78-
module.exports = { setupRabbitMQ, matching_exchange_name, queueNames };
79-
80-
setupRabbitMQ()
62+
module.exports = { setupRabbitMQ, matching_exchange_name, queueNames, dead_letter_queue_name };
Lines changed: 176 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,207 @@
11
const amqp = require('amqplib');
22
const { queueNames } = require('./setup.js');
3-
const { matchUsers } = require('../services/matchingService.js');
3+
// const { matchUsers } = require('../services/matchingService.js');
4+
const { notifyUsers } = require('../websocket/websocket');
45

56
// TODO: Subscribe and acknowledge messages with user info when timeout/user matched
67

78
// To remember what goes in a subscriber use some Acronym
89
// Connect, Assert, Process, E - for Acknowledge
910

11+
const dead_letter_queue_name = "dead_letter_queue";
12+
const timeoutMap = {};
13+
// Local dictionary to store waiting users
14+
const waitingUsers = {};
15+
16+
// manual rejection of message that is constantly being consumed. Not using TTL feature.
17+
// Race condition: upon matching user 2 with user 1, the flag for whether user 1 is matched is still false.
18+
// async function consumeQueue() {
19+
// try {
20+
// // Connect
21+
// const connection = await amqp.connect(process.env.RABBITMQ_URL);
22+
// const channel = await connection.createChannel();
23+
24+
// console.log("Waiting for users...");
25+
26+
// // Process + subscribe to each matchmaking queue
27+
// for (let queueName of queueNames) {
28+
// await channel.consume(queueName, (msg) => {
29+
// if (msg !== null) {
30+
// const userData = JSON.parse(msg.content.toString());
31+
// const { userId, language, difficulty } = userData;
32+
33+
// // Perform the matching logic
34+
// console.log(`Received user ${userId} with ${language} and ${difficulty}`);
35+
// const matched = matchUsers(userId, language, difficulty);
36+
37+
// // Flag to track if a match is found
38+
// // let isMatched = false;
39+
// if (timeoutMap[userId]) {
40+
// clearTimeout(timeoutMap[userId]);
41+
// delete timeoutMap[userId];
42+
// }
43+
44+
// // Only acknowledge if a match was found
45+
// if (matched) {
46+
// // isMatched = true; // Set the flag
47+
// channel.ack(msg);
48+
// console.log(`Matched user ${userId}`);
49+
50+
// // clearTimeout(timeoutMap[userId]);
51+
// // delete timeoutMap[userId];
52+
// } else {
53+
// console.log(`No match for ${userId}, waiting for TTL to expire.`);
54+
55+
// // Set a timeout for rejection only if not matched
56+
// const timeoutId = setTimeout(() => {
57+
// // if (!isMatched) { // Check if matched after timeout
58+
// console.log(`is the user matched upon timeout? ${matched}`);
59+
// console.log(`Rejecting user ${userId} after 10 seconds.`);
60+
// channel.reject(msg, false); // Reject without requeuing
61+
// // }
62+
// }, 10000); // 10 seconds delay
63+
64+
// timeoutMap[userId] = timeoutId;
65+
// }
66+
// }
67+
// });
68+
// }
69+
70+
// console.log("Listening to matchmaking queues");
71+
// } catch (error) {
72+
// console.error('Error consuming RabbitMQ queue:', error);
73+
// }
74+
// }
75+
76+
77+
78+
// using promises to handle errors and ensure clearing of timer.
79+
function matchUsers(channel, msg, userId, language, difficulty) {
80+
const criteriaKey = `${difficulty}.${language}`;
81+
82+
// If the criteria key does not exist, create it
83+
if (!waitingUsers[criteriaKey]) {
84+
waitingUsers[criteriaKey] = [];
85+
}
86+
87+
waitingUsers[criteriaKey].push({ userId, msg }); // Store both userId and the message
88+
console.log(`User ${userId} added to ${criteriaKey}. Waiting list: ${waitingUsers[criteriaKey].length}`);
89+
90+
// Check if there are 2 or more users waiting for this criteria
91+
if (waitingUsers[criteriaKey].length >= 2) {
92+
const matchedUsers = waitingUsers[criteriaKey].splice(0, 2); // Match the first two users
93+
console.log(`Matched users: ${matchedUsers.map(user => user.userId)}`);
94+
95+
// Send match success (this could trigger WebSocket communication)
96+
notifyUsers(matchedUsers.map(user => user.userId));
97+
98+
// Acknowledge the messages for both matched users
99+
matchedUsers.forEach(({ msg }) => {
100+
acknowledgeMessage(channel, msg);
101+
});
102+
103+
return true;
104+
}
105+
106+
return false;
107+
}
108+
109+
async function acknowledgeMessage(channel, msg) {
110+
return new Promise((resolve, reject) => {
111+
try {
112+
channel.ack(msg);
113+
console.log(`Acknowledged message for user: ${JSON.parse(msg.content).userId}`);
114+
clearTimeout(timeoutMap[JSON.parse(msg.content).userId]); // Clear any pending timeout
115+
delete timeoutMap[JSON.parse(msg.content).userId]; // Clean up
116+
resolve();
117+
} catch (error) {
118+
console.error(`Failed to acknowledge message:`, error);
119+
reject(error);
120+
}
121+
});
122+
}
123+
124+
async function rejectMessage(channel, msg, userId) {
125+
return new Promise((resolve, reject) => {
126+
try {
127+
channel.reject(msg, false); // Reject without requeuing
128+
console.log(`Rejected message for user: ${userId}`);
129+
resolve();
130+
} catch (error) {
131+
console.error(`Failed to reject message for user ${userId}:`, error);
132+
reject(error);
133+
}
134+
});
135+
}
136+
10137
async function consumeQueue() {
11138
try {
12139
// Connect
13140
const connection = await amqp.connect(process.env.RABBITMQ_URL);
14141
const channel = await connection.createChannel();
15142

16-
// Queues already created in setup.js
17-
18-
console.log("Waiting for users...")
143+
console.log("Waiting for users...");
19144

20-
// Process + subscribe to each queue
145+
// Process + subscribe to each matchmaking queue
21146
for (let queueName of queueNames) {
22-
await channel.consume(queueName, (msg) => {
147+
await channel.consume(queueName, async (msg) => {
23148
if (msg !== null) {
24149
const userData = JSON.parse(msg.content.toString());
25150
const { userId, language, difficulty } = userData;
26151

27152
// Perform the matching logic
28153
console.log(`Received user ${userId} with ${language} and ${difficulty}`);
29-
matchUsers(userId, language, difficulty);
154+
155+
// Call matchUsers with channel, message, and user details
156+
const matched = matchUsers(channel, msg, userId, language, difficulty);
30157

31-
// E- Acknowledge
32-
channel.ack(msg);
158+
// If not matched, set a timeout for rejection
159+
if (!matched) {
160+
console.log(`No match for ${userId}, waiting for rejection timeout.`);
161+
162+
// Set a timeout for rejection after 10 seconds
163+
const timeoutId = setTimeout(async () => {
164+
await rejectMessage(channel, msg, userId);
165+
}, 10000); // 10 seconds delay
166+
167+
// Store the timeout ID
168+
timeoutMap[userId] = timeoutId;
169+
}
33170
}
34171
});
35172
}
173+
174+
console.log("Listening to matchmaking queues");
36175
} catch (error) {
37176
console.error('Error consuming RabbitMQ queue:', error);
38177
}
39178
}
40179

41-
module.exports = { consumeQueue };
180+
async function consumeDLQ() {
181+
try {
182+
const connection = await amqp.connect(process.env.RABBITMQ_URL);
183+
const channel = await connection.createChannel();
184+
185+
// Consume messages from the DLQ
186+
await channel.consume(dead_letter_queue_name, (msg) => {
187+
if (msg !== null) {
188+
const messageContent = JSON.parse(msg.content.toString());
189+
const { userId, difficulty, language } = messageContent;
190+
191+
console.log(`Received message from DLQ for user: ${userId}`);
192+
193+
// Notify the user via WebSocket
194+
notifyUsers(userId, `Match not found for ${difficulty} ${language}, please try again.`);
195+
196+
// Acknowledge the message (so it's removed from the DLQ)
197+
channel.ack(msg);
198+
}
199+
});
200+
201+
console.log(`Listening to Dead Letter Queue for unmatched users...`);
202+
} catch (error) {
203+
console.error('Error consuming from DLQ:', error);
204+
}
205+
}
206+
207+
module.exports = { consumeQueue, consumeDLQ };

0 commit comments

Comments
 (0)