Skip to content

Commit d88dd8a

Browse files
authored
Merge pull request #34 from CS3219-AY2425S1/d4-make-producer-consumer-queues
Merge D4 code into main branch
2 parents ba763ec + 4d84d23 commit d88dd8a

File tree

21 files changed

+712
-162
lines changed

21 files changed

+712
-162
lines changed

Backend/MatchingService/Dockerfile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM node:20
2+
3+
WORKDIR /app
4+
5+
COPY package*.json ./
6+
RUN npm install
7+
8+
COPY . .
9+
10+
EXPOSE 3003
11+
CMD ["npm", "start"]

Backend/MatchingService/app.js

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,32 @@
11
const express = require('express');
22
const cors = require("cors");
33
const dotenv = require("dotenv");
4-
//const matchmakingRouter = require("./controllers/matchmaking");
5-
//const { consumeQueue } = require('./rabbitmq/subscriber');
6-
//const { setupRabbitMQ } = require('./rabbitmq/setup');
7-
//const { publishToQueue } = require('./rabbitmq/publisher')
4+
const matchmakingRouter = require("./controllers/matchmaking");
5+
const { consumeQueue, consumeDLQ } = require('./rabbitmq/subscriber');
6+
const { setupRabbitMQ } = require('./rabbitmq/setup');
87

98
dotenv.config();
109

1110
const app = express();
1211
app.use(cors());
1312
app.use(express.json());
13+
app.use(express.urlencoded({ extended: true }));
14+
app.options("*", cors());
1415

1516
app.use('/api/match', matchmakingRouter);
1617

1718
// TODO: Start consuming RabbitMQ queues
18-
/*
19+
1920
setupRabbitMQ().then(() => {
2021

2122
consumeQueue().catch(console.error);
22-
publishToQueue("user_234", "easy", "python")
23-
publishToQueue("user_100", "easy", "java")
23+
consumeDLQ().catch(console.error);
24+
25+
// publishToQueue({userId: "user_1", difficulty: "easy", language: "java"})
26+
// publishToQueue({userId: "user_2", difficulty: "easy", language: "python"})
27+
// publishToQueue({userId: "user_3", difficulty: "easy", language: "java"})
2428

2529
})
26-
*/
30+
2731

2832
module.exports = app;
Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,37 @@
1-
// TODO: WRITE API FOR MATCHING USER, REMEMBER TO DEAL WITH CORS ALLOW ACCESS ORIGIN ERROR
1+
// WRITE API FOR MATCHING USER, REMEMBER TO DEAL WITH CORS ALLOW ACCESS ORIGIN ERROR
2+
// Cors settled in app.js
3+
24

3-
/*
45
const express = require('express');
56
const router = express.Router();
67
const { publishToQueue } = require('../rabbitmq/publisher');
78

89
// Route for frontend to send user matching info
9-
router.post('/match', async (req, res) => {
10-
const { userId, language, difficulty } = req.body;
10+
router.post('/enterMatchmaking', async (req, res) => {
11+
const { userId, difficulty, language } = req.body;
1112

1213
try {
1314
// Publish user info to RabbitMQ
14-
await publishToQueue(userId, language, difficulty);
15+
await publishToQueue({userId: userId, difficulty: difficulty, language: language});
1516
res.status(200).send('User info sent for matching.');
1617
} catch (error) {
1718
console.error('Error publishing user info:', error);
1819
res.status(500).send('Error in matchmaking process.');
1920
}
2021
});
2122

22-
module.exports = router;
23-
*/
23+
// This is for the alternative where the player also listens to a queue after entering matchmaking
24+
/*
25+
router.post('/waitMatch', async (req, res) => {
26+
try {
27+
// Start consuming RabbitMQ queues
28+
// await consumeQueue();
29+
res.status(200).send('Waiting for match...');
30+
} catch (error) {
31+
console.error('Error consuming RabbitMQ queue:', error);
32+
res.status(500).send('Error in matchmaking process.');
33+
}
34+
})
35+
*/
36+
37+
module.exports = router;

Backend/MatchingService/package-lock.json

Lines changed: 22 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Backend/MatchingService/package.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
"amqplib": "^0.10.4",
44
"cors": "^2.8.5",
55
"dotenv": "^16.4.5",
6-
"express": "^4.21.1"
6+
"express": "^4.21.1",
7+
"ws": "^8.18.0"
8+
},
9+
"scripts": {
10+
"test": "echo \"Error: no test specified\" && exit 1",
11+
"start": "node index.js"
712
}
813
}
Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,63 @@
11
const amqp = require('amqplib');
2-
// TODO: Write function to publish to rabbitMQ
2+
const { matching_exchange_name } = require('./setup.js');
3+
4+
let channel = null; // Store a persistent channel connection
5+
6+
async function connectToRabbitMQ() {
7+
if (!channel) {
8+
try {
9+
const connection = await amqp.connect(process.env.RABBITMQ_URL);
10+
channel = await connection.createChannel();
11+
console.log("RabbitMQ channel created");
12+
} catch (error) {
13+
console.error('Error creating RabbitMQ channel:', error);
14+
}
15+
}
16+
return channel;
17+
}
318

4-
/*
5-
async function publishToQueue(userId, difficulty, language) {
19+
async function publishToQueue({userId, difficulty, language}) {
620
try {
7-
const connection = await amqp.connect(process.env.RABBITMQ_URL);
8-
const channel = await connection.createChannel();
9-
const matching_exchange_name = 'matching_exchange';
21+
const channel = await connectToRabbitMQ(); // Reuse persistent connection
1022
const routingKey = `${difficulty}.${language}`;
11-
const queueName = `${difficulty}.${language}`;
1223

13-
if (queueInfo) {
14-
channel.publish(matching_exchange_name, routingKey, Buffer.from(JSON.stringify({ userId, language, difficulty })));
24+
// Publish the message to the exchange
25+
const messageSent = channel.publish(
26+
matching_exchange_name,
27+
routingKey,
28+
Buffer.from(JSON.stringify({ userId, difficulty, language }))
29+
);
1530

16-
console.log(`Published user: ${userId} with routing key: ${routingKey}`);
31+
if (messageSent) {
32+
console.log(`Message sent: ${userId} -> ${routingKey}`);
1733
} else {
18-
console.log(`Cannot publish message: Queue ${queueName} does not exist`);
34+
console.error(`Message NOT sent: ${userId} -> ${routingKey}`);
1935
}
20-
21-
22-
await channel.close();
23-
await connection.close();
2436
} catch (error) {
2537
console.error('Error publishing to RabbitMQ:', error);
2638
}
2739
}
2840

29-
module.exports = { publishToQueue };
30-
*/
31-
32-
33-
34-
35-
41+
async function publishCancelRequest({ userId }) {
42+
try {
43+
const channel = await connectToRabbitMQ(); // Reuse persistent connection
44+
const routingKey = 'cancel'; // Define a routing key for cancellation
45+
46+
// Publish the cancel message to the exchange
47+
const messageSent = channel.publish(
48+
matching_exchange_name,
49+
routingKey,
50+
Buffer.from(JSON.stringify({ userId }))
51+
);
52+
53+
if (messageSent) {
54+
console.log(`Cancel request sent: ${userId}`);
55+
} else {
56+
console.error(`Cancel request NOT sent: ${userId}`);
57+
}
58+
} catch (error) {
59+
console.error('Error publishing cancel request to RabbitMQ:', error);
60+
}
61+
}
3662

63+
module.exports = { publishToQueue, publishCancelRequest };
Lines changed: 44 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,71 @@
11
const amqp = require("amqplib");
22

3+
const matching_exchange_name = "matching_exchange";
4+
const dead_letter_exchange_name = "dead_letter_exchange";
5+
const dead_letter_queue_name = "dead_letter_queue";
6+
const cancel_queue_name = "cancel_queue";
7+
const queueNames = [
8+
'easy.python',
9+
'easy.java',
10+
'easy.cplusplus',
11+
'medium.python',
12+
'medium.java',
13+
'medium.cplusplus',
14+
'hard.python',
15+
'hard.java',
16+
'hard.cplusplus',
17+
];
18+
319
async function setupRabbitMQ() {
420
try {
5-
const connection = await amqp.connect(process.env.RABBITMQ_URL)
6-
.catch((error) => {
7-
console.error("Error connecting to RabbitMQ:", error);
8-
return null;
9-
});
21+
const connection = await amqp.connect(process.env.RABBITMQ_URL);
1022

1123
if (!connection) {
1224
return;
1325
}
1426

1527
const channel = await connection.createChannel();
1628

17-
// Declare matching exchange to be bind to queues
18-
const matching_exchange_name = "matching_exchange";
19-
await channel.assertExchange(matching_exchange_name, "topic", { durable: false });
29+
// Declare the matching exchange (topic)
30+
await channel.assertExchange(matching_exchange_name, "topic", { durable: true });
2031

21-
// Declare dead letter exchange
22-
const dead_letter_exchange_name = "dead_letter_exchange";
23-
await channel.assertExchange(dead_letter_exchange_name, "fanout", { durable: false });
32+
// Declare the dead-letter exchange (fanout)
33+
await channel.assertExchange(dead_letter_exchange_name, "fanout", { durable: true });
2434

25-
const queueNames = [
26-
'easy.python',
27-
'easy.java',
28-
'easy.cplusplus',
29-
'medium.python',
30-
'medium.java',
31-
'medium.cplusplus',
32-
'hard.python',
33-
'hard.java',
34-
'hard.cplusplus'
35-
]
35+
// Declare and bind all main queues with TTL and DLQ bindings
36+
for (let queueName of queueNames) {
37+
await channel.deleteQueue(queueName); // Ensure we start fresh for each setup
3638

37-
// Create and bind queues to exchange with the routing keys
38-
for (let name of queueNames) {
39-
/*
40-
try {
41-
await channel.deleteQueue(name);
42-
} catch (err) {
43-
console.log(`Queue ${name} does not exist or could not be deleted: ${err.message}`);
44-
}
45-
*/
46-
await channel.assertQueue(name,
47-
{ durable: false, // durable=false ensures queue will survive broker restarts
48-
arguments: {
49-
'x-dead-letter-exchange': dead_letter_exchange_name // set dead letter exchange
50-
}
51-
52-
});
39+
await channel.assertQueue(queueName, {
40+
durable: true,
41+
arguments: {
42+
'x-message-ttl': 10000, // 60 seconds TTL
43+
'x-dead-letter-exchange': dead_letter_exchange_name // Bind to dead-letter exchange
44+
}
45+
});
5346

54-
await channel.bindQueue(name, matching_exchange_name, name); // e.g. messages with routing key easy.python goes to easy.python queue
47+
await channel.bindQueue(queueName, matching_exchange_name, queueName); // Bind to exchange
5548
}
5649

57-
// Create and bind queue to exchange (if we want only 1 queue)
58-
// await channel.assertQueue(name, { durable: false })
59-
// await channel.bindQueue(name, matching_exchange_name, '#') // all messages go to this queue because of a wildcard pattern
50+
// Delete DLQ before asserting it
51+
await channel.deleteQueue(dead_letter_queue_name);
6052

61-
// Create and bind dead letter queue
62-
// const dead_letter_queue_name = "dead_letter_queue";
63-
// await channel.assertQueue(deadLetterQueueName, { durable: false });
64-
// await channel.bindQueue(deadLetterQueueName, deadLetterExchangeName, ''); // Bind all messages to this queue
53+
// Declare the dead-letter queue and bind it to the dead-letter exchange
54+
await channel.assertQueue(dead_letter_queue_name, { durable: true });
55+
await channel.bindQueue(dead_letter_queue_name, dead_letter_exchange_name, ''); // Bind with no routing key
6556

57+
// Declare and bind the cancel queue
58+
await channel.deleteQueue(cancel_queue_name); // Delete any existing cancel queue
59+
await channel.assertQueue(cancel_queue_name, { durable: true }); // Declare the cancel queue
60+
await channel.bindQueue(cancel_queue_name, matching_exchange_name, 'cancel'); // Bind with the "cancel" routing key
6661

67-
console.log("RabbitMQ setup complete with queues and bindings.")
62+
console.log("RabbitMQ setup complete with queues, DLQ, and bindings.");
6863

6964
await channel.close();
7065
await connection.close();
7166
} catch (error) {
72-
console.log('Error setting up RabbitMQ:', error);
67+
console.error("Error setting up RabbitMQ:", error);
7368
}
7469
}
7570

76-
module.exports = { setupRabbitMQ };
77-
78-
setupRabbitMQ()
71+
module.exports = { setupRabbitMQ, matching_exchange_name, queueNames, dead_letter_queue_name , cancel_queue_name};

0 commit comments

Comments
 (0)