@@ -131,22 +131,41 @@ class Consumer {
131131 #userManagedStores = false ;
132132
133133 /**
134- * Populated with Promises for each partition that is being processed concurrently.
135- * Each promise might run eachMessage/eachBatch.
134+ * Signals an intent to disconnect the consumer.
136135 */
137- #runningPromises = [ ] ;
136+ #disconnectStarted = false ;
138137
139138 /**
140- * Each message that is consumed has an associated cache index.
141- * This array maps a an index within runningPromises to the associated cached index.
142- * ie. runningPromises[i] is associated with the cache index #savedIndexToPromiseIndex[i].
139+ * Number of partitions owned by the consumer.
140+ * @note This value may not be completely accurate; it is primarily a hint for deciding how many concurrent workers to spawn.
143141 */
144- #savedIndexToPromiseIndex = [ ] ;
142+ #partitionCount = 0 ;
145143
146144 /**
147- * Signals an intent to disconnect the consumer .
145+ * Whether worker termination has been scheduled .
148146 */
149- #disconnectStarted = false ;
147+ #workerTerminationScheduled = false ;
148+
149+ /**
150+ * The worker functions currently running in the consumer.
151+ */
152+ #workers = [ ] ;
153+
154+ /**
155+ * The number of partitions to consume concurrently, as set by the user (defaults to 1).
156+ */
157+ #concurrency = 1 ;
158+
159+ /**
160+ * Whether any call to the internalClient's consume() method is in progress.
161+ */
162+ #fetchInProgress = false ;
163+
164+ /**
165+ * TODO: remove this or make it a bit more reliable.
166+ * This is a debug property for this branch.
167+ */
168+ clientId = null ;
150169
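Taken together, these new fields drive the worker-pool model: #concurrency is the parallelism the user asked for, #partitionCount caps how many workers are actually useful, and #workerTerminationScheduled/#workers manage the pool itself. A minimal sketch of the sizing rule the rest of this diff applies (the helper name and the sample numbers are illustrative only):

// Illustrative helper mirroring the Math.max(1, Math.min(...)) expression used below.
function targetWorkerCount(concurrency, partitionCount) {
  // At least one worker, but never more than the requested concurrency or the owned partitions.
  return Math.max(1, Math.min(concurrency, partitionCount));
}

targetWorkerCount(4, 8); // => 4: concurrency is the limiting factor
targetWorkerCount(4, 2); // => 2: no point running more workers than owned partitions
targetWorkerCount(4, 0); // => 1: keep one worker polling even before any assignment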
151170 /**
152171 * @constructor
@@ -217,7 +236,6 @@ class Consumer {
217236 * @param {import("../../types").TopicPartition[] } assignment
218237 */
219238 #rebalanceCallback( err , assignment ) {
220- // Create the librdkafka error
221239 err = LibrdKafkaError . create ( err ) ;
222240 const userSpecifiedRebalanceCb = this . #userConfig[ 'rebalance_cb' ] ;
223241
@@ -276,6 +294,8 @@ class Consumer {
276294 * and marked the cache stale. This means that the cache is always expired when a rebalance
277295 * is triggered.
278296 * This is applicable both for incremental and non-incremental rebalances.
297+ * Multiple consume() calls also cannot run concurrently, because we make sure that only
298+ * one worker is calling into the internal consumer at a time.
279299 */
280300
281301 try {
@@ -285,10 +305,13 @@ class Consumer {
285305 if ( checkPendingSeeks && ! assignmentModified )
286306 assignment = this . #assignAsPerSeekedOffsets( assignment ) ;
287307
288- if ( this . #internalClient. rebalanceProtocol ( ) === "EAGER" )
308+ if ( this . #internalClient. rebalanceProtocol ( ) === "EAGER" ) {
289309 this . #internalClient. assign ( assignment ) ;
290- else
310+ this . #partitionCount = assignment . length ;
311+ } else {
291312 this . #internalClient. incrementalAssign ( assignment ) ;
313+ this . #partitionCount += assignment . length ;
314+ }
292315
293316 if ( checkPendingSeeks ) {
294317 const offsetsToCommit = assignment
@@ -313,9 +336,11 @@ class Consumer {
313336 if ( this . #internalClient. rebalanceProtocol ( ) === "EAGER" ) {
314337 this . #internalClient. unassign ( ) ;
315338 this . #messageCache. removeTopicPartitions ( ) ;
339+ this . #partitionCount = 0 ;
316340 } else {
317341 this . #internalClient. incrementalUnassign ( assignment ) ;
318342 this . #messageCache. removeTopicPartitions ( assignment ) ;
343+ this . #partitionCount -= assignment . length ;
319344 }
320345 }
321346 } catch ( e ) {
@@ -324,6 +349,18 @@ class Consumer {
324349 this . #internalClient. emit ( 'rebalance.error' , e ) ;
325350 }
326351 }
352+
353+ /**
354+ * Schedule worker termination here if the number of running workers no longer matches
355+ * the target concurrency, so that the pool is respawned with the correct worker count.
356+ */
357+ const workersToSpawn = Math . max ( 1 , Math . min ( this . #concurrency, this . #partitionCount) ) ;
358+ if ( workersToSpawn !== this . #workers. length ) {
359+ this . #workerTerminationScheduled = true ;
360+ /* We don't need to await the workers here. It is fine if the termination and respawning
361+ * happen a little later, since having a few more or a few fewer workers for a while is
362+ * not a big deal. */
363+ }
327364 } ) ;
328365 }
329366
@@ -338,6 +375,8 @@ class Consumer {
338375 { code : error . ErrorCodes . ERR__INVALID_ARG } ) ;
339376 }
340377 const rdKafkaConfig = kafkaJSToRdKafkaConfig ( kjsConfig ) ;
378+ this . clientId = rdKafkaConfig [ 'client.id' ] ;
379+ this . #logger = new DefaultLogger ( ) ;
341380
342381 /* Consumer specific configuration */
343382 if ( Object . hasOwn ( kjsConfig , 'groupId' ) ) {
@@ -663,8 +702,14 @@ class Consumer {
663702 return null ;
664703 }
665704
705+ if ( this . #fetchInProgress) {
706+ return null ;
707+ }
708+
709+ this . #fetchInProgress = true ;
666710 return new Promise ( ( resolve , reject ) => {
667711 this . #internalClient. consume ( this . #messageCache. maxSize , ( err , messages ) => {
712+ this . #fetchInProgress = false ;
668713 if ( err ) {
669714 reject ( createKafkaJsErrorFromLibRdKafkaError ( err ) ) ;
670715 return ;
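The #fetchInProgress flag added above is a single-flight guard: while one worker has a consume() call outstanding against the internal client, every other worker receives null and backs off instead of issuing a second fetch. A stripped-down sketch of the idea with simplified names (the real code clears the flag inside the consume callback rather than in a finally block):

// Illustration only, not the module's code.
let fetchInProgress = false;

async function fetchOnce(doFetch) {
  if (fetchInProgress) {
    return null;               // another worker is already fetching; the caller retries later
  }
  fetchInProgress = true;
  try {
    return await doFetch();    // e.g. a promisified internalClient.consume(...)
  } finally {
    fetchInProgress = false;   // release the guard whether the fetch succeeded or failed
  }
}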
@@ -846,14 +891,10 @@ class Consumer {
846891
847892 const rdKafkaConfig = this . #config( ) ;
848893 const maxPollInterval = rdKafkaConfig [ 'max.poll.interval.ms' ] ?? 300000 ;
849- this . #messageCache = new MessageCache ( Math . floor ( maxPollInterval * 0.8 ) , config . partitionsConsumedConcurrently ) ;
894+ this . #messageCache = new MessageCache ( Math . floor ( maxPollInterval * 0.8 ) , config . partitionsConsumedConcurrently , this . #logger ) ;
850895
851- /* We deliberately don't await this. */
852- if ( config . eachMessage ) {
853- this . #runInternalEachMessage( config ) ;
854- } else {
855- this . #runInternalEachBatch( config ) ;
856- }
896+ /* We deliberately don't await this because we want to return from this method immediately. */
897+ this . #runInternal( config ) ;
857898 }
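This is the body behind the kafkajs-style run() API, so callers see it return promptly while the spawned workers keep processing. A hedged usage sketch; the topic name and handler are hypothetical, and the signatures assume the kafkajs-compatible surface this class exposes:

// Assuming `consumer` is an instance of this class, already configured with a groupId.
await consumer.connect();
await consumer.subscribe({ topics: ['example-topic'] });

consumer.run({
  partitionsConsumedConcurrently: 3,   // upper bound on the number of concurrent workers
  eachMessage: async ({ topic, partition, message }) => {
    console.log(topic, partition, message.value.toString());
  },
});
// run() returns quickly; processing continues on background workers until disconnect().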
858899
859900 /**
@@ -960,8 +1001,7 @@ class Consumer {
9601001 /* The value of eachBatchAutoResolve is not important. The only place where a message is marked processed
9611002 * despite an error is if the user says so, and the user can use resolveOffsets for both the possible
9621003 * values eachBatchAutoResolve can take. */
963- if ( config . eachBatch )
964- eachMessageProcessed = payload . _messageResolved
1004+ eachMessageProcessed = payload . _messageResolved ;
9651005 }
9661006
9671007 /* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
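The comment above concerns the batch path: regardless of eachBatchAutoResolve, when the handler throws, only the offsets the user explicitly resolved count as processed and the consumer seeks back to the rest. A hedged sketch of the manual-resolution pattern, using kafkajs-style payload fields (the exact names here are assumptions, and handle() is a hypothetical user function):

consumer.run({
  eachBatchAutoResolve: false,           // the handler takes over offset resolution
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      await handle(message);             // hypothetical per-message processing
      resolveOffset(message.offset);     // mark this message as processed
      await heartbeat();
    }
  },
});
// If handle() throws midway, only the offsets already resolved are treated as processed;
// the consumer seeks back to the first unresolved message and redelivers from there.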
@@ -999,68 +1039,25 @@ class Consumer {
9991039 }
10001040
10011041 /**
1002- * Awaits the completion of a single message's processing .
1042+ * Starts a worker to fetch messages/batches from the internal consumer and process them .
10031043 *
1004- * @returns {Promise<number> } the cache index of the message in the cache that was processed.
1005- */
1006- async waitOne ( ) {
1007- const savedIndex = await Promise . any ( this . #runningPromises) ;
1008- const promiseIndex = this . #savedIndexToPromiseIndex. findIndex ( p => p === savedIndex ) ;
1009- if ( promiseIndex === - 1 ) {
1010- console . error ( "Promise not found in runningPromises" ) ;
1011- throw new Error ( "Promise not found in runningPromises" ) ;
1012- }
1013- this . #runningPromises[ promiseIndex ] = this . #runningPromises[ this . #runningPromises. length - 1 ] ;
1014- this . #savedIndexToPromiseIndex[ promiseIndex ] = this . #savedIndexToPromiseIndex[ this . #savedIndexToPromiseIndex. length - 1 ] ;
1015- this . #runningPromises. pop ( ) ;
1016- this . #savedIndexToPromiseIndex. pop ( ) ;
1017-
1018- return savedIndex ;
1019- }
1020-
1021- /**
1022- * Awaits the completion of all messages that are being processed.
1044+ * A worker runs until it's told to stop.
1045+ * Conditions where the worker is told to stop:
1046+ * 1. The cache is globally stale.
1047+ * 2. A disconnect has been initiated.
1048+ * 3. A rebalance is in progress.
1049+ * 4. Some other worker has started terminating.
10231050 *
1024- * @returns { Promise<number[]> } a list of cache indices of the messages that were processed .
1051+ * Worker termination acts as an async barrier.
10251052 */
1026- async waitAll ( ) {
1027- const indices = await Promise . all ( this . #runningPromises) ;
1028- this . #runningPromises = [ ] ;
1029- this . #savedIndexToPromiseIndex = [ ] ;
1030- return indices ;
1031- }
1032-
1033- /* Internal polling loop.
1034- * It accepts the same config object that `run` accepts, but config.eachMessage must be set. */
1035- async #runInternalEachMessage( config ) {
1036- const concurrency = config . partitionsConsumedConcurrently ;
1053+ async #worker( config , perMessageProcessor , id ) {
10371054 let nextIdx = - 1 ;
1038- while ( ! ( await acquireOrLog ( this . #lock, this . #logger) ) ) ;
1039-
1040- while ( this . #state === ConsumerState . CONNECTED ) {
1041- /* Release lock and cleanup if we intend to disconnect. */
1042- if ( this . #disconnectStarted) {
1043- const indices = await this . waitAll ( ) ;
1044- indices . forEach ( idx => this . #messageCache. return ( idx ) ) ;
1045- if ( nextIdx !== - 1 ) {
1046- this . #messageCache. return ( nextIdx ) ;
1047- }
1048- nextIdx = - 1 ;
1049- this . #lock. release ( ) ;
1050- break ;
1051- }
1052-
1055+ while ( ! this . #workerTerminationScheduled) {
10531056 /* Invalidate the message cache if needed */
10541057 const locallyStale = this . #messageCache. popLocallyStale ( ) ;
10551058 if ( this . #messageCache. isStale ( ) ) { /* global staleness */
1056- const indices = await this . waitAll ( ) ;
1057- indices . forEach ( idx => this . #messageCache. return ( idx ) ) ;
1058- if ( nextIdx !== - 1 ) {
1059- this . #messageCache. return ( nextIdx ) ;
1060- }
1061- nextIdx = - 1 ;
1062- await this . #clearCacheAndResetPositions( ) ;
1063- continue ;
1059+ this . #workerTerminationScheduled = true ;
1060+ break ;
10641061 } else if ( locallyStale . length !== 0 ) { /* local staleness */
10651062 // TODO: is it correct to await some concurrent promises for eachMessage here?
10661063 // to be safe we can do it, but I don't think we really need to do that for
@@ -1079,99 +1076,44 @@ class Consumer {
10791076 nextIdx = - 1 ;
10801077
10811078 if ( ! m ) {
1082- // await any concurrency related promises right here if this is null, if any such promise exists.
1083- // see note in consumeSingleCached
1084- if ( this . #runningPromises. length ) {
1085- nextIdx = await this . waitOne ( ) ;
1086- }
1087- continue ;
1088- }
1089-
1090- const p = this . #messageProcessor( m , config ) ;
1091- this . #runningPromises. push ( p ) ;
1092- this . #savedIndexToPromiseIndex. push ( m . index ) ;
1093-
1094- if ( this . #runningPromises. length < concurrency ) {
1079+ /* Back off a little. If m is null, we might be fetching from the internal consumer (fetch in progress),
1080+ * and calling consumeSingleCached in a tight loop will help no one. */
1081+ await new Promise ( ( resolve ) => setTimeout ( resolve , 1 ) ) ;
10951082 continue ;
10961083 }
10971084
1098- nextIdx = await this . waitOne ( ) ;
1085+ nextIdx = await perMessageProcessor ( m , config ) ;
1086+ }
10991087
1100- /* TODO: another check we need to do here is to see how kafkaJS is handling
1101- * commits. Are they commmitting after a message is _processed_?
1102- * In that case we need to turn off librdkafka's auto-commit, and commit
1103- * inside this function.
1104- */
1088+ if ( nextIdx !== - 1 ) {
1089+ this . #messageCache. return ( nextIdx ) ;
11051090 }
11061091 }
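Condensed, the worker above is a cooperative loop: it keeps pulling from the message cache until the shared termination flag is observed, backs off briefly when nothing is cached, and on exit hands any prefetched-but-unprocessed cache index back so the slot is not stranded. A simplified sketch with hypothetical names:

// Illustration of the worker shape above (not the module's code).
async function worker({ isTerminationScheduled, takeFromCache, returnToCache, processOne }) {
  let nextIdx = -1;
  while (!isTerminationScheduled()) {
    const m = await takeFromCache(nextIdx);   // hands the previously finished index back to the cache
    nextIdx = -1;
    if (!m) {
      // Nothing cached, likely because a fetch is in flight; back off briefly.
      await new Promise((resolve) => setTimeout(resolve, 1));
      continue;
    }
    nextIdx = await processOne(m);            // eachMessage/eachBatch wrapper; returns the cache index
  }
  if (nextIdx !== -1) {
    returnToCache(nextIdx);                   // don't strand a prefetched slot on shutdown
  }
}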
11071092
1108- /* Internal polling loop.
1109- * It accepts the same config object that `run` accepts, but config.eachBatch must be set. */
1110- async #runInternalEachBatch( config ) {
1111- const concurrency = config . partitionsConsumedConcurrently ;
1112- let nextIdx = - 1 ;
1093+ /**
1094+ * Internal polling loop.
1095+ * Spawns and awaits workers until disconnect is initiated.
1096+ */
1097+ async #runInternal( config ) {
1098+ this . #concurrency = config . partitionsConsumedConcurrently ;
1099+ const perMessageProcessor = config . eachMessage ? this . #messageProcessor : this . #batchProcessor;
1100+ this . #workers = [ ] ;
11131101 while ( ! ( await acquireOrLog ( this . #lock, this . #logger) ) ) ;
11141102
1115- while ( this . #state === ConsumerState . CONNECTED ) {
1116- /* Release lock and cleanup if we intend to disconnect. */
1117- if ( this . #disconnectStarted) {
1118- const indices = await this . waitAll ( ) ;
1119- indices . forEach ( idx => this . #messageCache. return ( idx ) ) ;
1120- if ( nextIdx !== - 1 ) {
1121- this . #messageCache. return ( nextIdx ) ;
1122- }
1123- nextIdx = - 1 ;
1124- this . #lock. release ( ) ;
1125- break ;
1126- }
1103+ while ( ! this . #disconnectStarted) {
1104+ this . #workerTerminationScheduled = false ;
1105+ const workersToSpawn = Math . max ( 1 , Math . min ( this . #concurrency, this . #partitionCount) ) ;
1106+ this . #workers = Array ( workersToSpawn ) . fill ( ) . map ( ( _ , i ) => this . #worker( config , perMessageProcessor . bind ( this ) , i ) ) ;
1107+ await Promise . all ( this . #workers) ;
11271108
1128- /* Invalidate the message cache if needed */
1129- const locallyStale = this . #messageCache. popLocallyStale ( ) ;
1130- if ( this . #messageCache. isStale ( ) ) { /* global staleness */
1131- const indices = await this . waitAll ( ) ;
1132- indices . forEach ( idx => this . #messageCache. return ( idx ) ) ;
1133- if ( nextIdx !== - 1 ) {
1134- this . #messageCache. return ( nextIdx ) ;
1135- }
1136- nextIdx = - 1 ;
1109+ /* One of the possible reasons for the workers to end is that the cache is globally stale.
1110+ * We need to take care of expiring it. */
1111+ if ( this . #messageCache. isStale ( ) ) {
11371112 await this . #clearCacheAndResetPositions( ) ;
1138- continue ;
1139- } else if ( locallyStale . length !== 0 ) { /* local staleness */
1140- // TODO: is it correct to await some concurrent promises for eachMessage here?
1141- // to be safe we can do it, but I don't think we really need to do that for
1142- // any correctness reason.
1143- await this . #clearCacheAndResetPositions( locallyStale ) ;
1144- continue ;
11451113 }
1146-
1147- const m = await this . #consumeSingleCached( nextIdx ) . catch ( e => {
1148- /* Since this error cannot be exposed to the user in the current situation, just log and retry.
1149- * This is due to restartOnFailure being set to always true. */
1150- if ( this . #logger)
1151- this . #logger. error ( `Consumer encountered error while consuming. Retrying. Error details: ${ JSON . stringify ( e ) } ` ) ;
1152- } ) ;
1153-
1154- nextIdx = - 1 ;
1155-
1156- if ( ! m ) {
1157- // await any concurrency related promises right here if this is null, if any such promise exists.
1158- // see note in consumeSingleCached
1159- if ( this . #runningPromises. length ) {
1160- nextIdx = await this . waitOne ( ) ;
1161- }
1162- continue ;
1163- }
1164-
1165- const p = this . #batchProcessor( m , config ) ;
1166- this . #runningPromises. push ( p ) ;
1167- this . #savedIndexToPromiseIndex. push ( m . index ) ;
1168-
1169- if ( this . #runningPromises. length < concurrency ) {
1170- continue ;
1171- }
1172-
1173- nextIdx = await this . waitOne ( ) ;
11741114 }
1115+
1116+ this . #lock. release ( ) ;
11751117 }
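This loop is the "async barrier" the #worker docstring refers to: a batch of workers is spawned, Promise.all() waits until every one of them has observed the termination flag and returned, and only then is the pool resized and respawned. A condensed sketch of that lifecycle with hypothetical names:

// Condensed view of the spawn / await / respawn cycle (illustration only).
async function runPool({ shouldStop, targetWorkerCount, spawnWorker, onAllStopped }) {
  while (!shouldStop()) {
    const workers = Array(targetWorkerCount())
      .fill()
      .map((_, i) => spawnWorker(i));    // each worker runs until termination is scheduled
    await Promise.all(workers);          // async barrier: no worker is left running past this point
    await onAllStopped();                // e.g. expire a globally stale cache before respawning
  }
}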
11761118
11771119 /**
@@ -1582,6 +1524,7 @@ class Consumer {
15821524 }
15831525
15841526 this . #disconnectStarted = true ;
1527+ this . #workerTerminationScheduled = true ;
15851528 while ( ! ( await acquireOrLog ( this . #lock, this . #logger) ) ) ; /* Just retry... */
15861529
15871530 this . #state = ConsumerState . DISCONNECTING ;