Skip to content

Commit 4d84d23

Browse files
authored
Merge pull request #33 from CS3219-AY2425S1/websocket-implementation
Websocket implementation and Matching UI
2 parents 31ba28e + 011b606 commit 4d84d23

File tree

16 files changed

+479
-55
lines changed

16 files changed

+479
-55
lines changed

Backend/MatchingService/Dockerfile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM node:20
2+
3+
WORKDIR /app
4+
5+
COPY package*.json ./
6+
RUN npm install
7+
8+
COPY . .
9+
10+
EXPOSE 3003
11+
CMD ["npm", "start"]

Backend/MatchingService/app.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ const dotenv = require("dotenv");
44
const matchmakingRouter = require("./controllers/matchmaking");
55
const { consumeQueue, consumeDLQ } = require('./rabbitmq/subscriber');
66
const { setupRabbitMQ } = require('./rabbitmq/setup');
7-
const { publishToQueue } = require('./rabbitmq/publisher')
87

98
dotenv.config();
109

@@ -23,9 +22,9 @@ setupRabbitMQ().then(() => {
2322
consumeQueue().catch(console.error);
2423
consumeDLQ().catch(console.error);
2524

26-
publishToQueue({userId: "user_1", difficulty: "easy", language: "java"})
27-
publishToQueue({userId: "user_2", difficulty: "easy", language: "python"})
28-
publishToQueue({userId: "user_3", difficulty: "easy", language: "java"})
25+
// publishToQueue({userId: "user_1", difficulty: "easy", language: "java"})
26+
// publishToQueue({userId: "user_2", difficulty: "easy", language: "python"})
27+
// publishToQueue({userId: "user_3", difficulty: "easy", language: "java"})
2928

3029
})
3130

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,24 @@
11
const amqp = require('amqplib');
2-
// Connect -> Exchange -> Queue -> Bind -> Publish
32
const { matching_exchange_name } = require('./setup.js');
43

4+
let channel = null; // Store a persistent channel connection
5+
6+
async function connectToRabbitMQ() {
7+
if (!channel) {
8+
try {
9+
const connection = await amqp.connect(process.env.RABBITMQ_URL);
10+
channel = await connection.createChannel();
11+
console.log("RabbitMQ channel created");
12+
} catch (error) {
13+
console.error('Error creating RabbitMQ channel:', error);
14+
}
15+
}
16+
return channel;
17+
}
18+
519
async function publishToQueue({userId, difficulty, language}) {
620
try {
7-
const connection = await amqp.connect(process.env.RABBITMQ_URL);
8-
const channel = await connection.createChannel();
21+
const channel = await connectToRabbitMQ(); // Reuse persistent connection
922
const routingKey = `${difficulty}.${language}`;
1023

1124
// Publish the message to the exchange
@@ -20,18 +33,31 @@ async function publishToQueue({userId, difficulty, language}) {
2033
} else {
2134
console.error(`Message NOT sent: ${userId} -> ${routingKey}`);
2235
}
23-
24-
await channel.close();
25-
await connection.close();
2636
} catch (error) {
2737
console.error('Error publishing to RabbitMQ:', error);
2838
}
2939
}
3040

31-
module.exports = { publishToQueue };
32-
33-
34-
41+
async function publishCancelRequest({ userId }) {
42+
try {
43+
const channel = await connectToRabbitMQ(); // Reuse persistent connection
44+
const routingKey = 'cancel'; // Define a routing key for cancellation
3545

46+
// Publish the cancel message to the exchange
47+
const messageSent = channel.publish(
48+
matching_exchange_name,
49+
routingKey,
50+
Buffer.from(JSON.stringify({ userId }))
51+
);
3652

53+
if (messageSent) {
54+
console.log(`Cancel request sent: ${userId}`);
55+
} else {
56+
console.error(`Cancel request NOT sent: ${userId}`);
57+
}
58+
} catch (error) {
59+
console.error('Error publishing cancel request to RabbitMQ:', error);
60+
}
61+
}
3762

63+
module.exports = { publishToQueue, publishCancelRequest };

Backend/MatchingService/rabbitmq/setup.js

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const amqp = require("amqplib");
33
const matching_exchange_name = "matching_exchange";
44
const dead_letter_exchange_name = "dead_letter_exchange";
55
const dead_letter_queue_name = "dead_letter_queue";
6+
const cancel_queue_name = "cancel_queue";
67
const queueNames = [
78
'easy.python',
89
'easy.java',
@@ -12,7 +13,7 @@ const queueNames = [
1213
'medium.cplusplus',
1314
'hard.python',
1415
'hard.java',
15-
'hard.cplusplus'
16+
'hard.cplusplus',
1617
];
1718

1819
async function setupRabbitMQ() {
@@ -26,17 +27,17 @@ async function setupRabbitMQ() {
2627
const channel = await connection.createChannel();
2728

2829
// Declare the matching exchange (topic)
29-
await channel.assertExchange(matching_exchange_name, "topic", { durable: false });
30+
await channel.assertExchange(matching_exchange_name, "topic", { durable: true });
3031

3132
// Declare the dead-letter exchange (fanout)
32-
await channel.assertExchange(dead_letter_exchange_name, "fanout", { durable: false });
33+
await channel.assertExchange(dead_letter_exchange_name, "fanout", { durable: true });
3334

3435
// Declare and bind all main queues with TTL and DLQ bindings
3536
for (let queueName of queueNames) {
3637
await channel.deleteQueue(queueName); // Ensure we start fresh for each setup
3738

3839
await channel.assertQueue(queueName, {
39-
durable: false,
40+
durable: true,
4041
arguments: {
4142
'x-message-ttl': 10000, // 60 seconds TTL
4243
'x-dead-letter-exchange': dead_letter_exchange_name // Bind to dead-letter exchange
@@ -50,9 +51,14 @@ async function setupRabbitMQ() {
5051
await channel.deleteQueue(dead_letter_queue_name);
5152

5253
// Declare the dead-letter queue and bind it to the dead-letter exchange
53-
await channel.assertQueue(dead_letter_queue_name, { durable: false });
54+
await channel.assertQueue(dead_letter_queue_name, { durable: true });
5455
await channel.bindQueue(dead_letter_queue_name, dead_letter_exchange_name, ''); // Bind with no routing key
5556

57+
// Declare and bind the cancel queue
58+
await channel.deleteQueue(cancel_queue_name); // Delete any existing cancel queue
59+
await channel.assertQueue(cancel_queue_name, { durable: true }); // Declare the cancel queue
60+
await channel.bindQueue(cancel_queue_name, matching_exchange_name, 'cancel'); // Bind with the "cancel" routing key
61+
5662
console.log("RabbitMQ setup complete with queues, DLQ, and bindings.");
5763

5864
await channel.close();
@@ -62,4 +68,4 @@ async function setupRabbitMQ() {
6268
}
6369
}
6470

65-
module.exports = { setupRabbitMQ, matching_exchange_name, queueNames, dead_letter_queue_name };
71+
module.exports = { setupRabbitMQ, matching_exchange_name, queueNames, dead_letter_queue_name , cancel_queue_name};

Backend/MatchingService/rabbitmq/subscriber.js

Lines changed: 105 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,20 @@ function matchUsers(channel, msg, userId, language, difficulty) {
2323
waitingUsers[criteriaKey] = [];
2424
}
2525

26-
waitingUsers[criteriaKey].push({ userId, msg }); // Store both userId and the message
26+
// Store both the userId, message, and the channel in waitingUsers
27+
waitingUsers[criteriaKey].push({ userId, msg, channel });
2728
console.log(`User ${userId} added to ${criteriaKey}. Waiting list: ${waitingUsers[criteriaKey].length}`);
2829

2930
// Check if there are 2 or more users waiting for this criteria
3031
if (waitingUsers[criteriaKey].length >= 2) {
3132
const matchedUsers = waitingUsers[criteriaKey].splice(0, 2); // Match the first two users
3233
console.log(`Matched users: ${matchedUsers.map(user => user.userId)}`);
3334

34-
// Send match success (this could trigger WebSocket communication)
35-
// notifyUsers(matchedUsers.map(user => user.userId));
35+
// Notify users of the match
3636
notifyUsers(matchedUsers.map(user => user.userId), 'Match found!', 'match');
3737

3838
// Acknowledge the messages for both matched users
39-
matchedUsers.forEach(({ msg }) => {
39+
matchedUsers.forEach(({ msg, channel }) => {
4040
acknowledgeMessage(channel, msg);
4141
});
4242

@@ -46,6 +46,7 @@ function matchUsers(channel, msg, userId, language, difficulty) {
4646
return false;
4747
}
4848

49+
4950
async function acknowledgeMessage(channel, msg) {
5051
return new Promise((resolve, reject) => {
5152
try {
@@ -64,19 +65,46 @@ async function acknowledgeMessage(channel, msg) {
6465
async function rejectMessage(channel, msg, userId) {
6566
return new Promise((resolve, reject) => {
6667
try {
68+
// Get user data from the message to find the correct key in waitingUsers
69+
const userData = JSON.parse(msg.content.toString());
70+
const { language, difficulty } = userData;
71+
72+
// Correctly creating the criteriaKey using template literals
73+
const criteriaKey = `${difficulty}.${language}`;
74+
75+
76+
// Find the user in the waitingUsers list and remove them
77+
if (waitingUsers[criteriaKey]) {
78+
// Find the index of the user in the waiting list
79+
const userIndex = waitingUsers[criteriaKey].findIndex(user => user.userId === userId);
80+
81+
if (userIndex !== -1) {
82+
// Remove the user from the waiting list
83+
waitingUsers[criteriaKey].splice(userIndex, 1);
84+
console.log(`Removed user ${userId} from waiting list for ${criteriaKey}`);
85+
}
86+
}
87+
88+
// Reject the message without requeuing
6789
channel.reject(msg, false); // Reject without requeuing
6890
console.log(`Rejected message for user: ${userId}`);
91+
92+
// Clean up the timeoutMap
93+
if (timeoutMap[userId]) {
94+
clearTimeout(timeoutMap[userId]);
95+
delete timeoutMap[userId];
96+
}
97+
6998
resolve();
7099
} catch (error) {
71-
console.error(`Failed to reject message for user ${userId}:`, error);
100+
console.error(`Failed to reject message for user ${userId}:, error`);
72101
reject(error);
73102
}
74103
});
75104
}
76105

77106
async function consumeQueue() {
78107
try {
79-
// Connect
80108
const connection = await amqp.connect(process.env.RABBITMQ_URL);
81109
const channel = await connection.createChannel();
82110

@@ -95,23 +123,23 @@ async function consumeQueue() {
95123
// Call matchUsers with channel, message, and user details
96124
const matched = matchUsers(channel, msg, userId, language, difficulty);
97125

98-
// If not matched, set a timeout for rejection
99126
if (!matched) {
100127
console.log(`No match for ${userId}, waiting for rejection timeout.`);
101128

102-
// Set a timeout for rejection after 10 seconds
103129
const timeoutId = setTimeout(async () => {
104130
await rejectMessage(channel, msg, userId);
105131
}, 10000); // 10 seconds delay
106132

107-
// Store the timeout ID
108133
timeoutMap[userId] = timeoutId;
109134
}
110135
}
111-
});
136+
}, { noAck: false }); // Ensure manual acknowledgment
112137
}
113138

114139
console.log("Listening to matchmaking queues");
140+
141+
await consumeCancelQueue();
142+
console.log("Listening to Cancel Queue");
115143
} catch (error) {
116144
console.error('Error consuming RabbitMQ queue:', error);
117145
}
@@ -144,4 +172,71 @@ async function consumeDLQ() {
144172
}
145173
}
146174

175+
async function consumeCancelQueue() {
176+
try {
177+
const connection = await amqp.connect(process.env.RABBITMQ_URL);
178+
const channel = await connection.createChannel();
179+
180+
// Subscribe to the cancel queue
181+
await channel.consume('cancel_queue', async (msg) => {
182+
if (msg !== null) {
183+
const { userId } = JSON.parse(msg.content.toString());
184+
console.log(`Received cancel request for user: ${userId}`);
185+
186+
// Process the cancel request
187+
await cancelMatching(channel, msg, userId);
188+
}
189+
}, { noAck: false }); // Ensure manual acknowledgment
190+
191+
console.log("Listening for cancel requests");
192+
} catch (error) {
193+
console.error('Error consuming cancel queue:', error);
194+
}
195+
}
196+
197+
async function cancelMatching(cancelChannel, cancelMsg, userId) {
198+
try {
199+
let foundOriginalMsg = false;
200+
201+
// Loop through waitingUsers to find the original message for the user
202+
Object.keys(waitingUsers).forEach(criteriaKey => {
203+
const userIndex = waitingUsers[criteriaKey].findIndex(user => user.userId === userId);
204+
205+
if (userIndex !== -1) {
206+
const { msg, channel } = waitingUsers[criteriaKey][userIndex]; // Get original msg and its channel
207+
208+
// Acknowledge the original matchmaking message from the queue (e.g., easy.python)
209+
if (msg && channel) {
210+
console.log(`Acknowledging original message for user ${userId} in queue ${criteriaKey}`);
211+
channel.ack(msg); // Use the same channel that consumed the message to acknowledge it
212+
foundOriginalMsg = true;
213+
}
214+
215+
// Remove the user from the waiting list
216+
waitingUsers[criteriaKey].splice(userIndex, 1);
217+
console.log(`User ${userId} removed from waiting list for ${criteriaKey}`);
218+
}
219+
});
220+
221+
// If original message not found, log a warning
222+
if (!foundOriginalMsg) {
223+
console.warn(`Original message for user ${userId} not found in matchmaking queues.`);
224+
}
225+
226+
// Clear any timeouts for the user
227+
if (timeoutMap[userId]) {
228+
clearTimeout(timeoutMap[userId]);
229+
delete timeoutMap[userId];
230+
}
231+
232+
// Acknowledge the cancel message from the cancel queue
233+
cancelChannel.ack(cancelMsg);
234+
console.log(`Cancel processed for user ${userId}`);
235+
236+
} catch (error) {
237+
console.error(`Failed to process cancel for user ${userId}:`, error);
238+
}
239+
}
240+
241+
147242
module.exports = { consumeQueue, consumeDLQ };

0 commit comments

Comments
 (0)