@@ -2,40 +2,69 @@ import amqplib, { Connection } from "amqplib";
2
2
import dotenv from "dotenv" ;
3
3
import { matchUsers } from "../src/utils/mq_utils" ;
4
4
import { MatchRequestItem } from "../src/handlers/matchHandler" ;
5
+ import { Complexities , Categories , Languages } from "../src/utils/constants" ;
5
6
6
7
dotenv . config ( ) ;
7
8
8
9
let mrConnection : Connection ;
9
- const queue = "match_requests" ;
10
+ const queues : string [ ] = [ ] ;
11
+ const pendingQueueRequests = new Map < string , Map < string , MatchRequestItem > > ( ) ;
10
12
11
- export const connectRabbitMq = async ( ) => {
13
+ const initQueueNames = ( ) => {
14
+ for ( const complexity of Object . values ( Complexities ) ) {
15
+ for ( const category of Object . values ( Categories ) ) {
16
+ for ( const language of Object . values ( Languages ) ) {
17
+ queues . push ( `${ complexity } _${ category } _${ language } ` ) ;
18
+ }
19
+ }
20
+ }
21
+ } ;
22
+
23
+ const setUpQueue = async ( queueName : string ) => {
24
+ const consumerChannel = await mrConnection . createChannel ( ) ;
25
+ await consumerChannel . assertQueue ( queueName ) ;
26
+
27
+ consumerChannel . consume ( queueName , ( msg ) => {
28
+ if ( msg !== null ) {
29
+ matchUsers ( queueName , msg . content . toString ( ) ) ;
30
+ consumerChannel . ack ( msg ) ;
31
+ }
32
+ } ) ;
33
+ } ;
34
+
35
+ export const connectToRabbitMq = async ( ) => {
12
36
try {
37
+ initQueueNames ( ) ;
13
38
mrConnection = await amqplib . connect ( `${ process . env . RABBITMQ_ADDR } ` ) ;
14
- const consumerChannel = await mrConnection . createChannel ( ) ;
15
- await consumerChannel . assertQueue ( queue ) ;
16
-
17
- consumerChannel . consume ( queue , ( msg ) => {
18
- if ( msg !== null ) {
19
- matchUsers ( msg . content . toString ( ) ) ;
20
- consumerChannel . ack ( msg ) ;
21
- }
22
- } ) ;
39
+ for ( const queue of queues ) {
40
+ await setUpQueue ( queue ) ;
41
+ pendingQueueRequests . set ( queue , new Map < string , MatchRequestItem > ( ) ) ;
42
+ }
23
43
} catch ( error ) {
24
44
console . error ( error ) ;
25
45
process . exit ( 1 ) ;
26
46
}
27
47
} ;
28
48
29
- export const sendRabbitMq = async (
49
+ export const sendToQueue = async (
50
+ complexity : string ,
51
+ category : string ,
52
+ language : string ,
30
53
data : MatchRequestItem
31
54
) : Promise < boolean > => {
32
55
try {
56
+ const queueName = `${ complexity } _${ category } _${ language } ` ;
33
57
const senderChannel = await mrConnection . createChannel ( ) ;
34
- senderChannel . sendToQueue ( queue , Buffer . from ( JSON . stringify ( data ) ) ) ;
35
- console . log ( "Sent to queue:" , JSON . stringify ( data ) ) ;
58
+ senderChannel . sendToQueue ( queueName , Buffer . from ( JSON . stringify ( data ) ) ) ;
36
59
return true ;
37
60
} catch ( error ) {
38
61
console . log ( error ) ;
39
62
return false ;
40
63
}
41
64
} ;
65
+
66
+ export const getPendingRequests = (
67
+ queueName : string
68
+ ) : Map < string , MatchRequestItem > => {
69
+ return pendingQueueRequests . get ( queueName ) ! ;
70
+ } ;
0 commit comments