|
1 | 1 | import amqplib, { Connection } from "amqplib";
|
2 | 2 | import dotenv from "dotenv";
|
3 |
| -import { matchUsers, matchUsersInQueue } from "../src/utils/mq_utils"; |
| 3 | +import { matchUsers } from "../src/utils/mq_utils"; |
4 | 4 | import { MatchRequestItem } from "../src/handlers/matchHandler";
|
| 5 | +import { Complexities, Categories, Languages } from "../src/utils/constants"; |
5 | 6 |
|
6 | 7 | dotenv.config();
|
7 | 8 |
|
8 | 9 | let mrConnection: Connection;
|
9 |
| -const queue = "match_requests"; |
10 |
| - |
11 |
| -let mrConnectionNew: Connection; |
12 | 10 | const queues: string[] = [];
|
13 |
| - |
14 |
| -enum Complexities { |
15 |
| - EASY = "Easy", |
16 |
| - MEDIUM = "Medium", |
17 |
| - HARD = "Hard", |
18 |
| -} |
19 |
| - |
20 |
| -enum Categories { |
21 |
| - STRINGS = "Strings", |
22 |
| - ALGORITHMS = "Algorithms", |
23 |
| - DATA_STRUCTURES = "Data Structures", |
24 |
| - BIT_MANIPULATION = "Bit Manipulation", |
25 |
| - RECURSION = "Recursion", |
26 |
| - DYNAMIC_PROGRAMMING = "Dynamic Programming", |
27 |
| - ARRAYS = "Arrays", |
28 |
| - TREE = "Tree", |
29 |
| -} |
30 |
| - |
31 |
| -enum LANGUAGES { |
32 |
| - PYTHON = "Python", |
33 |
| - JAVA = "Java", |
34 |
| - C = "C", |
35 |
| -} |
36 |
| - |
37 |
| -export const pendingRequestsPerQueue = new Map< |
38 |
| - string, |
39 |
| - Map<string, MatchRequestItem> |
40 |
| ->(); |
| 11 | +const pendingQueueRequests = new Map<string, Map<string, MatchRequestItem>>(); |
41 | 12 |
|
42 | 13 | const initQueueNames = () => {
|
43 | 14 | for (const complexity of Object.values(Complexities)) {
|
44 | 15 | for (const category of Object.values(Categories)) {
|
45 |
| - for (const language of Object.values(LANGUAGES)) { |
| 16 | + for (const language of Object.values(Languages)) { |
46 | 17 | queues.push(`${complexity}_${category}_${language}`);
|
47 | 18 | }
|
48 | 19 | }
|
49 | 20 | }
|
50 | 21 | };
|
51 | 22 |
|
52 |
| -export const connectToRabbitMq = async () => { |
53 |
| - try { |
54 |
| - initQueueNames(); |
55 |
| - mrConnectionNew = await amqplib.connect(`${process.env.RABBITMQ_ADDR}`); |
56 |
| - for (const queue of queues) { |
57 |
| - await setUpQueue(queue); |
58 |
| - pendingRequestsPerQueue.set(queue, new Map<string, MatchRequestItem>()); |
59 |
| - } |
60 |
| - } catch (error) { |
61 |
| - console.error(error); |
62 |
| - process.exit(1); |
63 |
| - } |
64 |
| -}; |
65 |
| - |
66 | 23 | const setUpQueue = async (queueName: string) => {
|
67 |
| - const consumerChannel = await mrConnectionNew.createChannel(); |
| 24 | + const consumerChannel = await mrConnection.createChannel(); |
68 | 25 | await consumerChannel.assertQueue(queueName);
|
69 | 26 |
|
70 | 27 | consumerChannel.consume(queueName, (msg) => {
|
71 |
| - console.log(`consume from queue: ${queueName}`); |
72 | 28 | if (msg !== null) {
|
73 |
| - matchUsersInQueue(queueName, msg.content.toString()); |
| 29 | + matchUsers(queueName, msg.content.toString()); |
74 | 30 | consumerChannel.ack(msg);
|
75 | 31 | }
|
76 | 32 | });
|
77 | 33 | };
|
78 | 34 |
|
79 |
| -export const sendToQueue = async (data: MatchRequestItem) => { |
80 |
| - try { |
81 |
| - const queueName = `${data.complexities[0]}_${data.categories[0]}_${data.languages[0]}`; |
82 |
| - const senderChannel = await mrConnectionNew.createChannel(); |
83 |
| - senderChannel.sendToQueue(queueName, Buffer.from(JSON.stringify(data))); |
84 |
| - return true; |
85 |
| - } catch (error) { |
86 |
| - console.log(error); |
87 |
| - return false; |
88 |
| - } |
89 |
| -}; |
90 |
| - |
91 |
| -// ---------------- |
92 |
| - |
93 |
| -export const connectRabbitMq = async () => { |
| 35 | +export const connectToRabbitMq = async () => { |
94 | 36 | try {
|
| 37 | + initQueueNames(); |
95 | 38 | mrConnection = await amqplib.connect(`${process.env.RABBITMQ_ADDR}`);
|
96 |
| - const consumerChannel = await mrConnection.createChannel(); |
97 |
| - await consumerChannel.assertQueue(queue); |
98 |
| - |
99 |
| - consumerChannel.consume(queue, (msg) => { |
100 |
| - if (msg !== null) { |
101 |
| - matchUsers(msg.content.toString()); |
102 |
| - consumerChannel.ack(msg); |
103 |
| - } |
104 |
| - }); |
| 39 | + for (const queue of queues) { |
| 40 | + await setUpQueue(queue); |
| 41 | + pendingQueueRequests.set(queue, new Map<string, MatchRequestItem>()); |
| 42 | + } |
105 | 43 | } catch (error) {
|
106 | 44 | console.error(error);
|
107 | 45 | process.exit(1);
|
108 | 46 | }
|
109 | 47 | };
|
110 | 48 |
|
111 |
| -export const sendRabbitMq = async ( |
| 49 | +export const sendToQueue = async ( |
| 50 | + complexity: string, |
| 51 | + category: string, |
| 52 | + language: string, |
112 | 53 | data: MatchRequestItem
|
113 |
| -): Promise<boolean> => { |
| 54 | +) => { |
114 | 55 | try {
|
| 56 | + const queueName = `${complexity}_${category}_${language}`; |
115 | 57 | const senderChannel = await mrConnection.createChannel();
|
116 |
| - senderChannel.sendToQueue(queue, Buffer.from(JSON.stringify(data))); |
| 58 | + senderChannel.sendToQueue(queueName, Buffer.from(JSON.stringify(data))); |
117 | 59 | return true;
|
118 | 60 | } catch (error) {
|
119 | 61 | console.log(error);
|
120 | 62 | return false;
|
121 | 63 | }
|
122 | 64 | };
|
| 65 | + |
| 66 | +export const getPendingRequests = ( |
| 67 | + queueName: string |
| 68 | +): Map<string, MatchRequestItem> => { |
| 69 | + return pendingQueueRequests.get(queueName)!; |
| 70 | +}; |
0 commit comments