Skip to content

Commit 31ba28e

Browse files
authored
Merge pull request #29 from CS3219-AY2425S1/update-consumer-logic
Update consumer logic with timeOut
2 parents 4f133b3 + b3c9236 commit 31ba28e

File tree

6 files changed

+163
-95
lines changed

6 files changed

+163
-95
lines changed

Backend/MatchingService/app.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ const express = require('express');
22
const cors = require("cors");
33
const dotenv = require("dotenv");
44
const matchmakingRouter = require("./controllers/matchmaking");
5-
const { consumeQueue } = require('./rabbitmq/subscriber');
5+
const { consumeQueue, consumeDLQ } = require('./rabbitmq/subscriber');
66
const { setupRabbitMQ } = require('./rabbitmq/setup');
77
const { publishToQueue } = require('./rabbitmq/publisher')
88

@@ -21,8 +21,11 @@ app.use('/api/match', matchmakingRouter);
2121
setupRabbitMQ().then(() => {
2222

2323
consumeQueue().catch(console.error);
24-
publishToQueue({userId: "user_234", difficulty: "easy", language: "python"})
25-
publishToQueue({userId: "user_234", difficulty: "easy", language: "java"})
24+
consumeDLQ().catch(console.error);
25+
26+
publishToQueue({userId: "user_1", difficulty: "easy", language: "java"})
27+
publishToQueue({userId: "user_2", difficulty: "easy", language: "python"})
28+
publishToQueue({userId: "user_3", difficulty: "easy", language: "java"})
2629

2730
})
2831

Backend/MatchingService/rabbitmq/publisher.js

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ async function publishToQueue({userId, difficulty, language}) {
88
const channel = await connection.createChannel();
99
const routingKey = `${difficulty}.${language}`;
1010

11-
// Connect to the exchange (just in case it does not exist)
12-
await channel.assertExchange(matching_exchange_name, 'topic', { durable: false });
13-
1411
// Publish the message to the exchange
1512
const messageSent = channel.publish(
1613
matching_exchange_name,
Lines changed: 26 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
const amqp = require("amqplib");
2+
23
const matching_exchange_name = "matching_exchange";
4+
const dead_letter_exchange_name = "dead_letter_exchange";
5+
const dead_letter_queue_name = "dead_letter_queue";
36
const queueNames = [
47
'easy.python',
58
'easy.java',
@@ -10,71 +13,53 @@ const queueNames = [
1013
'hard.python',
1114
'hard.java',
1215
'hard.cplusplus'
13-
]
16+
];
1417

1518
async function setupRabbitMQ() {
1619
try {
17-
const connection = await amqp.connect(process.env.RABBITMQ_URL)
20+
const connection = await amqp.connect(process.env.RABBITMQ_URL);
1821

1922
if (!connection) {
2023
return;
2124
}
2225

2326
const channel = await connection.createChannel();
2427

25-
// Declare matching exchange to be bind to queues
26-
28+
// Declare the matching exchange (topic)
2729
await channel.assertExchange(matching_exchange_name, "topic", { durable: false });
2830

29-
// Declare dead letter exchange
30-
const dead_letter_exchange_name = "dead_letter_exchange";
31+
// Declare the dead-letter exchange (fanout)
3132
await channel.assertExchange(dead_letter_exchange_name, "fanout", { durable: false });
3233

34+
// Declare and bind all main queues with TTL and DLQ bindings
35+
for (let queueName of queueNames) {
36+
await channel.deleteQueue(queueName); // Ensure we start fresh for each setup
3337

38+
await channel.assertQueue(queueName, {
39+
durable: false,
40+
arguments: {
41+
'x-message-ttl': 10000, // 60 seconds TTL
42+
'x-dead-letter-exchange': dead_letter_exchange_name // Bind to dead-letter exchange
43+
}
44+
});
3445

35-
// Create and bind queues to exchange with the routing keys
36-
for (let name of queueNames) {
37-
/*
38-
try {
39-
await channel.deleteQueue(name);
40-
} catch (err) {
41-
console.log(`Queue ${name} does not exist or could not be deleted: ${err.message}`);
42-
}
43-
*/
44-
// this is required to add TTL and dead letter exchange to the queue
45-
await channel.deleteQueue(name);
46-
47-
await channel.assertQueue(name,
48-
{ durable: false, // durable=false ensures queue will survive broker restarts
49-
arguments: {
50-
'x-message-ttl': 60000, // set message time to live to 60 seconds
51-
'x-dead-letter-exchange': dead_letter_exchange_name // set dead letter exchange
52-
}
53-
54-
});
55-
56-
await channel.bindQueue(name, matching_exchange_name, name); // e.g. messages with routing key easy.python goes to easy.python queue
46+
await channel.bindQueue(queueName, matching_exchange_name, queueName); // Bind to exchange
5747
}
5848

59-
// Create and bind queue to exchange (if we want only 1 queue)
60-
// await channel.assertQueue(name, { durable: false })
61-
// await channel.bindQueue(name, matching_exchange_name, '#') // all messages go to this queue because of a wildcard pattern
62-
63-
// Create and bind dead letter queue
64-
// const dead_letter_queue_name = "dead_letter_queue";
65-
// await channel.assertQueue(deadLetterQueueName, { durable: false });
66-
// await channel.bindQueue(deadLetterQueueName, deadLetterExchangeName, ''); // Bind all messages to this queue
49+
// Delete DLQ before asserting it
50+
await channel.deleteQueue(dead_letter_queue_name);
6751

52+
// Declare the dead-letter queue and bind it to the dead-letter exchange
53+
await channel.assertQueue(dead_letter_queue_name, { durable: false });
54+
await channel.bindQueue(dead_letter_queue_name, dead_letter_exchange_name, ''); // Bind with no routing key
6855

69-
console.log("RabbitMQ setup complete with queues and bindings.")
56+
console.log("RabbitMQ setup complete with queues, DLQ, and bindings.");
7057

7158
await channel.close();
7259
await connection.close();
7360
} catch (error) {
74-
console.log('Error setting up RabbitMQ:', error);
61+
console.error("Error setting up RabbitMQ:", error);
7562
}
7663
}
7764

78-
module.exports = { setupRabbitMQ, matching_exchange_name, queueNames };
79-
80-
setupRabbitMQ()
65+
module.exports = { setupRabbitMQ, matching_exchange_name, queueNames, dead_letter_queue_name };
Lines changed: 116 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,147 @@
11
const amqp = require('amqplib');
22
const { queueNames } = require('./setup.js');
3-
const { matchUsers } = require('../services/matchingService.js');
3+
// const { matchUsers } = require('../services/matchingService.js');
4+
const { notifyUsers } = require('../websocket/websocket');
45

56
// TODO: Subscribe and acknowledge messages with user info when timeout/user matched
67

78
// To remember what goes in a subscriber use some Acronym
89
// Connect, Assert, Process, E - for Acknowledge
910

11+
const dead_letter_queue_name = "dead_letter_queue";
12+
const timeoutMap = {};
13+
14+
// Local dictionary to store waiting users
15+
const waitingUsers = {};
16+
17+
// using promises to handle errors and ensure clearing of timer.
18+
function matchUsers(channel, msg, userId, language, difficulty) {
19+
const criteriaKey = `${difficulty}.${language}`;
20+
21+
// If the criteria key does not exist, create it
22+
if (!waitingUsers[criteriaKey]) {
23+
waitingUsers[criteriaKey] = [];
24+
}
25+
26+
waitingUsers[criteriaKey].push({ userId, msg }); // Store both userId and the message
27+
console.log(`User ${userId} added to ${criteriaKey}. Waiting list: ${waitingUsers[criteriaKey].length}`);
28+
29+
// Check if there are 2 or more users waiting for this criteria
30+
if (waitingUsers[criteriaKey].length >= 2) {
31+
const matchedUsers = waitingUsers[criteriaKey].splice(0, 2); // Match the first two users
32+
console.log(`Matched users: ${matchedUsers.map(user => user.userId)}`);
33+
34+
// Send match success (this could trigger WebSocket communication)
35+
// notifyUsers(matchedUsers.map(user => user.userId));
36+
notifyUsers(matchedUsers.map(user => user.userId), 'Match found!', 'match');
37+
38+
// Acknowledge the messages for both matched users
39+
matchedUsers.forEach(({ msg }) => {
40+
acknowledgeMessage(channel, msg);
41+
});
42+
43+
return true;
44+
}
45+
46+
return false;
47+
}
48+
49+
async function acknowledgeMessage(channel, msg) {
50+
return new Promise((resolve, reject) => {
51+
try {
52+
channel.ack(msg);
53+
console.log(`Acknowledged message for user: ${JSON.parse(msg.content).userId}`);
54+
clearTimeout(timeoutMap[JSON.parse(msg.content).userId]); // Clear any pending timeout
55+
delete timeoutMap[JSON.parse(msg.content).userId]; // Clean up
56+
resolve();
57+
} catch (error) {
58+
console.error(`Failed to acknowledge message:`, error);
59+
reject(error);
60+
}
61+
});
62+
}
63+
64+
async function rejectMessage(channel, msg, userId) {
65+
return new Promise((resolve, reject) => {
66+
try {
67+
channel.reject(msg, false); // Reject without requeuing
68+
console.log(`Rejected message for user: ${userId}`);
69+
resolve();
70+
} catch (error) {
71+
console.error(`Failed to reject message for user ${userId}:`, error);
72+
reject(error);
73+
}
74+
});
75+
}
76+
1077
async function consumeQueue() {
1178
try {
1279
// Connect
1380
const connection = await amqp.connect(process.env.RABBITMQ_URL);
1481
const channel = await connection.createChannel();
1582

16-
// Queues already created in setup.js
83+
console.log("Waiting for users...");
1784

18-
console.log("Waiting for users...")
19-
20-
// Process + subscribe to each queue
85+
// Process + subscribe to each matchmaking queue
2186
for (let queueName of queueNames) {
22-
await channel.consume(queueName, (msg) => {
87+
await channel.consume(queueName, async (msg) => {
2388
if (msg !== null) {
2489
const userData = JSON.parse(msg.content.toString());
2590
const { userId, language, difficulty } = userData;
2691

2792
// Perform the matching logic
2893
console.log(`Received user ${userId} with ${language} and ${difficulty}`);
29-
matchUsers(userId, language, difficulty);
94+
95+
// Call matchUsers with channel, message, and user details
96+
const matched = matchUsers(channel, msg, userId, language, difficulty);
97+
98+
// If not matched, set a timeout for rejection
99+
if (!matched) {
100+
console.log(`No match for ${userId}, waiting for rejection timeout.`);
30101

31-
// E- Acknowledge
32-
channel.ack(msg);
102+
// Set a timeout for rejection after 10 seconds
103+
const timeoutId = setTimeout(async () => {
104+
await rejectMessage(channel, msg, userId);
105+
}, 10000); // 10 seconds delay
106+
107+
// Store the timeout ID
108+
timeoutMap[userId] = timeoutId;
109+
}
33110
}
34111
});
35112
}
113+
114+
console.log("Listening to matchmaking queues");
36115
} catch (error) {
37116
console.error('Error consuming RabbitMQ queue:', error);
38117
}
39118
}
40119

41-
module.exports = { consumeQueue };
120+
async function consumeDLQ() {
121+
try {
122+
const connection = await amqp.connect(process.env.RABBITMQ_URL);
123+
const channel = await connection.createChannel();
124+
125+
// Consume messages from the DLQ
126+
await channel.consume(dead_letter_queue_name, (msg) => {
127+
if (msg !== null) {
128+
const messageContent = JSON.parse(msg.content.toString());
129+
const { userId, difficulty, language } = messageContent;
130+
131+
console.log(`Received message from DLQ for user: ${userId}`);
132+
133+
// Notify the user via WebSocket
134+
notifyUsers(userId, `Match not found for ${difficulty} ${language}, please try again.`, 'rejection');
135+
136+
// Acknowledge the message (so it's removed from the DLQ)
137+
channel.ack(msg);
138+
}
139+
});
140+
141+
console.log(`Listening to Dead Letter Queue for unmatched users...`);
142+
} catch (error) {
143+
console.error('Error consuming from DLQ:', error);
144+
}
145+
}
146+
147+
module.exports = { consumeQueue, consumeDLQ };

Backend/MatchingService/services/matchingService.js

Lines changed: 0 additions & 32 deletions
This file was deleted.

Backend/MatchingService/websocket/websocket.js

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
// TODO: Write socket logic to connect backend to frontend here
2-
3-
/*
41
const WebSocket = require('ws');
52
const wss = new WebSocket.Server({ port: 8080 });
63

@@ -16,13 +13,25 @@ wss.on('connection', (ws) => {
1613
});
1714
});
1815

19-
function notifyUsers(userId, message) {
16+
/**
17+
* Notify users through WebSocket.
18+
* @param {string|array} userId - User ID or an array of user IDs to notify.
19+
* @param {string} message - The message to send.
20+
* @param {string} type - The type of message (e.g., 'match' or 'rejection').
21+
*/
22+
function notifyUsers(userId, message, type) {
23+
console.log(`Notifying user: ${userId}, Message: ${message}, Type: ${type}`); // Log message details
24+
2025
wss.clients.forEach((client) => {
2126
if (client.readyState === WebSocket.OPEN) {
22-
client.send(JSON.stringify({ userId, message }));
27+
// Construct the payload to include userId, message, and type
28+
client.send(JSON.stringify({
29+
userId,
30+
message,
31+
type // This allows the frontend to differentiate between match and rejection messages
32+
}));
2333
}
2434
});
2535
}
2636

2737
module.exports = { notifyUsers };
28-
*/

0 commit comments

Comments
 (0)