Skip to content

Commit f961011

Browse files
authored
Merge pull request #81 from CS3219-AY2425S1/improvement/amqp_over_ws
Improvement/amqp over ws
2 parents 9c3aba9 + f25206e commit f961011

File tree

16 files changed

+173
-350
lines changed

16 files changed

+173
-350
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
# production
1818
/build
1919

20+
# pnpm stores
21+
*/.pnpm-store
22+
2023
# misc
2124
.DS_Store
2225
*.pem
@@ -28,6 +31,7 @@ yarn-error.log*
2831

2932
# local env files
3033
.env*.local
34+
*/.env
3135

3236
# vercel
3337
.vercel

matching-service/package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919
"cors": "^2.8.5",
2020
"dotenv": "^16.4.5",
2121
"express": "^4.21.1",
22-
"redis": "^4.7.0",
23-
"ws": "^8.18.0"
22+
"redis": "^4.7.0"
2423
},
2524
"devDependencies": {
2625
"@types/amqplib": "^0.10.5",
2726
"@types/cors": "^2.8.17",
2827
"@types/express": "^5.0.0",
29-
"@types/ws": "^8.5.12",
3028
"nodemon": "^3.1.7",
3129
"prettier": "^3.3.3",
3230
"ts-node": "^10.9.2",

matching-service/src/app.ts

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
import express from "express";
22
import cors from "cors";
3-
import http from "http";
43
import messageRoutes from "./routes/messageRoute";
54
import { initRabbitMQ } from "./services/rabbitMqService";
65
import redisClient from "./config/redisConfig";
76
import { startBackgroundTransfer } from "./services/matchingService";
8-
import WebSocketService from "./services/webSocketService";
97

108
const app = express();
11-
const server = http.createServer(app);
12-
const webSocketService = new WebSocketService(server);
139

1410
const PORT = process.env.PORT || 5001; // 5001 to prevent conflicts
1511

@@ -25,15 +21,7 @@ app.use(cors({ origin: true, credentials: true }));
2521
// Mainly to check health or state of service
2622
app.use(`${apiVersion}/`, messageRoutes);
2723

28-
export const notifyMatch = async (
29-
user1Id: string,
30-
user2Id: string,
31-
matchData: any,
32-
) => {
33-
await webSocketService.notifyMatch(user1Id, user2Id, matchData);
34-
};
35-
36-
server.listen(PORT, async () => {
24+
app.listen(PORT, async () => {
3725
await initRabbitMQ();
3826

3927
try {

matching-service/src/model/CustomWebSocket.ts

Lines changed: 0 additions & 5 deletions
This file was deleted.

matching-service/src/model/userModel.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,24 @@ export const transferToTopicQueue = async (user: User): Promise<void> => {
6161
console.error(`Error storing user in topic queue for topic: ${user.topic}`);
6262
}
6363
};
64+
65+
export const removeUserFromQueues = async (userId: string) => {
66+
const allTopics = await redisClient.keys("topic:*");
67+
const allDifficulties = await redisClient.keys("difficulty:*");
68+
69+
const removeFromQueue = async (key: string) => {
70+
const users = await redisClient.zRange(key, 0, -1);
71+
for (const userString of users) {
72+
const user = JSON.parse(userString);
73+
if (user._id === userId) {
74+
await removeUserFromKey(key, userString);
75+
console.log(`Removed user ${userId} from queue ${key}`);
76+
break;
77+
}
78+
}
79+
};
80+
81+
for (const key of [...allTopics, ...allDifficulties]) {
82+
await removeFromQueue(key);
83+
}
84+
};

matching-service/src/services/matchingService.ts

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ import {
44
transferToTopicQueue,
55
removeUserFromKey,
66
transferToDifficultyQueue,
7+
removeUserFromQueues,
78
} from "../model/userModel";
89
import { User } from "../types";
910
import { sendToQueue } from "./rabbitMqService";
10-
import { notifyMatch } from "../app";
11-
1211
/**
1312
* Check match in queue, if there is a match, we will remove that user from the queue, and return
1413
* a response to rabbit.
@@ -63,12 +62,8 @@ export const processOldUsers = async (): Promise<void> => {
6362
status: "matched",
6463
match: match,
6564
});
66-
console.log(
67-
`Notifying match for OLD users: ${match._id} and ${user._id}`,
68-
);
69-
notifyMatch(match._id, user._id, { user1: match, user2: user });
70-
return;
71-
// continue;
65+
66+
continue;
7267
}
7368

7469
// Transfer to difficulty queue if there is no match
@@ -98,24 +93,29 @@ export const processOldUsers = async (): Promise<void> => {
9893
};
9994

10095
// Process a new user and attempt matching
101-
export const processNewUser = async (user: User): Promise<void> => {
102-
const match = await checkMatch(`topic:${user.topic}`);
96+
export const processNewMessage = async (user: User): Promise<void> => {
97+
if (user.type == "match") {
98+
const match = await checkMatch(`topic:${user.topic}`);
10399

104-
// match found
105-
if (match) {
106-
await sendToQueue(match._id, {
107-
status: "matched",
108-
match: user,
109-
});
110-
await sendToQueue(user._id, {
111-
status: "matched",
112-
match: match,
113-
});
114-
// Call notifyMatch here
115-
notifyMatch(match._id, user._id, { user1: match, user2: user });
116-
} else {
117-
// Add to the topic queue if no match
118-
await transferToTopicQueue(user);
100+
// match found
101+
if (match) {
102+
await sendToQueue(match._id, {
103+
status: "matched",
104+
match: user,
105+
});
106+
await sendToQueue(user._id, {
107+
status: "matched",
108+
match: match,
109+
});
110+
} else {
111+
// Add to the topic queue if no match
112+
await transferToTopicQueue(user);
113+
}
114+
} else if (user.type == "cancel") {
115+
//handle cancel request
116+
console.log(`Cancellation request received for user: ${user._id}`);
117+
// Remove the user from topic and difficulty queues
118+
await removeUserFromQueues(user._id);
119119
}
120120
};
121121

matching-service/src/services/rabbitMqService.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import amqp, { Channel } from "amqplib";
22
import { User } from "../types";
3-
import { processNewUser } from "./matchingService";
3+
import { processNewMessage } from "./matchingService";
44
import { SECONDS } from "../lib/constants";
55

66
let channel: Channel;
@@ -17,7 +17,7 @@ export const initRabbitMQ = async (): Promise<void> => {
1717
console.log("Connected to RabbitMQ");
1818

1919
// Subscribe to the queue upon successful connection
20-
await subscribeToQueue<User>(QUEUE, processNewUser);
20+
await subscribeToQueue<User>(QUEUE, processNewMessage);
2121

2222
// when connection close, initiate reconnection
2323
connection.on("close", async () => {

matching-service/src/services/webSocketService.ts

Lines changed: 0 additions & 145 deletions
This file was deleted.

matching-service/src/types.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export interface User {
22
_id: string;
3-
name: string;
4-
difficulty: string;
5-
topic: string;
3+
name?: string;
4+
difficulty?: string;
5+
topic?: string;
6+
type: string;
67
}

peerprep-fe/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"@radix-ui/react-scroll-area": "^1.2.0",
2020
"@radix-ui/react-select": "^2.1.1",
2121
"@radix-ui/react-slot": "^1.1.0",
22+
"@stomp/stompjs": "^7.0.0",
2223
"@types/js-cookie": "^3.0.6",
2324
"amqplib": "^0.10.4",
2425
"axios": "^1.7.7",

0 commit comments

Comments
 (0)