Skip to content

Commit 2f86688

Browse files
committed
Remove the minimum cache size of 1024 messages to allow faster
rebalances when message processing times are long
1 parent 4ad7f02 commit 2f86688

File tree

4 files changed

+30
-14
lines changed

4 files changed

+30
-14
lines changed

ci/tests/run_perf_test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,14 +232,14 @@ async function main() {
232232
consumerKjsMessageMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):');
233233
consumerKjsMessageAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachMessage):');
234234
consumerKjsMessageMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):');
235+
consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
235236
consumerKjsMessageAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average consumer-each-message RSS KB:');
236237
consumerKjsMessageMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max consumer-each-message RSS KB:');
237238
consumerKjsMessageAverageBrokerLag = extractValue(outputKjsProducerConsumer, `=== Average broker lag (${groupIdEachMessageKafkaJS}):`);
238239
consumerKjsMessageMaxBrokerLag = extractValue(outputKjsProducerConsumer, `=== Max broker lag (${groupIdEachMessageKafkaJS}):`);
239240
consumerKjsMessageTotalLagMeasurements = extractValue(outputKjsProducerConsumer, `=== Sample size for broker lag measurement (${groupIdEachMessageKafkaJS}):`);
240241
}
241242
if (consumerModeAll || consumerModeEachBatch) {
242-
consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
243243
consumerKjsBatch = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
244244
consumerKjsBatchRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachBatch):');
245245
consumerKjsBatchAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):');

examples/performance/performance-primitives-common.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
190190

191191
if (actionOnMessages) {
192192
await actionOnMessages(batch.messages);
193-
if (messagesMeasured > 0) {
193+
if (messagesMeasured > 0 && messages.length > 0) {
194194
let i = 1;
195195
const now = Date.now();
196196
for (const message of messages) {

lib/kafkajs/_consumer.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,9 @@ class Consumer {
957957

958958
#updateMaxMessageCacheSize() {
959959
if (this.#maxBatchSize === -1) {
960+
// With an unbounded max batch size, each batch returns all available
961+
// messages for a partition. The cache is unbounded as well, since
962+
// only one call is needed to process each partition.
960963
return;
961964
}
962965

@@ -965,15 +968,16 @@ class Consumer {
965968
nowNs > this.#lastFetchClockNs) {
966969
const consumptionDurationSeconds = Number(nowNs - this.#lastFetchClockNs) / 1e9;
967970
const messagesPerSecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationSeconds;
968-
// Keep enough messages in the cache for 1.5 seconds of consumption.
971+
// Keep enough messages in the cache for 1.5 seconds of concurrent consumption.
969972
this.#messageCacheMaxSize = Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency;
970973
const minCacheSize = this.#runConfig.eachBatch ? this.#maxBatchesSize : this.#concurrency;
971974
if (this.#messageCacheMaxSize < minCacheSize)
975+
// Keep at least one batch or one message per worker.
976+
// It's possible that fewer workers than requested were active in the previous run.
972977
this.#messageCacheMaxSize = minCacheSize;
973978
else if (this.#messageCacheMaxSize > minCacheSize * 10)
979+
// Keep at most 10 messages or batches per requested worker.
974980
this.#messageCacheMaxSize = minCacheSize * 10;
975-
if (this.#messageCacheMaxSize < 1024)
976-
this.#messageCacheMaxSize = 1024;
977981
}
978982
}
979983

@@ -1018,7 +1022,7 @@ class Consumer {
10181022
const fetchResult = new DeferredPromise();
10191023
this.#logger.debug(`Attempting to fetch ${size} messages to the message cache`,
10201024
this.#createConsumerBindingMessageMetadata());
1021-
1025+
10221026
this.#updateMaxMessageCacheSize();
10231027
this.#internalClient.consume(size, (err, messages) =>
10241028
fetchResult.resolve([err, messages]));

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,11 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
412412
partitions: partitions,
413413
});
414414

415+
// If consume time per message is large and messages are consumed one at a time,
416+
// very small batch sizes are needed to keep the concurrency up.
417+
// This avoids an overly large cache, which would postpone the next fetch
418+
// and therefore delay the rebalance too much.
419+
const producer = createProducer({}, {'batch.num.messages': '1'});
415420
await producer.connect();
416421
await consumer.connect();
417422
await consumer.subscribe({ topic: topicName });
@@ -448,6 +453,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
448453
await producer.send({ topic: topicName, messages });
449454
await maxConcurrentWorkersReached;
450455
expect(inProgressMaxValue).toBe(expectedMaxConcurrentWorkers);
456+
await producer.disconnect();
451457
});
452458

453459
it('consume GZIP messages', async () => {
@@ -612,6 +618,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
612618
let assigns = 0;
613619
let revokes = 0;
614620
let lost = 0;
621+
let firstBatchProcessing;
615622
consumer = createConsumer({
616623
groupId,
617624
maxWaitTimeInMs: 100,
@@ -649,14 +656,14 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
649656
receivedMessages++;
650657

651658
try {
652-
if (event.batch.messages.length >= 32) {
653-
expect(event.isStale()).toEqual(false);
654-
await sleep(7500);
655-
/* 7.5s 'processing'
656-
* doesn't exceed max poll interval.
657-
* Cache reset is transparent */
658-
expect(event.isStale()).toEqual(false);
659-
}
659+
expect(event.isStale()).toEqual(false);
660+
await sleep(7500);
661+
/* 7.5s 'processing'
662+
* doesn't exceed max poll interval.
663+
* Cache reset is transparent */
664+
expect(event.isStale()).toEqual(false);
665+
if (firstBatchProcessing === undefined)
666+
firstBatchProcessing = receivedMessages;
660667
} catch (e) {
661668
console.error(e);
662669
errors = true;
@@ -680,6 +687,8 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
680687
/* Triggers revocation */
681688
await consumer.disconnect();
682689

690+
expect(firstBatchProcessing).toBeDefined();
691+
expect(receivedMessages).toBeGreaterThan(firstBatchProcessing);
683692
/* First assignment */
684693
expect(assigns).toEqual(1);
685694
/* Revocation on disconnect */
@@ -777,6 +786,9 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
777786
/* Triggers revocation */
778787
await consumer.disconnect();
779788

789+
expect(firstLongBatchProcessing).toBeDefined();
790+
expect(receivedMessages).toBeGreaterThan(firstLongBatchProcessing);
791+
780792
/* First assignment + assignment after partitions lost */
781793
expect(assigns).toEqual(2);
782794
/* Partitions lost + revocation on disconnect */

0 commit comments

Comments
 (0)