Skip to content

Commit 72fb2f4

Browse files
committed
feat(rabbitmq): implement robust connection handling with reconnection logic and error logging
1 parent e86b646 commit 72fb2f4

File tree

1 file changed

+189
-28
lines changed

1 file changed

+189
-28
lines changed

src/api/integrations/event/rabbitmq/rabbitmq.controller.ts

Lines changed: 189 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ import { EmitData, EventController, EventControllerInterface } from '../event.co
88

99
export class RabbitmqController extends EventController implements EventControllerInterface {
1010
public amqpChannel: amqp.Channel | null = null;
11+
private amqpConnection: amqp.Connection | null = null;
1112
private readonly logger = new Logger('RabbitmqController');
13+
private reconnectAttempts = 0;
14+
private maxReconnectAttempts = 10;
15+
private reconnectDelay = 5000; // 5 seconds
16+
private isReconnecting = false;
1217

1318
constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) {
1419
super(prismaRepository, waMonitor, configService.get<Rabbitmq>('RABBITMQ')?.ENABLED, 'rabbitmq');
@@ -19,7 +24,11 @@ export class RabbitmqController extends EventController implements EventControll
1924
return;
2025
}
2126

22-
await new Promise<void>((resolve, reject) => {
27+
await this.connect();
28+
}
29+
30+
private async connect(): Promise<void> {
31+
return new Promise<void>((resolve, reject) => {
2332
const uri = configService.get<Rabbitmq>('RABBITMQ').URI;
2433
const frameMax = configService.get<Rabbitmq>('RABBITMQ').FRAME_MAX;
2534
const rabbitmqExchangeName = configService.get<Rabbitmq>('RABBITMQ').EXCHANGE_NAME;
@@ -33,39 +42,143 @@ export class RabbitmqController extends EventController implements EventControll
3342
password: url.password || 'guest',
3443
vhost: url.pathname.slice(1) || '/',
3544
frameMax: frameMax,
45+
heartbeat: 30, // Add heartbeat of 30 seconds
3646
};
3747

3848
amqp.connect(connectionOptions, (error, connection) => {
3949
if (error) {
50+
this.logger.error({
51+
local: 'RabbitmqController.connect',
52+
message: 'Failed to connect to RabbitMQ',
53+
error: error.message || error,
54+
});
4055
reject(error);
41-
4256
return;
4357
}
4458

59+
// Connection event handlers
60+
connection.on('error', (err) => {
61+
this.logger.error({
62+
local: 'RabbitmqController.connectionError',
63+
message: 'RabbitMQ connection error',
64+
error: err.message || err,
65+
});
66+
this.handleConnectionLoss();
67+
});
68+
69+
connection.on('close', () => {
70+
this.logger.warn('RabbitMQ connection closed');
71+
this.handleConnectionLoss();
72+
});
73+
4574
connection.createChannel((channelError, channel) => {
4675
if (channelError) {
76+
this.logger.error({
77+
local: 'RabbitmqController.createChannel',
78+
message: 'Failed to create RabbitMQ channel',
79+
error: channelError.message || channelError,
80+
});
4781
reject(channelError);
48-
4982
return;
5083
}
5184

85+
// Channel event handlers
86+
channel.on('error', (err) => {
87+
this.logger.error({
88+
local: 'RabbitmqController.channelError',
89+
message: 'RabbitMQ channel error',
90+
error: err.message || err,
91+
});
92+
this.handleConnectionLoss();
93+
});
94+
95+
channel.on('close', () => {
96+
this.logger.warn('RabbitMQ channel closed');
97+
this.handleConnectionLoss();
98+
});
99+
52100
const exchangeName = rabbitmqExchangeName;
53101

54102
channel.assertExchange(exchangeName, 'topic', {
55103
durable: true,
56104
autoDelete: false,
57105
});
58106

107+
this.amqpConnection = connection;
59108
this.amqpChannel = channel;
109+
this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection
110+
this.isReconnecting = false;
60111

61-
this.logger.info('AMQP initialized');
112+
this.logger.info('AMQP initialized successfully');
62113

63114
resolve();
64115
});
65116
});
66-
}).then(() => {
67-
if (configService.get<Rabbitmq>('RABBITMQ')?.GLOBAL_ENABLED) this.initGlobalQueues();
68-
});
117+
})
118+
.then(() => {
119+
if (configService.get<Rabbitmq>('RABBITMQ')?.GLOBAL_ENABLED) {
120+
this.initGlobalQueues();
121+
}
122+
})
123+
.catch((error) => {
124+
this.logger.error({
125+
local: 'RabbitmqController.init',
126+
message: 'Failed to initialize AMQP',
127+
error: error.message || error,
128+
});
129+
this.scheduleReconnect();
130+
throw error;
131+
});
132+
}
133+
134+
private handleConnectionLoss(): void {
135+
if (this.isReconnecting) {
136+
return; // Already attempting to reconnect
137+
}
138+
139+
this.amqpChannel = null;
140+
this.amqpConnection = null;
141+
this.scheduleReconnect();
142+
}
143+
144+
private scheduleReconnect(): void {
145+
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
146+
this.logger.error(
147+
`Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`,
148+
);
149+
return;
150+
}
151+
152+
if (this.isReconnecting) {
153+
return; // Already scheduled
154+
}
155+
156+
this.isReconnecting = true;
157+
this.reconnectAttempts++;
158+
159+
const delay = this.reconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5)); // Exponential backoff with max delay
160+
161+
this.logger.info(
162+
`Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`,
163+
);
164+
165+
setTimeout(async () => {
166+
try {
167+
this.logger.info(
168+
`Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`,
169+
);
170+
await this.connect();
171+
this.logger.info('Successfully reconnected to RabbitMQ');
172+
} catch (error) {
173+
this.logger.error({
174+
local: 'RabbitmqController.scheduleReconnect',
175+
message: `Reconnection attempt ${this.reconnectAttempts} failed`,
176+
error: error.message || error,
177+
});
178+
this.isReconnecting = false;
179+
this.scheduleReconnect();
180+
}
181+
}, delay);
69182
}
70183

71184
private set channel(channel: amqp.Channel) {
@@ -76,6 +189,17 @@ export class RabbitmqController extends EventController implements EventControll
76189
return this.amqpChannel;
77190
}
78191

192+
private async ensureConnection(): Promise<boolean> {
193+
if (!this.amqpChannel) {
194+
this.logger.warn('AMQP channel is not available, attempting to reconnect...');
195+
if (!this.isReconnecting) {
196+
this.scheduleReconnect();
197+
}
198+
return false;
199+
}
200+
return true;
201+
}
202+
79203
public async emit({
80204
instanceName,
81205
origin,
@@ -95,6 +219,11 @@ export class RabbitmqController extends EventController implements EventControll
95219
return;
96220
}
97221

222+
if (!(await this.ensureConnection())) {
223+
this.logger.warn(`Failed to emit event ${event} for instance ${instanceName}: No AMQP connection`);
224+
return;
225+
}
226+
98227
const instanceRabbitmq = await this.get(instanceName);
99228
const rabbitmqLocal = instanceRabbitmq?.events;
100229
const rabbitmqGlobal = configService.get<Rabbitmq>('RABBITMQ').GLOBAL_ENABLED;
@@ -154,7 +283,15 @@ export class RabbitmqController extends EventController implements EventControll
154283

155284
break;
156285
} catch (error) {
286+
this.logger.error({
287+
local: 'RabbitmqController.emit',
288+
message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`,
289+
error: error.message || error,
290+
});
157291
retry++;
292+
if (retry >= 3) {
293+
this.handleConnectionLoss();
294+
}
158295
}
159296
}
160297
}
@@ -199,7 +336,15 @@ export class RabbitmqController extends EventController implements EventControll
199336

200337
break;
201338
} catch (error) {
339+
this.logger.error({
340+
local: 'RabbitmqController.emit',
341+
message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`,
342+
error: error.message || error,
343+
});
202344
retry++;
345+
if (retry >= 3) {
346+
this.handleConnectionLoss();
347+
}
203348
}
204349
}
205350
}
@@ -208,41 +353,57 @@ export class RabbitmqController extends EventController implements EventControll
208353
private async initGlobalQueues(): Promise<void> {
209354
this.logger.info('Initializing global queues');
210355

356+
if (!(await this.ensureConnection())) {
357+
this.logger.error('Cannot initialize global queues: No AMQP connection');
358+
return;
359+
}
360+
211361
const rabbitmqExchangeName = configService.get<Rabbitmq>('RABBITMQ').EXCHANGE_NAME;
212362
const events = configService.get<Rabbitmq>('RABBITMQ').EVENTS;
213363
const prefixKey = configService.get<Rabbitmq>('RABBITMQ').PREFIX_KEY;
214364

215365
if (!events) {
216366
this.logger.warn('No events to initialize on AMQP');
217-
218367
return;
219368
}
220369

221370
const eventKeys = Object.keys(events);
222371

223-
eventKeys.forEach((event) => {
224-
if (events[event] === false) return;
372+
for (const event of eventKeys) {
373+
if (events[event] === false) continue;
225374

226-
const queueName =
227-
prefixKey !== ''
228-
? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}`
229-
: `${event.replace(/_/g, '.').toLowerCase()}`;
230-
const exchangeName = rabbitmqExchangeName;
375+
try {
376+
const queueName =
377+
prefixKey !== ''
378+
? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}`
379+
: `${event.replace(/_/g, '.').toLowerCase()}`;
380+
const exchangeName = rabbitmqExchangeName;
231381

232-
this.amqpChannel.assertExchange(exchangeName, 'topic', {
233-
durable: true,
234-
autoDelete: false,
235-
});
382+
await this.amqpChannel.assertExchange(exchangeName, 'topic', {
383+
durable: true,
384+
autoDelete: false,
385+
});
236386

237-
this.amqpChannel.assertQueue(queueName, {
238-
durable: true,
239-
autoDelete: false,
240-
arguments: {
241-
'x-queue-type': 'quorum',
242-
},
243-
});
387+
await this.amqpChannel.assertQueue(queueName, {
388+
durable: true,
389+
autoDelete: false,
390+
arguments: {
391+
'x-queue-type': 'quorum',
392+
},
393+
});
394+
395+
await this.amqpChannel.bindQueue(queueName, exchangeName, event);
244396

245-
this.amqpChannel.bindQueue(queueName, exchangeName, event);
246-
});
397+
this.logger.info(`Global queue initialized: ${queueName}`);
398+
} catch (error) {
399+
this.logger.error({
400+
local: 'RabbitmqController.initGlobalQueues',
401+
message: `Failed to initialize global queue for event ${event}`,
402+
error: error.message || error,
403+
});
404+
this.handleConnectionLoss();
405+
break;
406+
}
407+
}
247408
}
248409
}

0 commit comments

Comments
 (0)