@@ -155,17 +155,6 @@ class Consumer {
155155 */
156156 #messageCache = null ;
157157
158- /**
159- * The maximum size of the message cache.
160- * Will be adjusted dynamically.
161- */
162- #messageCacheMaxSize = 1 ;
163-
164- /**
165- * Number of times we tried to increase the cache.
166- */
167- #increaseCount = 0 ;
168-
169158 /**
170159 * Whether the user has enabled manual offset management (commits).
171160 */
@@ -182,6 +171,11 @@ class Consumer {
182171 */
183172 #partitionCount = 0 ;
184173
174+ /**
175+ * Maximum batch size passed in eachBatch calls.
176+ */
177+ #maxBatchSize = 32 ;
178+
185179 /**
186180 * Whether worker termination has been scheduled.
187181 */
@@ -311,8 +305,6 @@ class Consumer {
311305 * consumed messages upto N from the internalClient, but the user has stale'd the cache
312306 * after consuming just k (< N) messages. We seek back to last consumed offset + 1. */
313307 this . #messageCache. clear ( ) ;
314- this . #messageCacheMaxSize = 1 ;
315- this . #increaseCount = 0 ;
316308 const clearPartitions = this . assignment ( ) ;
317309 const seeks = [ ] ;
318310 for ( const topicPartition of clearPartitions ) {
@@ -691,6 +683,17 @@ class Consumer {
691683 this . #cacheExpirationTimeoutMs = this . #maxPollIntervalMs;
692684 rdKafkaConfig [ 'max.poll.interval.ms' ] = this . #maxPollIntervalMs * 2 ;
693685
686+ if ( rdKafkaConfig [ 'js.max.batch.size' ] !== undefined ) {
687+ const maxBatchSize = + rdKafkaConfig [ 'js.max.batch.size' ] ;
688+ if ( Number . isInteger ( maxBatchSize ) && ( maxBatchSize === - 1 || maxBatchSize > 0 ) ) {
689+ throw new error . KafkaJSError (
690+ "'js.max.batch.size' must be a positive integer or -1 for unlimited batch size." ,
691+ { code : error . ErrorCodes . ERR__INVALID_ARG } ) ;
692+ }
693+ this . #maxBatchSize = maxBatchSize ;
694+ delete rdKafkaConfig [ 'js.max.batch.size' ] ;
695+ }
696+
694697 return rdKafkaConfig ;
695698 }
696699
@@ -844,33 +847,6 @@ class Consumer {
844847 await this . commitOffsets ( ) ;
845848 }
846849
847- /**
848- * Request a size increase.
849- * It increases the size by 2x, but only if the size is less than 1024,
850- * only if the size has been requested to be increased twice in a row.
851- * @private
852- */
853- #increaseMaxSize( ) {
854- if ( this . #messageCacheMaxSize === 1024 )
855- return ;
856- this . #increaseCount++ ;
857- if ( this . #increaseCount <= 1 )
858- return ;
859- this . #messageCacheMaxSize = Math . min ( this . #messageCacheMaxSize << 1 , 1024 ) ;
860- this . #increaseCount = 0 ;
861- }
862-
863- /**
864- * Request a size decrease.
865- * It decreases the size to 80% of the last received size, with a minimum of 1.
866- * @param {number } recvdSize - the number of messages received in the last poll.
867- * @private
868- */
869- #decreaseMaxSize( recvdSize ) {
870- this . #messageCacheMaxSize = Math . max ( Math . floor ( ( recvdSize * 8 ) / 10 ) , 1 ) ;
871- this . #increaseCount = 0 ;
872- }
873-
874850 /**
875851 * Converts a list of messages returned by node-rdkafka into a message that can be used by the eachBatch callback.
876852 * @param {import("../..").Message[] } messages - must not be empty. Must contain messages from the same topic and partition.
@@ -1001,11 +977,6 @@ class Consumer {
1001977 const res = takeFromCache ( ) ;
1002978 this . #lastFetchClockNs = hrtime . bigint ( ) ;
1003979 this . #maxPollIntervalRestart. resolve ( ) ;
1004- if ( messages . length === this . #messageCacheMaxSize) {
1005- this . #increaseMaxSize( ) ;
1006- } else {
1007- this . #decreaseMaxSize( messages . length ) ;
1008- }
1009980 return res ;
1010981 } finally {
1011982 this . #fetchInProgress. resolve ( ) ;
@@ -1040,7 +1011,7 @@ class Consumer {
10401011 }
10411012
10421013 return this . #fetchAndResolveWith( ( ) => this . #messageCache. next ( ) ,
1043- this . #messageCacheMaxSize ) ;
1014+ Number . MAX_SAFE_INTEGER ) ;
10441015 }
10451016
10461017 /**
@@ -1071,7 +1042,7 @@ class Consumer {
10711042
10721043 return this . #fetchAndResolveWith( ( ) =>
10731044 this . #messageCache. nextN ( null , size ) ,
1074- this . #messageCacheMaxSize ) ;
1045+ Number . MAX_SAFE_INTEGER ) ;
10751046 }
10761047
10771048 /**
@@ -1559,11 +1530,9 @@ class Consumer {
15591530 async #runInternal( config ) {
15601531 this . #concurrency = config . partitionsConsumedConcurrently ;
15611532 const perMessageProcessor = config . eachMessage ? this . #messageProcessor : this . #batchProcessor;
1562- /* TODO: make this dynamic, based on max batch size / size of last message seen. */
1563- const maxBatchSize = 32 ;
15641533 const fetcher = config . eachMessage
15651534 ? ( savedIdx ) => this . #consumeSingleCached( savedIdx )
1566- : ( savedIdx ) => this . #consumeCachedN( savedIdx , maxBatchSize ) ;
1535+ : ( savedIdx ) => this . #consumeCachedN( savedIdx , this . # maxBatchSize) ;
15671536 this . #workers = [ ] ;
15681537
15691538 await this . #lock. write ( async ( ) => {
0 commit comments