Skip to content

Commit 9798269

Browse files
authored
fix(broker): cleanup channel consumers after cancellation (#84)
* fixes #53: Handle server initiated consumer cancellations * fixes #54: Clean up channel consumers after consumers are cancelled. * fixes #67: Skip consumers that fail to start while resurrecting.
1 parent 59be1ec commit 9798269

File tree

3 files changed

+83
-29
lines changed

3 files changed

+83
-29
lines changed

src/broker.test.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,5 +241,21 @@ describe('NodeAmqpBroker', () => {
241241
expect((broker as any)._channelConsumers.get(newChannel)).to.include(consumer);
242242
expect((broker as any)._channelConsumers.get(mockChannel)).to.be.undefined;
243243
});
244+
245+
it('should skip rewiring if the channel does not start a consumer', async () => {
246+
await broker.connect();
247+
const consumer = await broker.consume('test-queue');
248+
const newChannel = createMockChannel(6);
249+
const oldChannel = consumer.channel;
250+
251+
// Simulate a channel that does not start a consumer
252+
newChannel.consume.rejects(new Error('Consumer not started'));
253+
254+
broker.pool!.emit('channelReplaced', oldChannel, newChannel);
255+
await new Promise(r => setImmediate(r)); // give dispatch loop a tick
256+
257+
sinon.assert.calledOnce(newChannel.consume);
258+
expect((broker as any)._channelConsumers.get(newChannel)).to.be.empty;
259+
});
244260
});
245261
});

src/broker.ts

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,26 @@ export default class NodeAmqpBroker {
224224
}
225225
}
226226

227+
private async startConsumer(ch: Channel, consumer: Consumer) {
228+
const res = await ch.consume(consumer.queue, (msg: ConsumeMessage | null) => {
229+
if (msg) {
230+
consumer.handleDelivery(msg);
231+
} else {
232+
consumer.handleCancel(true);
233+
}
234+
});
235+
236+
consumer.tag = res.consumerTag;
237+
consumer.channel = ch;
238+
239+
// clear any previous cancel event handler, since this consumer might have
240+
// been rewired to a new channel
241+
consumer.removeAllListeners('cancel');
242+
consumer.once('cancel', () => {
243+
this._channelConsumers.get(ch)?.delete(consumer);
244+
});
245+
}
246+
227247
/**
228248
* Recreates consumers on a new channel in the same way as the old one
229249
* @todo properly handle errors, e.g. if the channel is closed when we try to consume (#67)
@@ -233,47 +253,37 @@ export default class NodeAmqpBroker {
233253
if (!set || set.size < 1) return;
234254

235255
for (const consumer of set) {
236-
// FIXME (#67): handle potential errors
237-
const res = await newCh.consume(consumer.queue, (msg: ConsumeMessage | null) => {
238-
if (msg && consumer) {
239-
consumer.handleDelivery(msg);
240-
}
241-
});
242-
243-
consumer.tag = res.consumerTag;
244-
consumer.channel = newCh;
245-
consumer.resume();
256+
try {
257+
await this.startConsumer(newCh, consumer);
258+
consumer.resume();
259+
} catch (ex: any) {
260+
set.delete(consumer);
261+
debug('Failed to rewire consumer', { queue: consumer.queue, reason: ex });
262+
}
246263
}
247264

248265
this._channelConsumers.delete(ch);
249266
this._channelConsumers.set(newCh, set);
250267
}
251268

252-
private async _startConsumer(ch: Channel, queue: string): Promise<Consumer> {
269+
private async createConsumer(ch: Channel, queue: string): Promise<Consumer> {
253270
let consumer = new Consumer(ch, queue);
271+
await this.startConsumer(ch, consumer);
254272

255-
return await ch.consume(queue, (msg: ConsumeMessage | null) => {
256-
if (msg) {
257-
consumer.handleDelivery(msg);
258-
}
259-
}).then((res: Replies.Consume) => {
260-
consumer.tag = res.consumerTag;
261-
262-
if (!this._channelConsumers.has(ch)) {
263-
this._channelConsumers.set(ch, new Set([consumer]));
264-
} else {
265-
const set = this._channelConsumers.get(ch);
266-
set!.add(consumer);
267-
}
273+
if (!this._channelConsumers.has(ch)) {
274+
this._channelConsumers.set(ch, new Set([consumer]));
275+
} else {
276+
const set = this._channelConsumers.get(ch);
277+
set!.add(consumer);
278+
}
268279

269-
return consumer;
270-
});
280+
return consumer;
271281
}
272282

273283
public consume(queue: string): PromiseLike<Consumer> {
274284
if (!this.pool) throw new PeanarAdapterError('Not connected!');
275285

276-
return this.pool.acquireAndRun(async ch => this._startConsumer(ch, queue));
286+
return this.pool.acquireAndRun(async ch => this.createConsumer(ch, queue));
277287
}
278288

279289
public consumeOver(queues: string[]) {
@@ -283,7 +293,7 @@ export default class NodeAmqpBroker {
283293
return {
284294
queue,
285295
channel: ch,
286-
consumer: await this._startConsumer(ch, queue)
296+
consumer: await this.createConsumer(ch, queue)
287297
};
288298
});
289299
}

test/broker.test.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { rejects } from 'assert';
44

55
import sinon from 'sinon';
66
import { expect } from 'chai';
7-
import amqplib, { ChannelModel } from 'amqplib';
7+
import amqplib, { Channel, ChannelModel } from 'amqplib';
88
import { IConnectionParams, IMessage } from '../src/types';
99

1010
import { brokerOptions } from './config';
@@ -403,6 +403,34 @@ describe('Broker', () => {
403403
expect(consumerCount).to.be.eq(3);
404404
await Promise.all(consumers.map(c => c.consumer.cancel()));
405405
});
406+
407+
it('handles client cancellation', async function() {
408+
const consumer = await broker.consume(q1);
409+
const canceledPromise = once(consumer, 'cancel');
410+
411+
// Cancel the consumer
412+
await consumer.cancel();
413+
414+
const [{ server }] = await canceledPromise;
415+
expect(server).to.be.false;
416+
417+
expect(broker.channelConsumers.size).to.eq(1);
418+
expect(broker.channelConsumers.get(consumer.channel as Channel)).to.be.empty;
419+
});
420+
421+
it('handles server cancellation', async function() {
422+
const consumer = await broker.consume(q1);
423+
const canceledPromise = once(consumer, 'cancel');
424+
425+
// Cause a server-initiated cancellation
426+
await broker.pool!.acquireAndRun(ch => ch.deleteQueue(q1));
427+
428+
const [{ server }] = await canceledPromise;
429+
expect(server).to.be.true;
430+
431+
expect(broker.channelConsumers.size).to.eq(1);
432+
expect(broker.channelConsumers.get(consumer.channel as Channel)).to.be.empty;
433+
});
406434
});
407435

408436
describe('Error handling', function() {

0 commit comments

Comments
 (0)