@@ -3,6 +3,8 @@ const amqp = require('amqplib');
3
3
const { v4 : uuidv4 } = require ( 'uuid' ) ;
4
4
const { addMatchedPair, getCurrentMatchedPair } = require ( '../database/matchedPairDb' ) ;
5
5
const MatchedPair = require ( '../models/matchedPairModel' ) ;
6
+ const matchedPairDb = require ( '../database/matchedPairDb' ) ;
7
+
6
8
7
9
const refreshDuration = 3000 ; // 3 seconds
8
10
const waitingDuration = 3000 ;
@@ -14,6 +16,7 @@ let availabilityCache = new Set();
14
16
15
17
// Find matching pair based on the selected criteria : language, proficiency, difficulty, topic
16
18
async function findMatch ( request ) {
19
+
17
20
return new Promise ( async ( resolve ) => {
18
21
let connection ;
19
22
let channel ;
@@ -25,10 +28,10 @@ async function findMatch(request) {
25
28
26
29
console . log ( 'Successfully connected to RabbitMQ' ) ;
27
30
28
- const criteria = `${ request . language }
29
- .${ request . proficiency }
30
- .${ request . difficulty }
31
- .${ request . topic } ` ;
31
+ const criteria = `${ request . language || 'None' }
32
+ .${ request . proficiency || 'None' }
33
+ .${ request . difficulty || 'None' }
34
+ .${ request . topic || 'None' } ` ;
32
35
33
36
checkCancel = setInterval ( async ( ) => {
34
37
if ( isCancelled . has ( parseInt ( request . id ) ) ) {
@@ -94,6 +97,8 @@ async function findMatch(request) {
94
97
}
95
98
} catch ( error ) {
96
99
console . log ( 'Error finding match: ' , error ) ;
100
+ throw error ;
101
+
97
102
} finally {
98
103
availabilityCache . delete ( request . id ) ;
99
104
@@ -107,6 +112,18 @@ async function findMatch(request) {
107
112
} ) ;
108
113
}
109
114
115
+ function criteriaMatches ( requestCriteria , currentCriteria ) {
116
+ const fields = [ 'language' , 'proficiency' , 'difficulty' , 'topic' ] ;
117
+ for ( let field of fields ) {
118
+ if ( requestCriteria [ field ] !== "None" &&
119
+ currentCriteria [ field ] !== "None" &&
120
+ requestCriteria [ field ] !== currentCriteria [ field ] ) {
121
+ return false ;
122
+ }
123
+ }
124
+ return true ;
125
+ }
126
+
110
127
// Add new request into the queue, 'topic' exchange type is used to route the message
111
128
async function addRequestIntoQueue ( channel , criteria , request ) {
112
129
try {
@@ -167,29 +184,26 @@ async function listenToMatchingQueue(channel, criteria, request) {
167
184
channel . consume ( queueName , async ( message ) => {
168
185
const currentRequest = JSON . parse ( message . content . toString ( ) ) ;
169
186
const checkActivePair = await getCurrentMatchedPair ( currentRequest . request . id ) ;
170
-
171
- if ( checkActivePair &&
172
- ( String ( checkActivePair . id1 ) === String ( request . id ) ||
173
- String ( checkActivePair . id2 ) === String ( request . id ) ) ) {
174
-
187
+
188
+ // Check if there is an active pair and if the criteria still match
189
+ if ( checkActivePair && criteriaMatches ( request , currentRequest . request ) ) {
190
+ const collaboratorId = String ( checkActivePair . id1 ) === String ( request . id ) ? checkActivePair . id2 : checkActivePair . id1 ;
175
191
resolve ( {
176
192
stored : true ,
177
193
isMatched : true ,
178
194
id : request . id ,
179
- collaboratorId : currentRequest . request . id
195
+ collaboratorId : collaboratorId
180
196
} ) ;
181
197
182
198
} else if ( checkActivePair ||
183
199
isCancelled . has ( parseInt ( currentRequest . request . id ) ) ) {
184
200
185
201
console . log ( `Remove match ${ currentRequest . request . id } ` ) ;
186
-
187
202
availabilityCache . delete ( currentRequest . request . id ) ;
188
203
channel . ack ( message ) ;
189
204
190
205
} else if ( ! matched &&
191
- currentRequest . request . id !== request . id &&
192
- currentRequest . criteria === criteria &&
206
+ currentRequest . request . id !== request . id && criteriaMatches ( request , currentRequest . request ) &&
193
207
availabilityCache . has ( currentRequest . request . id ) ) {
194
208
195
209
console . log ( `Found a match for ${ request . id } ` ) ;
@@ -219,13 +233,42 @@ async function listenToMatchingQueue(channel, criteria, request) {
219
233
220
234
// Cancel matching service
221
235
236
+ //async function cancelMatch(requestId) {
237
+ // isCancelled.add(parseInt(requestId));
238
+ // availabilityCache.delete(requestId);
239
+ //
240
+ // console.log(`Matching service is cancelled for ${requestId}`);
241
+
242
+ // return true;
243
+ //}
244
+
222
245
async function cancelMatch ( requestId ) {
246
+ // Adding the requestId to the cancelled set
223
247
isCancelled . add ( parseInt ( requestId ) ) ;
248
+
249
+ // Removing the requestId from the availability cache
224
250
availabilityCache . delete ( requestId ) ;
225
251
226
252
console . log ( `Matching service is cancelled for ${ requestId } ` ) ;
227
253
254
+ // Check if there's an ongoing session for the given requestId
255
+ const currentMatchedPair = await matchedPairDb . getCurrentMatchedPair ( requestId ) ;
256
+ if ( currentMatchedPair ) {
257
+ console . log ( `Found ongoing session for ${ requestId } . Terminating...` ) ;
258
+
259
+ try {
260
+ await matchedPairDb . endSession ( currentMatchedPair . sessionId ) ;
261
+ console . log ( `Successfully terminated session for ${ requestId } ` ) ;
262
+ } catch ( error ) {
263
+ console . error ( `Error while terminating session for ${ requestId } :` , error ) ;
264
+ throw error ;
265
+ }
266
+ } else {
267
+ console . log ( `No ongoing session found for ${ requestId } ` ) ;
268
+ }
269
+
228
270
return true ;
229
271
}
230
272
273
+
231
274
module . exports = { findMatch, cancelMatch } ;
0 commit comments