Skip to content

Commit 564be4f

Browse files
committed
add reconnection to rabbit
1 parent b7b7a17 commit 564be4f

File tree

2 files changed

+36
-6
lines changed

2 files changed

+36
-6
lines changed

matching-service/src/app.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ app.use(`${apiVersion}/`, messageRoutes);
3333
app.listen(PORT, async () => {
3434
await initRabbitMQ();
3535

36-
// Subscribe to the matching service queue
37-
await subscribeToQueue<User>(QUEUE, processNewUser);
38-
3936
try {
4037
await redisClient.connect();
4138
} catch (err) {

matching-service/src/services/rabbitMqService.ts

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,51 @@
11
import amqp, { Channel } from "amqplib";
2+
import { User } from "../types";
3+
import { processNewUser } from "./matchingService";
4+
import { SECONDS } from "../lib/constants";
25

36
let channel: Channel;
4-
const rabbit_url = process.env.RABBITMQ_URL || "amqp://localhost";
7+
const RABBITMQ_URL = process.env.RABBITMQ_URL || "amqp://localhost";
8+
const QUEUE = process.env.MATCHING_SERVICE_QUEUE || "matching-service";
59

610
/**
711
* Establish a connection to RabbitMQ
812
*/
913
export const initRabbitMQ = async (): Promise<void> => {
1014
try {
11-
const connection = await amqp.connect(rabbit_url);
15+
const connection = await amqp.connect(RABBITMQ_URL);
1216
channel = await connection.createChannel();
1317
console.log("Connected to RabbitMQ");
18+
19+
// Subscribe to the queue upon successful connection
20+
await subscribeToQueue<User>(QUEUE, processNewUser);
21+
22+
// when connection close, initiate reconnection
23+
connection.on("close", async () => {
24+
await reconnectRabbitMQ();
25+
});
1426
} catch (err) {
15-
console.error("RabbitMQ connection error:", err);
27+
console.error(
28+
"Error initialising connection with RabbitMQ connection error:",
29+
err,
30+
);
31+
}
32+
};
33+
34+
/**
35+
* This will only trigger after an initial connection is made
36+
*/
37+
export const reconnectRabbitMQ = async (): Promise<void> => {
38+
const reconnectDelay = 30 * SECONDS;
39+
while (true) {
40+
try {
41+
await initRabbitMQ();
42+
break; // Break out of the loop if successfully connected
43+
} catch (err) {
44+
console.log(
45+
`Retrying to connect to RabbitMQ in ${reconnectDelay} seconds...`,
46+
);
47+
await new Promise((resolve) => setTimeout(resolve, reconnectDelay));
48+
}
1649
}
1750
};
1851

0 commit comments

Comments
 (0)