Skip to content

Commit 450ac4e

Browse files
committed
Set up queue per match criteria
1 parent 59f7419 commit 450ac4e

File tree

4 files changed

+129
-5
lines changed

4 files changed

+129
-5
lines changed

backend/matching-service/config/rabbitmq.ts

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,95 @@
11
import amqplib, { Connection } from "amqplib";
22
import dotenv from "dotenv";
3-
import { matchUsers } from "../src/utils/mq_utils";
3+
import { matchUsers, matchUsersInQueue } from "../src/utils/mq_utils";
44
import { MatchRequestItem } from "../src/handlers/matchHandler";
55

66
dotenv.config();
77

88
let mrConnection: Connection;
99
const queue = "match_requests";
1010

11+
let mrConnectionNew: Connection;
12+
const queues: string[] = [];
13+
14+
enum Complexities {
15+
EASY = "Easy",
16+
MEDIUM = "Medium",
17+
HARD = "Hard",
18+
}
19+
20+
enum Categories {
21+
STRINGS = "Strings",
22+
ALGORITHMS = "Algorithms",
23+
DATA_STRUCTURES = "Data Structures",
24+
BIT_MANIPULATION = "Bit Manipulation",
25+
RECURSION = "Recursion",
26+
DYNAMIC_PROGRAMMING = "Dynamic Programming",
27+
ARRAYS = "Arrays",
28+
TREE = "Tree",
29+
}
30+
31+
enum LANGUAGES {
32+
PYTHON = "Python",
33+
JAVA = "Java",
34+
C = "C",
35+
}
36+
37+
export const pendingRequestsPerQueue = new Map<
38+
string,
39+
Map<string, MatchRequestItem>
40+
>();
41+
42+
const initQueueNames = () => {
43+
for (const complexity of Object.values(Complexities)) {
44+
for (const category of Object.values(Categories)) {
45+
for (const language of Object.values(LANGUAGES)) {
46+
queues.push(`${complexity}_${category}_${language}`);
47+
}
48+
}
49+
}
50+
};
51+
52+
export const connectToRabbitMq = async () => {
53+
try {
54+
initQueueNames();
55+
mrConnectionNew = await amqplib.connect(`${process.env.RABBITMQ_ADDR}`);
56+
for (const queue of queues) {
57+
await setUpQueue(queue);
58+
pendingRequestsPerQueue.set(queue, new Map<string, MatchRequestItem>());
59+
}
60+
} catch (error) {
61+
console.error(error);
62+
process.exit(1);
63+
}
64+
};
65+
66+
const setUpQueue = async (queueName: string) => {
67+
const consumerChannel = await mrConnectionNew.createChannel();
68+
await consumerChannel.assertQueue(queueName);
69+
70+
consumerChannel.consume(queueName, (msg) => {
71+
console.log(`consume from queue: ${queueName}`);
72+
if (msg !== null) {
73+
matchUsersInQueue(queueName, msg.content.toString());
74+
consumerChannel.ack(msg);
75+
}
76+
});
77+
};
78+
79+
export const sendToQueue = async (data: MatchRequestItem) => {
80+
try {
81+
const queueName = `${data.complexities[0]}_${data.categories[0]}_${data.languages[0]}`;
82+
const senderChannel = await mrConnectionNew.createChannel();
83+
senderChannel.sendToQueue(queueName, Buffer.from(JSON.stringify(data)));
84+
return true;
85+
} catch (error) {
86+
console.log(error);
87+
return false;
88+
}
89+
};
90+
91+
// ----------------
92+
1193
export const connectRabbitMq = async () => {
1294
try {
1395
mrConnection = await amqplib.connect(`${process.env.RABBITMQ_ADDR}`);

backend/matching-service/server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import http from "http";
22
import app, { allowedOrigins } from "./app.ts";
33
import { handleWebsocketMatchEvents } from "./src/handlers/websocketHandler.ts";
44
import { Server } from "socket.io";
5-
import { connectRabbitMq } from "./config/rabbitmq.ts";
5+
import { connectRabbitMq, connectToRabbitMq } from "./config/rabbitmq.ts";
66

77
const server = http.createServer(app);
88
export const io = new Server(server, {
@@ -20,7 +20,8 @@ io.on("connection", (socket) => {
2020
const PORT = process.env.PORT || 3002;
2121

2222
if (process.env.NODE_ENV !== "test") {
23-
connectRabbitMq()
23+
connectToRabbitMq()
24+
// connectRabbitMq()
2425
.then(() => {
2526
console.log("RabbitMq connected!");
2627

backend/matching-service/src/handlers/matchHandler.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { v4 as uuidv4 } from "uuid";
2-
import { sendRabbitMq } from "../../config/rabbitmq";
2+
import { sendToQueue } from "../../config/rabbitmq";
33
import { sendMatchFound } from "./websocketHandler";
44

55
interface Match {
@@ -53,7 +53,7 @@ export const sendMatchRequest = async (
5353
rejectedPartnerId: rejectedPartnerId,
5454
};
5555

56-
const sent = await sendRabbitMq(matchItem);
56+
const sent = await sendToQueue(matchItem);
5757
return sent;
5858
};
5959

backend/matching-service/src/utils/mq_utils.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,47 @@
1+
import { pendingRequestsPerQueue } from "../../config/rabbitmq";
12
import { createMatch, MatchRequestItem } from "../handlers/matchHandler";
23
import { isActiveRequest, isUserConnected } from "../handlers/websocketHandler";
34

5+
export const matchUsersInQueue = (queueName: string, newRequest: string) => {
6+
const pendingRequests = pendingRequestsPerQueue.get(queueName)!;
7+
const newRequestJson = JSON.parse(newRequest) as MatchRequestItem;
8+
const newRequestUid = newRequestJson.user.id;
9+
10+
for (const [uid, pendingRequest] of pendingRequests) {
11+
if (
12+
isExpired(pendingRequest) ||
13+
!isUserConnected(uid) ||
14+
!isActiveRequest(uid, pendingRequest.id) ||
15+
uid === newRequestUid
16+
) {
17+
pendingRequests.delete(uid);
18+
continue;
19+
}
20+
21+
if (
22+
isExpired(newRequestJson) ||
23+
!isUserConnected(newRequestUid) ||
24+
!isActiveRequest(newRequestUid, newRequestJson.id)
25+
) {
26+
return;
27+
}
28+
29+
if (
30+
uid === newRequestJson.rejectedPartnerId ||
31+
newRequestUid === pendingRequest.rejectedPartnerId
32+
) {
33+
continue;
34+
}
35+
36+
pendingRequests.delete(uid);
37+
createMatch(pendingRequest, newRequestJson);
38+
return;
39+
}
40+
pendingRequests.set(newRequestUid, newRequestJson);
41+
};
42+
43+
// ----------------
44+
445
const matchingRequests = new Map<string, MatchRequestItem>();
546

647
export const matchUsers = (newRequest: string) => {

0 commit comments

Comments
 (0)