Skip to content

Commit f88dfb4

Browse files
committed
Setup Producer and Subscriber logic
Include frontend call to enterMatchmaking Add required dependencies to package.json
1 parent 66a1743 commit f88dfb4

File tree

9 files changed

+116
-58
lines changed

9 files changed

+116
-58
lines changed

Backend/MatchingService/app.js

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,30 @@
11
const express = require('express');
22
const cors = require("cors");
33
const dotenv = require("dotenv");
4-
//const matchmakingRouter = require("./controllers/matchmaking");
5-
//const { consumeQueue } = require('./rabbitmq/subscriber');
6-
//const { setupRabbitMQ } = require('./rabbitmq/setup');
7-
//const { publishToQueue } = require('./rabbitmq/publisher')
4+
const matchmakingRouter = require("./controllers/matchmaking");
5+
const { consumeQueue } = require('./rabbitmq/subscriber');
6+
const { setupRabbitMQ } = require('./rabbitmq/setup');
7+
const { publishToQueue } = require('./rabbitmq/publisher')
88

99
dotenv.config();
1010

1111
const app = express();
1212
app.use(cors());
1313
app.use(express.json());
14+
app.use(express.urlencoded({ extended: true }));
15+
app.options("*", cors());
1416

1517
app.use('/api/match', matchmakingRouter);
1618

1719
// TODO: Start consuming RabbitMQ queues
18-
/*
20+
1921
setupRabbitMQ().then(() => {
2022

2123
consumeQueue().catch(console.error);
22-
publishToQueue("user_234", "easy", "python")
23-
publishToQueue("user_100", "easy", "java")
24+
publishToQueue({userId: "user_234", difficulty: "easy", language: "python"})
25+
publishToQueue({userId: "user_234", difficulty: "easy", language: "java"})
2426

2527
})
26-
*/
28+
2729

2830
module.exports = app;
Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,37 @@
1-
// TODO: WRITE API FOR MATCHING USER, REMEMBER TO DEAL WITH CORS ALLOW ACCESS ORIGIN ERROR
1+
// WRITE API FOR MATCHING USER, REMEMBER TO DEAL WITH CORS ALLOW ACCESS ORIGIN ERROR
2+
// Cors settled in app.js
3+
24

3-
/*
45
const express = require('express');
56
const router = express.Router();
67
const { publishToQueue } = require('../rabbitmq/publisher');
78

89
// Route for frontend to send user matching info
9-
router.post('/match', async (req, res) => {
10-
const { userId, language, difficulty } = req.body;
10+
router.post('/enterMatchmaking', async (req, res) => {
11+
const { userId, difficulty, language } = req.body;
1112

1213
try {
1314
// Publish user info to RabbitMQ
14-
await publishToQueue(userId, language, difficulty);
15+
await publishToQueue({userId: userId, difficulty: difficulty, language: language});
1516
res.status(200).send('User info sent for matching.');
1617
} catch (error) {
1718
console.error('Error publishing user info:', error);
1819
res.status(500).send('Error in matchmaking process.');
1920
}
2021
});
2122

22-
module.exports = router;
23-
*/
23+
// This is for the alternative where the player also listens to a queue after entering matchmaking
24+
/*
25+
router.post('/waitMatch', async (req, res) => {
26+
try {
27+
// Start consuming RabbitMQ queues
28+
// await consumeQueue();
29+
res.status(200).send('Waiting for match...');
30+
} catch (error) {
31+
console.error('Error consuming RabbitMQ queue:', error);
32+
res.status(500).send('Error in matchmaking process.');
33+
}
34+
})
35+
*/
36+
37+
module.exports = router;

Backend/MatchingService/package-lock.json

Lines changed: 22 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Backend/MatchingService/package.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
"amqplib": "^0.10.4",
44
"cors": "^2.8.5",
55
"dotenv": "^16.4.5",
6-
"express": "^4.21.1"
6+
"express": "^4.21.1",
7+
"ws": "^8.18.0"
8+
},
9+
"scripts": {
10+
"test": "echo \"Error: no test specified\" && exit 1",
11+
"start": "node index.js"
712
}
813
}

Backend/MatchingService/rabbitmq/publisher.js

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,29 @@
11
const amqp = require('amqplib');
2-
// TODO: Write function to publish to rabbitMQ
2+
// Connect -> Exchange -> Queue -> Bind -> Publish
3+
const { matching_exchange_name } = require('./setup.js');
34

4-
/*
5-
async function publishToQueue(userId, difficulty, language) {
5+
async function publishToQueue({userId, difficulty, language}) {
66
try {
77
const connection = await amqp.connect(process.env.RABBITMQ_URL);
88
const channel = await connection.createChannel();
9-
const matching_exchange_name = 'matching_exchange';
109
const routingKey = `${difficulty}.${language}`;
11-
const queueName = `${difficulty}.${language}`;
1210

13-
if (queueInfo) {
14-
channel.publish(matching_exchange_name, routingKey, Buffer.from(JSON.stringify({ userId, language, difficulty })));
11+
// Connect to the exchange (just in case it does not exist)
12+
await channel.assertExchange(matching_exchange_name, 'topic', { durable: false });
1513

16-
console.log(`Published user: ${userId} with routing key: ${routingKey}`);
14+
// Publish the message to the exchange
15+
const messageSent = channel.publish(
16+
matching_exchange_name,
17+
routingKey,
18+
Buffer.from(JSON.stringify({ userId, difficulty, language }))
19+
);
20+
21+
if (messageSent) {
22+
console.log(`Message sent: ${userId} -> ${routingKey}`);
1723
} else {
18-
console.log(`Cannot publish message: Queue ${queueName} does not exist`);
24+
console.error(`Message NOT sent: ${userId} -> ${routingKey}`);
1925
}
2026

21-
2227
await channel.close();
2328
await connection.close();
2429
} catch (error) {
@@ -27,7 +32,6 @@ async function publishToQueue(userId, difficulty, language) {
2732
}
2833

2934
module.exports = { publishToQueue };
30-
*/
3135

3236

3337

Backend/MatchingService/rabbitmq/setup.js

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
11
const amqp = require("amqplib");
2+
const matching_exchange_name = "matching_exchange";
3+
const queueNames = [
4+
'easy.python',
5+
'easy.java',
6+
'easy.cplusplus',
7+
'medium.python',
8+
'medium.java',
9+
'medium.cplusplus',
10+
'hard.python',
11+
'hard.java',
12+
'hard.cplusplus'
13+
]
214

315
async function setupRabbitMQ() {
416
try {
517
const connection = await amqp.connect(process.env.RABBITMQ_URL)
6-
.catch((error) => {
7-
console.error("Error connecting to RabbitMQ:", error);
8-
return null;
9-
});
1018

1119
if (!connection) {
1220
return;
@@ -15,24 +23,14 @@ async function setupRabbitMQ() {
1523
const channel = await connection.createChannel();
1624

1725
// Declare matching exchange to be bind to queues
18-
const matching_exchange_name = "matching_exchange";
26+
1927
await channel.assertExchange(matching_exchange_name, "topic", { durable: false });
2028

2129
// Declare dead letter exchange
2230
const dead_letter_exchange_name = "dead_letter_exchange";
2331
await channel.assertExchange(dead_letter_exchange_name, "fanout", { durable: false });
2432

25-
const queueNames = [
26-
'easy.python',
27-
'easy.java',
28-
'easy.cplusplus',
29-
'medium.python',
30-
'medium.java',
31-
'medium.cplusplus',
32-
'hard.python',
33-
'hard.java',
34-
'hard.cplusplus'
35-
]
33+
3634

3735
// Create and bind queues to exchange with the routing keys
3836
for (let name of queueNames) {
@@ -43,9 +41,13 @@ async function setupRabbitMQ() {
4341
console.log(`Queue ${name} does not exist or could not be deleted: ${err.message}`);
4442
}
4543
*/
44+
// this is required to add TTL and dead letter exchange to the queue
45+
await channel.deleteQueue(name);
46+
4647
await channel.assertQueue(name,
4748
{ durable: false, // durable=false ensures queue will survive broker restarts
4849
arguments: {
50+
'x-message-ttl': 60000, // set message time to live to 60 seconds
4951
'x-dead-letter-exchange': dead_letter_exchange_name // set dead letter exchange
5052
}
5153

@@ -73,6 +75,6 @@ async function setupRabbitMQ() {
7375
}
7476
}
7577

76-
module.exports = { setupRabbitMQ };
78+
module.exports = { setupRabbitMQ, matching_exchange_name, queueNames };
7779

7880
setupRabbitMQ()
Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,34 @@
11
const amqp = require('amqplib');
2+
const { queueNames } = require('./setup.js');
3+
const { matchUsers } = require('../services/matchingService.js');
24

35
// TODO: Subscribe and acknowledge messages with user info when timeout/user matched
46

5-
//const { matchUsers } = require('../services/matchingService');
7+
// To remember what goes in a subscriber use some Acronym
8+
// Connect, Assert, Process, E - for Acknowledge
69

7-
/*
810
async function consumeQueue() {
911
try {
12+
// Connect
1013
const connection = await amqp.connect(process.env.RABBITMQ_URL);
1114
const channel = await connection.createChannel();
12-
const exchange = 'matching_exchange';
1315

14-
// Consuming messages from multiple queues (already created in setup)
15-
const queueNames = ['easy.python', 'easy.java', 'medium.python', 'medium.java', 'hard.python', 'hard.java'];
16+
// Queues already created in setup.js
1617

1718
console.log("Waiting for users...")
1819

20+
// Process + subscribe to each queue
1921
for (let queueName of queueNames) {
20-
channel.consume(queueName, (msg) => {
22+
await channel.consume(queueName, (msg) => {
2123
if (msg !== null) {
2224
const userData = JSON.parse(msg.content.toString());
23-
// const { userId, language, difficulty } = userData;
25+
const { userId, language, difficulty } = userData;
2426

2527
// Perform the matching logic
26-
// matchUsers(userId, language, difficulty);
27-
console.log(userData);
28+
console.log(`Received user ${userId} with ${language} and ${difficulty}`);
29+
matchUsers(userId, language, difficulty);
2830

31+
// E- Acknowledge
2932
channel.ack(msg);
3033
}
3134
});
@@ -35,4 +38,4 @@ async function consumeQueue() {
3538
}
3639
}
3740

38-
*/
41+
module.exports = { consumeQueue };

Backend/MatchingService/services/matchingService.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
// TODO: Matching users logic
22

33

4-
/*
5-
const { notifyUsers } = require('./websocket/websocket)
4+
const { notifyUsers } = require('../websocket/websocket')
5+
// local dict to store waiting users
66
const waitingUsers = {};
77

88
function matchUsers(userId, language, difficulty) {
99
const criteriaKey = `${difficulty}.${language}`;
1010

11+
// if the criteria key does not exist, create it
1112
if (!waitingUsers[criteriaKey]) {
1213
waitingUsers[criteriaKey] = [];
1314
}
@@ -29,4 +30,3 @@ function matchUsers(userId, language, difficulty) {
2930
}
3031

3132
module.exports = { matchUsers };
32-
*/
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import axios from 'axios';
2+
3+
const baseUrl = 'http://localhost:3003';
4+
5+
const enterMatchmaking = async (user) => {
6+
return await axios.post(`${baseUrl}/api/match/enterQueue`, user);
7+
}

0 commit comments

Comments
 (0)