Skip to content

Commit 9e73424

Browse files
committed
Batch max size is whatever available in the queue for that partition
1 parent ed0fb03 commit 9e73424

File tree

1 file changed

+3
-48
lines changed

1 file changed

+3
-48
lines changed

lib/kafkajs/_consumer.js

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -155,17 +155,6 @@ class Consumer {
155155
*/
156156
#messageCache = null;
157157

158-
/**
159-
* The maximum size of the message cache.
160-
* Will be adjusted dynamically.
161-
*/
162-
#messageCacheMaxSize = 1;
163-
164-
/**
165-
* Number of times we tried to increase the cache.
166-
*/
167-
#increaseCount = 0;
168-
169158
/**
170159
* Whether the user has enabled manual offset management (commits).
171160
*/
@@ -311,8 +300,6 @@ class Consumer {
311300
* consumed messages upto N from the internalClient, but the user has stale'd the cache
312301
* after consuming just k (< N) messages. We seek back to last consumed offset + 1. */
313302
this.#messageCache.clear();
314-
this.#messageCacheMaxSize = 1;
315-
this.#increaseCount = 0;
316303
const clearPartitions = this.assignment();
317304
const seeks = [];
318305
for (const topicPartition of clearPartitions) {
@@ -844,33 +831,6 @@ class Consumer {
844831
await this.commitOffsets();
845832
}
846833

847-
/**
848-
* Request a size increase.
849-
* It increases the size by 2x, but only if the size is less than 1024,
850-
* only if the size has been requested to be increased twice in a row.
851-
* @private
852-
*/
853-
#increaseMaxSize() {
854-
if (this.#messageCacheMaxSize === 1024)
855-
return;
856-
this.#increaseCount++;
857-
if (this.#increaseCount <= 1)
858-
return;
859-
this.#messageCacheMaxSize = Math.min(this.#messageCacheMaxSize << 1, 1024);
860-
this.#increaseCount = 0;
861-
}
862-
863-
/**
864-
* Request a size decrease.
865-
* It decreases the size to 80% of the last received size, with a minimum of 1.
866-
* @param {number} recvdSize - the number of messages received in the last poll.
867-
* @private
868-
*/
869-
#decreaseMaxSize(recvdSize) {
870-
this.#messageCacheMaxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1);
871-
this.#increaseCount = 0;
872-
}
873-
874834
/**
875835
* Converts a list of messages returned by node-rdkafka into a message that can be used by the eachBatch callback.
876836
* @param {import("../..").Message[]} messages - must not be empty. Must contain messages from the same topic and partition.
@@ -1001,11 +961,6 @@ class Consumer {
1001961
const res = takeFromCache();
1002962
this.#lastFetchClockNs = hrtime.bigint();
1003963
this.#maxPollIntervalRestart.resolve();
1004-
if (messages.length === this.#messageCacheMaxSize) {
1005-
this.#increaseMaxSize();
1006-
} else {
1007-
this.#decreaseMaxSize(messages.length);
1008-
}
1009964
return res;
1010965
} finally {
1011966
this.#fetchInProgress.resolve();
@@ -1040,7 +995,7 @@ class Consumer {
1040995
}
1041996

1042997
return this.#fetchAndResolveWith(() => this.#messageCache.next(),
1043-
this.#messageCacheMaxSize);
998+
Number.MAX_SAFE_INTEGER);
1044999
}
10451000

10461001
/**
@@ -1071,7 +1026,7 @@ class Consumer {
10711026

10721027
return this.#fetchAndResolveWith(() =>
10731028
this.#messageCache.nextN(null, size),
1074-
this.#messageCacheMaxSize);
1029+
Number.MAX_SAFE_INTEGER);
10751030
}
10761031

10771032
/**
@@ -1560,7 +1515,7 @@ class Consumer {
15601515
this.#concurrency = config.partitionsConsumedConcurrently;
15611516
const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor;
15621517
/* TODO: make this dynamic, based on max batch size / size of last message seen. */
1563-
const maxBatchSize = 32;
1518+
const maxBatchSize = -1;
15641519
const fetcher = config.eachMessage
15651520
? (savedIdx) => this.#consumeSingleCached(savedIdx)
15661521
: (savedIdx) => this.#consumeCachedN(savedIdx, maxBatchSize);

0 commit comments

Comments
 (0)