Skip to content

Commit b7b7a17

Browse files
committed
abstract subscribe to queue and add comments
1 parent d6be5ab commit b7b7a17

File tree

6 files changed

+89
-50
lines changed

6 files changed

+89
-50
lines changed

docker-compose.prod.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ services:
33
image: redis:latest
44
container_name: redis-server
55
ports:
6-
- "6379:6379" # Expose Redis port
6+
- "6379:6379"
77
networks:
88
- peerprep-network
99
frontend:

matching-service/src/app.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ import cors from "cors";
33
import * as dotenv from "dotenv";
44

55
import messageRoutes from "./routes/messageRoute";
6-
import { initRabbitMQ } from "./services/rabbitMqService";
6+
import { initRabbitMQ, subscribeToQueue } from "./services/rabbitMqService";
77
import redisClient from "./config/redisConfig";
8-
import { startBackgroundTransfer } from "./services/matchingService";
8+
import {
9+
processNewUser,
10+
startBackgroundTransfer,
11+
} from "./services/matchingService";
12+
import { User } from "./types";
913

1014
dotenv.config({ path: ".env.dev" });
1115

@@ -22,21 +26,22 @@ const apiVersion = "/api/v1";
2226
// Allow frontend to talk to service
2327
app.use(cors({ origin: true, credentials: true }));
2428

25-
app.use(`${apiVersion}/message`, messageRoutes);
26-
27-
// ping server
28-
app.get(`${apiVersion}/ping`, (req, res) => {
29-
res.status(200).json({ data: "pong" });
30-
});
29+
// Mainly to check health or state of service
30+
app.use(`${apiVersion}/`, messageRoutes);
3131

3232
// Start the server
3333
app.listen(PORT, async () => {
34-
await initRabbitMQ(QUEUE);
34+
await initRabbitMQ();
35+
36+
// Subscribe to the matching service queue
37+
await subscribeToQueue<User>(QUEUE, processNewUser);
38+
3539
try {
3640
await redisClient.connect();
3741
} catch (err) {
3842
console.error("Error connecting to Redis:", err);
3943
}
44+
4045
console.log(`Server is running on port ${PORT}`);
4146

4247
startBackgroundTransfer();

matching-service/src/model/userModel.ts

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
11
import redisClient from "../config/redisConfig";
22
import { User } from "../types";
33

4-
// Store user in Redis using async/await
5-
export const storeUser = async (user: User): Promise<string | null> => {
6-
try {
7-
// Store the user in Redis, Redis 4.x+ returns a Promise, no need for callbacks
8-
const reply = await redisClient.set(user._id, JSON.stringify(user));
9-
console.log("User stored in Redis:", reply);
10-
return reply;
11-
} catch (err) {
12-
console.error("Redis set error:", err);
13-
throw err;
14-
}
15-
};
16-
4+
/**
5+
* Store a user in an ordered set in Redis, where the key represents the location
6+
* and the user is stored with a score based on the current timestamp.
7+
*
8+
* @param key key to store user in
9+
* @param user The user object to store, which will be serialized to JSON.
10+
* @returns The number of elements added to the sorted set (not including all the elements already present).
11+
* If an error occurs, it returns null.
12+
*/
1713
export const storeUserToKey = async (
1814
key: string,
1915
user: User,
@@ -24,6 +20,16 @@ export const storeUserToKey = async (
2420
]);
2521
};
2622

23+
/**
24+
* Remove a user from an ordered set in Redis.
25+
*
26+
* This function removes the specified user from the ordered set identified by the given key.
27+
*
28+
* @param key - The key of the ordered set from which the user will be removed.
29+
* @param user - The string representation of the user to be removed from the ordered set.
30+
* @returns The number of elements removed from the sorted set (1 if the user was removed, 0 if the user was not found).
31+
* Returns null if an error occurs during the operation.
32+
*/
2733
export const removeUserFromKey = async (
2834
key: string,
2935
user: string,
@@ -32,6 +38,9 @@ export const removeUserFromKey = async (
3238
return redisClient.zRem(key, user);
3339
};
3440

41+
/**
42+
* Transfers user to the difficulty queue based on the user's difficulty level.
43+
*/
3544
export const transferToDifficultyQueue = async (user: User): Promise<void> => {
3645
const difficultyKey = `difficulty:${user.difficulty}`;
3746
const res = await storeUserToKey(difficultyKey, user);
@@ -42,6 +51,9 @@ export const transferToDifficultyQueue = async (user: User): Promise<void> => {
4251
}
4352
};
4453

54+
/**
55+
* Transfers user to the topic queue based on the user's topic.
56+
*/
4557
export const transferToTopicQueue = async (user: User): Promise<void> => {
4658
const topicKey = `topic:${user.topic}`;
4759
const res = await storeUserToKey(topicKey, user);

matching-service/src/routes/messageRoute.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { returnPing } from "../controller/messageController";
33

44
const router = express.Router();
55

6+
// Check the health of the service
67
router.get("/ping", returnPing);
78

89
export default router;

matching-service/src/services/matchingService.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { sendToQueue } from "./rabbitMqService";
1414
* If there is no match, we will return null.
1515
*
1616
* @param topic
17-
* @returns
17+
* @returns User if there is a match, null if there is no match
1818
*/
1919
export const checkMatch = async (key: string): Promise<User | null> => {
2020
const match = await redisClient.zRange(key, 0, 0); // Get the first user in the topic queue
@@ -26,7 +26,6 @@ export const checkMatch = async (key: string): Promise<User | null> => {
2626
if (res === null) {
2727
console.error(`Error removing user from queue for key: ${key}`);
2828
}
29-
3029
return user;
3130
}
3231
return null;
@@ -111,12 +110,13 @@ export const processNewUser = async (user: User): Promise<void> => {
111110
user2: user,
112111
});
113112
} else {
113+
// Add to the topic queue if no match
114114
await transferToTopicQueue(user);
115115
}
116116
};
117117

118118
// Start background transfer process, polling every 5 sec
119-
export const startBackgroundTransfer = (): void => {
119+
export const startBackgroundTransfer = () => {
120120
setInterval(async () => {
121121
await processOldUsers();
122122
}, 5 * SECONDS);

matching-service/src/services/rabbitMqService.ts

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,68 @@
11
import amqp, { Channel } from "amqplib";
2-
import { storeUser } from "../model/userModel";
3-
import { User } from "../types";
4-
import { processNewUser } from "./matchingService";
52

63
let channel: Channel;
74
const rabbit_url = process.env.RABBITMQ_URL || "amqp://localhost";
85

9-
export const initRabbitMQ = async (queue: string): Promise<void> => {
6+
/**
7+
* Establish a connection to RabbitMQ
8+
*/
9+
export const initRabbitMQ = async (): Promise<void> => {
1010
try {
1111
const connection = await amqp.connect(rabbit_url);
1212
channel = await connection.createChannel();
13-
await channel.assertQueue(queue, { durable: true });
14-
console.log(`RabbitMQ initialised and subscribed to ${queue}`);
15-
16-
// Start consuming messages from matching-service
17-
channel.consume(queue, async (msg) => {
18-
if (msg !== null) {
19-
const user: User = JSON.parse(msg.content.toString());
20-
console.log("Received message from queue:", user);
21-
22-
// Store the user into Redis
23-
try {
24-
await processNewUser(user);
25-
} catch (err) {
26-
console.error("Error storing user in Redis:", err);
27-
}
28-
29-
channel.ack(msg);
30-
}
31-
});
13+
console.log("Connected to RabbitMQ");
3214
} catch (err) {
3315
console.error("RabbitMQ connection error:", err);
3416
}
3517
};
3618

19+
/**
20+
* Get the RabbitMQ channel
21+
* @returns RabbitMQ channel
22+
*/
3723
export const getChannel = (): amqp.Channel => {
3824
if (!channel) {
3925
throw new Error("RabbitMQ channel is not initialized");
4026
}
4127
return channel;
4228
};
4329

44-
// Send a payload to a specified RabbitMQ queue
30+
/**
31+
* Subscribes to a RabbitMQ queue and calls the callback function when a message is received.
32+
* @param queue queue to subscribe to
33+
* @param callback callback when message is received
34+
*/
35+
export const subscribeToQueue = async <T>(
36+
queue: string,
37+
callback: (message: T) => void,
38+
): Promise<void> => {
39+
try {
40+
if (!channel) throw new Error("RabbitMQ channel is not initialized");
41+
42+
await channel.assertQueue(queue);
43+
console.log(`Subscribed to RabbitMQ queue "${queue}"`);
44+
45+
channel.consume(queue, (msg) => {
46+
if (msg !== null) {
47+
const message: T = JSON.parse(msg.content.toString());
48+
console.log(
49+
`Received message from RabbitMQ queue "${queue}":`,
50+
message,
51+
);
52+
callback(message);
53+
channel.ack(msg);
54+
}
55+
});
56+
} catch (err) {
57+
console.error(`Error subscribing to RabbitMQ queue "${queue}":`, err);
58+
}
59+
};
60+
61+
/**
62+
* Sends a payload to a RabbitMQ queue
63+
* @param queue queue to send payload to
64+
* @param payload payload object to send
65+
*/
4566
export const sendToQueue = async (
4667
queue: string,
4768
payload: Object,

0 commit comments

Comments
 (0)