Skip to content

Commit 3cc0d98

Browse files
Merge pull request #15032 from yatin166/fix/close-amqp-connection-properly
fix(microservices): ensure all the amqp connections are closed properly
2 parents 886104e + 771e1d1 commit 3cc0d98

File tree

2 files changed

+29
-26
lines changed

2 files changed

+29
-26
lines changed

packages/microservices/server/server-rmq.ts

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,9 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
100100
}
101101
}
102102

103-
public close(): void {
104-
this.channel && this.channel.close();
105-
this.server && this.server.close();
103+
public async close(): Promise<void> {
104+
this.channel && (await this.channel.close());
105+
this.server && (await this.server.close());
106106
this.pendingEventListeners = [];
107107
}
108108

@@ -135,25 +135,28 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
135135
this.pendingEventListeners = [];
136136

137137
const connectFailedEvent = 'connectFailed';
138-
this.server!.once(connectFailedEvent, (error: Record<string, unknown>) => {
139-
this._status$.next(RmqStatus.DISCONNECTED);
140-
141-
this.logger.error(CONNECTION_FAILED_MESSAGE);
142-
if (error?.err) {
143-
this.logger.error(error.err);
144-
}
145-
const isReconnecting = !!this.channel;
146-
if (
147-
maxConnectionAttempts === INFINITE_CONNECTION_ATTEMPTS ||
148-
isReconnecting
149-
) {
150-
return;
151-
}
152-
if (++this.connectionAttempts === maxConnectionAttempts) {
153-
this.close();
154-
callback?.(error.err ?? new Error(CONNECTION_FAILED_MESSAGE));
155-
}
156-
});
138+
this.server!.once(
139+
connectFailedEvent,
140+
async (error: Record<string, unknown>) => {
141+
this._status$.next(RmqStatus.DISCONNECTED);
142+
143+
this.logger.error(CONNECTION_FAILED_MESSAGE);
144+
if (error?.err) {
145+
this.logger.error(error.err);
146+
}
147+
const isReconnecting = !!this.channel;
148+
if (
149+
maxConnectionAttempts === INFINITE_CONNECTION_ATTEMPTS ||
150+
isReconnecting
151+
) {
152+
return;
153+
}
154+
if (++this.connectionAttempts === maxConnectionAttempts) {
155+
await this.close();
156+
callback?.(error.err ?? new Error(CONNECTION_FAILED_MESSAGE));
157+
}
158+
},
159+
);
157160
}
158161

159162
public createClient<T = any>(): T {

packages/microservices/test/server/server-rmq.spec.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ describe('ServerRMQ', () => {
7878
untypedServer.server = rmqServer;
7979
untypedServer.channel = rmqChannel;
8080
});
81-
it('should close server', () => {
82-
server.close();
81+
it('should close server', async () => {
82+
await server.close();
8383
expect(rmqServer.close.called).to.be.true;
8484
});
85-
it('should close channel', () => {
86-
server.close();
85+
it('should close channel', async () => {
86+
await server.close();
8787
expect(rmqChannel.close.called).to.be.true;
8888
});
8989
});

0 commit comments

Comments
 (0)