@@ -3,6 +3,8 @@ const amqp = require('amqplib');
3
3
const { v4 : uuidv4 } = require ( 'uuid' ) ;
4
4
const { addMatchedPair, getCurrentMatchedPair } = require ( '../database/matchedPairDb' ) ;
5
5
const MatchedPair = require ( '../models/matchedPairModel' ) ;
6
+ const matchedPairDb = require ( '../database/matchedPairDb' ) ;
7
+
6
8
7
9
const refreshDuration = 3000 ; // 3 seconds
8
10
const waitingDuration = 3000 ;
@@ -14,6 +16,7 @@ let availabilityCache = new Set();
14
16
15
17
// Find matching pair based on the selected criteria : language, proficiency, difficulty, topic
16
18
async function findMatch ( request ) {
19
+
17
20
return new Promise ( async ( resolve ) => {
18
21
let connection ;
19
22
let channel ;
@@ -25,22 +28,23 @@ async function findMatch(request) {
25
28
26
29
console . log ( 'Successfully connected to RabbitMQ' ) ;
27
30
28
- const criteria = `${ request . language }
29
- .${ request . proficiency }
30
- .${ request . difficulty }
31
- .${ request . topic } ` ;
31
+ const criteria = `${ request . language || 'None' }
32
+ .${ request . proficiency || 'None' }
33
+ .${ request . difficulty || 'None' }
34
+ .${ request . topic || 'None' } ` ;
32
35
33
36
checkCancel = setInterval ( async ( ) => {
34
37
if ( isCancelled . has ( parseInt ( request . id ) ) ) {
35
38
clearInterval ( checkCancel ) ;
36
- resolve ( { isMatched : false , collaboratorId : null , request : request } ) ;
39
+ resolve ( { status : 'cancel' , isMatched : false , collaboratorId : null , request : request } ) ;
37
40
38
41
} else {
39
42
const checkMatchedPair = await getCurrentMatchedPair ( request . id ) ;
40
43
41
44
if ( checkMatchedPair ) {
42
45
clearInterval ( checkCancel ) ;
43
46
resolve ( {
47
+ status : 'success' ,
44
48
isMatched : true ,
45
49
collaboratorId : String ( checkMatchedPair . id1 ) === String ( request . id ) ?
46
50
parseInt ( checkMatchedPair . id2 ) : parseInt ( checkMatchedPair . id1 ) ,
@@ -63,10 +67,11 @@ async function findMatch(request) {
63
67
if ( ! isMatched ) {
64
68
console . log ( `Matched pair could not be found for ${ request . id } ` ) ;
65
69
66
- resolve ( { isMatched : false , collaboratorId : null , request : request } ) ;
70
+ resolve ( { status : 'error' , isMatched : false , collaboratorId : null , request : request } ) ;
67
71
68
72
} else if ( stored ) {
69
73
resolve ( {
74
+ status : 'success' ,
70
75
isMatched : true ,
71
76
collaboratorId : parseInt ( collaboratorId ) ,
72
77
request : request
@@ -87,13 +92,16 @@ async function findMatch(request) {
87
92
await addMatchedPair ( matchedPair ) ;
88
93
89
94
resolve ( {
95
+ status : 'success' ,
90
96
isMatched : true ,
91
97
collaboratorId : parseInt ( collaboratorId ) ,
92
98
request : request
93
99
} ) ;
94
100
}
95
101
} catch ( error ) {
96
102
console . log ( 'Error finding match: ' , error ) ;
103
+ resolve ( { status : 'error' , message : error . message , isMatched : false , collaboratorId : null , request : request } ) ;
104
+
97
105
} finally {
98
106
availabilityCache . delete ( request . id ) ;
99
107
@@ -107,6 +115,18 @@ async function findMatch(request) {
107
115
} ) ;
108
116
}
109
117
118
+ function criteriaMatches ( requestCriteria , currentCriteria ) {
119
+ const fields = [ 'language' , 'proficiency' , 'difficulty' , 'topic' ] ;
120
+ for ( let field of fields ) {
121
+ if ( requestCriteria [ field ] !== "None" &&
122
+ currentCriteria [ field ] !== "None" &&
123
+ requestCriteria [ field ] !== currentCriteria [ field ] ) {
124
+ return false ;
125
+ }
126
+ }
127
+ return true ;
128
+ }
129
+
110
130
// Add new request into the queue, 'topic' exchange type is used to route the message
111
131
async function addRequestIntoQueue ( channel , criteria , request ) {
112
132
try {
@@ -167,29 +187,26 @@ async function listenToMatchingQueue(channel, criteria, request) {
167
187
channel . consume ( queueName , async ( message ) => {
168
188
const currentRequest = JSON . parse ( message . content . toString ( ) ) ;
169
189
const checkActivePair = await getCurrentMatchedPair ( currentRequest . request . id ) ;
170
-
171
- if ( checkActivePair &&
172
- ( String ( checkActivePair . id1 ) === String ( request . id ) ||
173
- String ( checkActivePair . id2 ) === String ( request . id ) ) ) {
174
-
190
+
191
+ // Check if there is an active pair and if the criteria still match
192
+ if ( checkActivePair && criteriaMatches ( request , currentRequest . request ) ) {
193
+ const collaboratorId = String ( checkActivePair . id1 ) === String ( request . id ) ? checkActivePair . id2 : checkActivePair . id1 ;
175
194
resolve ( {
176
195
stored : true ,
177
196
isMatched : true ,
178
197
id : request . id ,
179
- collaboratorId : currentRequest . request . id
198
+ collaboratorId : collaboratorId
180
199
} ) ;
181
200
182
201
} else if ( checkActivePair ||
183
202
isCancelled . has ( parseInt ( currentRequest . request . id ) ) ) {
184
203
185
204
console . log ( `Remove match ${ currentRequest . request . id } ` ) ;
186
-
187
205
availabilityCache . delete ( currentRequest . request . id ) ;
188
206
channel . ack ( message ) ;
189
207
190
208
} else if ( ! matched &&
191
- currentRequest . request . id !== request . id &&
192
- currentRequest . criteria === criteria &&
209
+ currentRequest . request . id !== request . id && criteriaMatches ( request , currentRequest . request ) &&
193
210
availabilityCache . has ( currentRequest . request . id ) ) {
194
211
195
212
console . log ( `Found a match for ${ request . id } ` ) ;
@@ -219,13 +236,42 @@ async function listenToMatchingQueue(channel, criteria, request) {
219
236
220
237
// Cancel matching service
221
238
239
+ //async function cancelMatch(requestId) {
240
+ // isCancelled.add(parseInt(requestId));
241
+ // availabilityCache.delete(requestId);
242
+ //
243
+ // console.log(`Matching service is cancelled for ${requestId}`);
244
+
245
+ // return true;
246
+ //}
247
+
222
248
async function cancelMatch ( requestId ) {
249
+ // Adding the requestId to the cancelled set
223
250
isCancelled . add ( parseInt ( requestId ) ) ;
251
+
252
+ // Removing the requestId from the availability cache
224
253
availabilityCache . delete ( requestId ) ;
225
254
226
255
console . log ( `Matching service is cancelled for ${ requestId } ` ) ;
227
256
257
+ // Check if there's an ongoing session for the given requestId
258
+ const currentMatchedPair = await matchedPairDb . getCurrentMatchedPair ( requestId ) ;
259
+ if ( currentMatchedPair ) {
260
+ console . log ( `Found ongoing session for ${ requestId } . Terminating...` ) ;
261
+
262
+ try {
263
+ await matchedPairDb . endSession ( currentMatchedPair . sessionId ) ;
264
+ console . log ( `Successfully terminated session for ${ requestId } ` ) ;
265
+ } catch ( error ) {
266
+ console . error ( `Error while terminating session for ${ requestId } :` , error ) ;
267
+ throw error ;
268
+ }
269
+ } else {
270
+ console . log ( `No ongoing session found for ${ requestId } ` ) ;
271
+ }
272
+
228
273
return true ;
229
274
}
230
275
276
+
231
277
module . exports = { findMatch, cancelMatch } ;
0 commit comments