
Commit 38aa2dc

Use only the 1.5-second cache size estimation, aligned to batch size * concurrency
1 parent 9b556d2 commit 38aa2dc
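
To illustrate the new sizing rule with hypothetical numbers: if a single worker has recently consumed about 1,000 messages per second, the cache target is 1.5 seconds of that rate per worker, i.e. 1,500 messages, times the configured concurrency (say 2), giving 3,000; that figure is then rounded up to the next multiple of batch size * concurrency (say 32 * 2 = 64), for a final cache cap of 3,008 messages.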

File tree

  lib/kafkajs/_consumer.js
  test/promisified/consumer/consumeMessages.spec.js
  test/promisified/consumer/consumerCacheTests.spec.js

3 files changed: +21 -83 lines changed


lib/kafkajs/_consumer.js

Lines changed: 12 additions & 68 deletions
@@ -197,10 +197,6 @@ class Consumer {
    * The number of partitions to consume concurrently as set by the user, or 1.
    */
   #concurrency = 1;
-
-
-  #runConfig = null;
-
   /**
    * Promise that resolves together with last in progress fetch.
    * It's set to null when no fetch is in progress.
@@ -958,7 +954,7 @@ class Consumer {
   #updateMaxMessageCacheSize() {
     if (this.#maxBatchSize === -1) {
       // In case of unbounded max batch size it returns all available messages
-      // for a partition in each batch. Cache is unbounded as well as
+      // for a partition in each batch. Cache is unbounded given that
       // it takes only one call to process each partition.
       return;
     }
@@ -969,15 +965,11 @@ class Consumer {
     const consumptionDurationSeconds = Number(nowNs - this.#lastFetchClockNs) / 1e9;
     const messagesPerSecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationSeconds;
     // Keep enough messages in the cache for 1.5 seconds of concurrent consumption.
-    this.#messageCacheMaxSize = Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency;
-    const minCacheSize = this.#runConfig.eachBatch ? this.#maxBatchesSize : this.#concurrency;
-    if (this.#messageCacheMaxSize < minCacheSize)
-      // Keep at least one batch or one message per worker.
-      // It's possible less workers than requested were active in previous run.
-      this.#messageCacheMaxSize = minCacheSize;
-    else if (this.#messageCacheMaxSize > minCacheSize * 10)
-      // Keep at most 10 messages or batches per requested worker.
-      this.#messageCacheMaxSize = minCacheSize * 10;
+    // Round up to the nearest multiple of `#maxBatchesSize`.
+    this.#messageCacheMaxSize = Math.ceil(
+      Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency
+      / this.#maxBatchesSize
+    ) * this.#maxBatchesSize;
   }
 }
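As a minimal standalone sketch of the arithmetic in the hunk above (the helper function and the example numbers are illustrative, not part of the library):

// Hypothetical helper mirroring the new sizing logic: keep roughly 1.5 seconds
// of messages per worker, then round the total up to a multiple of maxBatchesSize.
function estimateCacheMaxSize(messagesPerSecondSingleWorker, concurrency, maxBatchesSize) {
  const target = Math.round(1.5 * messagesPerSecondSingleWorker) * concurrency;
  return Math.ceil(target / maxBatchesSize) * maxBatchesSize;
}

// Example: ~1000 msg/s per worker, 2 workers, maxBatchesSize = 32 * 2 = 64.
console.log(estimateCacheMaxSize(1000, 2, 64)); // 3008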

@@ -1110,24 +1102,6 @@ class Consumer {
       this.#messageCacheMaxSize);
   }
 
-  /**
-   * Consumes n messages from the internal consumer.
-   * @returns {Promise<import("../..").Message[]>} A promise that resolves to a list of messages. The size of this list is guaranteed to be less than or equal to n.
-   * @note this method cannot be used in conjunction with #consumeSingleCached.
-   * @private
-   */
-  async #consumeN(n) {
-    return new Promise((resolve, reject) => {
-      this.#internalClient.consume(n, (err, messages) => {
-        if (err) {
-          reject(createKafkaJsErrorFromLibRdKafkaError(err));
-          return;
-        }
-        resolve(messages);
-      });
-    });
-  }
-
   /**
    * Flattens a list of topics with partitions into a list of topic, partition.
    * @param {Array<({topic: string, partitions: Array<number>}|{topic: string, partition: number})>} topics
@@ -1593,16 +1567,18 @@ class Consumer {
    * @private
    */
   async #runInternal(config) {
-    this.#runConfig = config;
-    this.#concurrency = config.partitionsConsumedConcurrently;
-    this.#maxBatchesSize = this.#maxBatchSize * this.#concurrency;
     const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor;
     const fetcher = config.eachMessage
       ? (savedIdx) => this.#consumeSingleCached(savedIdx)
       : (savedIdx) => this.#consumeCachedN(savedIdx, this.#maxBatchSize);
-    this.#workers = [];
 
     await this.#lock.write(async () => {
+      this.#workers = [];
+      this.#concurrency = config.partitionsConsumedConcurrently;
+      this.#maxBatchesSize = (
+        config.eachBatch && this.#maxBatchSize > 0 ?
+        this.#maxBatchSize :
+        1) * this.#concurrency;
 
       while (!this.#disconnectStarted) {
         if (this.#maxPollIntervalRestart.resolved)
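
A small sketch of how the new #maxBatchesSize expression behaves for the two run modes (the helper name and the values below are illustrative):

// Mirrors `(config.eachBatch && maxBatchSize > 0 ? maxBatchSize : 1) * concurrency`.
function maxBatchesSizeFor(config, maxBatchSize, concurrency) {
  return (config.eachBatch && maxBatchSize > 0 ? maxBatchSize : 1) * concurrency;
}

console.log(maxBatchesSizeFor({ eachBatch: async () => {} }, 32, 4));   // 128: batch mode
console.log(maxBatchesSizeFor({ eachMessage: async () => {} }, 32, 4)); // 4: one message per worker
console.log(maxBatchesSizeFor({ eachBatch: async () => {} }, -1, 4));   // 4: unbounded batch size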
@@ -1639,38 +1615,6 @@ class Consumer {
     this.#maxPollIntervalRestart.resolve();
   }
 
-  /**
-   * Consumes a single message from the consumer within the given timeout.
-   * THIS METHOD IS NOT IMPLEMENTED.
-   * @note This method cannot be used with run(). Either that, or this must be used.
-   *
-   * @param {any} args
-   * @param {number} args.timeout - the timeout in milliseconds, defaults to 1000.
-   * @returns {import("../..").Message|null} a message, or null if the timeout was reached.
-   * @private
-   */
-  async consume({ timeout } = { timeout: 1000 }) {
-    if (this.#state !== ConsumerState.CONNECTED) {
-      throw new error.KafkaJSError('consume can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
-    }
-
-    if (this.#running) {
-      throw new error.KafkaJSError('consume() and run() cannot be used together.', { code: error.ErrorCodes.ERR__CONFLICT });
-    }
-
-    this.#internalClient.setDefaultConsumeTimeout(timeout);
-    let m = null;
-
-    try {
-      const ms = await this.#consumeN(1);
-      m = ms[0];
-    } finally {
-      this.#internalClient.setDefaultConsumeTimeout(undefined);
-    }
-
-    throw new error.KafkaJSError('consume() is not implemented.' + m, { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
-  }
-
   async #commitOffsetsUntilNoStateErr(offsetsToCommit) {
     let err = { code: error.ErrorCodes.ERR_NO_ERROR };
     do {

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 1 addition & 2 deletions
@@ -434,9 +434,8 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
         inProgressMaxValue = Math.max(inProgress, inProgressMaxValue);
         if (inProgressMaxValue >= expectedMaxConcurrentWorkers) {
           maxConcurrentWorkersReached.resolve();
-        } else if (messagesConsumed.length > 2048) {
-          await sleep(1000);
         }
+        await sleep(100);
         inProgress--;
       },
     });
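
For context, a sketch of the peak-concurrency measurement this handler relies on (names are taken from the hunk; the increment on entry is assumed, as it is not visible here): each eachMessage call bumps a counter, records the running maximum, keeps the worker busy briefly, then decrements, so the maximum approximates how many workers ran concurrently.

// Illustrative sketch; sleep() stands in for the test helper of the same name.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

let inProgress = 0;
let inProgressMaxValue = 0;

async function eachMessage() {
  inProgress++; // assumed to happen on handler entry; not shown in this hunk
  inProgressMaxValue = Math.max(inProgress, inProgressMaxValue);
  await sleep(100); // keep the worker busy so overlapping workers are observable
  inProgress--;
}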

test/promisified/consumer/consumerCacheTests.spec.js

Lines changed: 8 additions & 13 deletions
@@ -142,6 +142,8 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
      * the consumers are created with the same groupId, we create them here.
      * TODO: verify correctness of theory. It's conjecture... which solves flakiness. */
     let groupId = `consumer-group-id-${secureRandom()}`;
+    const multiplier = 9;
+    const numMessages = 16 * multiplier;
     consumer = createConsumer({
       groupId,
       maxWaitTimeInMs: 100,
@@ -164,7 +166,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
     const messagesConsumed = [];
     const messagesConsumedConsumer1 = [];
     const messagesConsumedConsumer2 = [];
-    let consumer2ConsumeRunning = false;
 
     consumer.run({
       partitionsConsumedConcurrently,
@@ -176,18 +177,13 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
           { topic: event.topic, partition: event.partition, offset: Number(event.message.offset) + 1 },
         ]);
 
-        /* Until the second consumer joins, consume messages slowly so as to not consume them all
-         * before the rebalance triggers. */
-        if (messagesConsumed.length > 1024 && !consumer2ConsumeRunning) {
-          await sleep(10);
-        }
+        await sleep(100);
       }
     });
 
     /* Evenly distribute 1024*9 messages across 3 partitions */
     let i = 0;
-    const multiplier = 9;
-    const messages = Array(1024 * multiplier)
+    const messages = Array(numMessages)
       .fill()
       .map(() => {
         const value = secureRandom();
@@ -198,7 +194,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
 
     // Wait for the messages - some of them, before starting the
     // second consumer.
-    await waitForMessages(messagesConsumed, { number: 1024 });
+    await waitForMessages(messagesConsumed, { number: 16 });
 
     await consumer2.connect();
     await consumer2.subscribe({ topic: topicName });
@@ -210,18 +206,17 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
     });
 
     await waitFor(() => consumer2.assignment().length > 0, () => null);
-    consumer2ConsumeRunning = true;
 
     /* Now that both consumers have joined, wait for all msgs to be consumed */
-    await waitForMessages(messagesConsumed, { number: 1024 * multiplier });
+    await waitForMessages(messagesConsumed, { number: numMessages });
 
     /* No extra messages should be consumed. */
     await sleep(1000);
-    expect(messagesConsumed.length).toEqual(1024 * multiplier);
+    expect(messagesConsumed.length).toEqual(numMessages);
 
     /* Check if all messages were consumed. */
     expect(messagesConsumed.map(event => (+event.message.offset)).sort((a, b) => a - b))
-      .toEqual(Array(1024 * multiplier).fill().map((_, i) => Math.floor(i / 3)));
+      .toEqual(Array(numMessages).fill().map((_, i) => Math.floor(i / 3)));
 
     /* Consumer2 should have consumed at least one message. */
     expect(messagesConsumedConsumer2.length).toBeGreaterThan(0);
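
A quick standalone check of the final offset assertion under the test's distribution (16 * 9 = 144 messages spread evenly over 3 partitions): each partition ends up with offsets 0..47, so the sorted list of all consumed offsets contains every value three times, which is exactly Math.floor(i / 3) at index i.

// Sketch verifying the expected-offsets expression used in the test above.
const numMessages = 144;
const perPartition = numMessages / 3; // 48 messages per partition, offsets 0..47
const offsets = [];
for (let p = 0; p < 3; p++)
  for (let o = 0; o < perPartition; o++)
    offsets.push(o);
offsets.sort((a, b) => a - b);
const expected = Array(numMessages).fill().map((_, i) => Math.floor(i / 3));
console.log(offsets.every((v, i) => v === expected[i])); // true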
