Skip to content
19 changes: 19 additions & 0 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,30 @@ export abstract class MessageQueue {
return;
}

// If we'd go over maxMessages or MAX_BATCH_BYTES by re-queueing this
// message, flush first
const {maxMessages} = this._options;
const size = Buffer.byteLength(message.message.ackId, 'utf8');
if (
this._requests.length + 1 >= maxMessages! ||
this.bytes + size >= MAX_BATCH_BYTES
) {
const reason =
this._requests.length + 1 >= maxMessages!
? 'going over count'
: 'going over size';

// No need to wait on this one; it clears the old batch out, and acks
// are best effort.
this.flush(reason).catch(() => {});
}

// Just throw it in for another round of processing on the next batch.
this._requests.push(message);
this.numPendingRequests++;
this.numInFlightRequests++;
this.numInRetryRequests--;
this.bytes += size;

// Make sure we actually do have another batch scheduled.
if (!this._timer) {
Expand Down
Loading