Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .nsprc
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@
"GHSA-4hjh-wcwx-xvwj": {
"active": true,
"notes": "should be removed when maester-client is fixed: https://github.com/elasticio/maester-client/issues/47"
},
"GHSA-869p-cjfg-cm3x": {
"active": true,
"notes": "we don't use HMAC signature"
}
}
27 changes: 25 additions & 2 deletions lib/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,16 +336,39 @@ class Amqp {
return message;
}

async getMessageToAck(messageId) {
let message;
let retryCount = 0;
for (let i = 0; i <= this.settings.AMQP_PUBLISH_RETRY_ATTEMPTS; i++) {
await this._ensureConsumerChannel();
message = await this.getMostRecentMessage(messageId);
if (this.consumerChannel) {
break;
}
// Make sure consumer channel is available after receiving the message
log.debug({ messageId, retryCount: retryCount++ }, 'Waiting for consumer channel to ack/nack message');
const backoffTime = this._getDelay(
this.settings.AMQP_PUBLISH_RETRY_DELAY,
this.settings.AMQP_PUBLISH_MAX_RETRY_DELAY,
retryCount
);
await this._sleep(backoffTime);
}
return message;
}

async ack(messageId) {
const message = await this.getMostRecentMessage(messageId);
const message = await this.getMessageToAck(messageId);
log.debug(message.fields, 'Message ack');
assert(this.consumerChannel, 'Consumer channel is not available to ack message');
this.consumerChannel.ack(message);
messagesDB.deleteMessage(messageId);
}

async reject(messageId) {
const message = await this.getMostRecentMessage(messageId);
const message = await this.getMessageToAck(messageId);
log.debug(message.fields, 'Message reject');
assert(this.consumerChannel, 'Consumer channel is not available to reject message');
this.consumerChannel.reject(message, false);
messagesDB.deleteMessage(messageId);
}
Expand Down
57 changes: 57 additions & 0 deletions mocha_spec/unit/amqp.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,63 @@ describe('AMQP', () => {
expect(messagesDB.deleteMessage).to.have.been.calledOnce.and.calledWith(messageId);
});

it('Should ack message after connection close, reconnect and receiving message with new consumerTag', async () => {
const oldMessage = {
...message,
fields: {
...message.fields,
consumerTag: 'oldConsumerTag'
}
};
const newMessage = {
...message,
fields: {
...message.fields,
consumerTag: 'newConsumerTag'
}
};

messagesDB.addMessage(messageId, oldMessage);
const amqp = new Amqp(settings);

// Mock the _ensureConsumerChannel to simulate reconnection logic
let channelCreationCount = 0;
sandbox.stub(amqp, '_ensureConsumerChannel').callsFake(async () => {
channelCreationCount++;
if (channelCreationCount === 1) {
// First call: no consumer channel (connection was closed)
amqp.consumerChannel = undefined;
return undefined;
}
// Second call: new consumer channel after reconnect
amqp.consumerChannel = {
ack: sandbox.stub()
};
return amqp.consumerChannel;
});

// Set up consume object with new consumer tag after reconnect
amqp.consume = {
consumerTag: 'newConsumerTag'
};

sandbox.spy(messagesDB, 'deleteMessage');

// Simulate the ack flow with reconnection
await Promise.all([
new Promise(resolve => setTimeout(() => {
// Update the message with new consumerTag after reconnection
messagesDB.addMessage(messageId, newMessage);
resolve();
}, 100)),
amqp.ack(messageId)
]);

expect(amqp._ensureConsumerChannel).to.have.been.calledTwice;
expect(amqp.consumerChannel.ack).to.have.been.calledOnce.and.calledWith(newMessage);
expect(messagesDB.deleteMessage).to.have.been.calledOnce.and.calledWith(messageId);
});

it('Should reject original message when ack is called with false', () => {
const amqp = new Amqp(settings);
amqp.consumerChannel = {
Expand Down