@@ -130,6 +130,24 @@ class Consumer {
    */
   #userManagedStores = false;
 
+  /**
+   * Populated with Promises for each partition that is being processed concurrently.
+   * Each promise might run eachMessage/eachBatch.
+   */
+  #runningPromises = [];
+
+  /**
+   * Each message that is consumed has an associated cache index.
+   * This array maps an index within runningPromises to the associated cache index,
+   * i.e. runningPromises[i] is associated with the cache index #savedIndexToPromiseIndex[i].
+   */
+  #savedIndexToPromiseIndex = [];
+
+  /**
+   * Signals an intent to disconnect the consumer.
+   */
+  #disconnectStarted = false;
+
   /**
    * @constructor
    * @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig
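Note: the two arrays introduced above are kept in lockstep, so runningPromises[i] always corresponds to the cache index stored at savedIndexToPromiseIndex[i]. A minimal standalone sketch of that bookkeeping, using hypothetical helper names that are not part of the class, might look like this:

    // Parallel-array bookkeeping sketch (illustrative only).
    const runningPromises = [];          // one in-flight promise per partition being processed
    const savedIndexToPromiseIndex = []; // cache index associated with runningPromises[i]

    function track(promise, cacheIndex) {
      runningPromises.push(promise);
      savedIndexToPromiseIndex.push(cacheIndex);
    }

    // O(1) removal: swap entry i with the last element and pop both arrays,
    // mirroring the removal performed in waitOne() further down in this diff.
    function untrack(i) {
      runningPromises[i] = runningPromises[runningPromises.length - 1];
      savedIndexToPromiseIndex[i] = savedIndexToPromiseIndex[savedIndexToPromiseIndex.length - 1];
      runningPromises.pop();
      savedIndexToPromiseIndex.pop();
    }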
@@ -637,16 +655,17 @@ class Consumer {
       return msg;
     }
 
-    // TODO: Add this block for concurrency
-    // if (!msg) {
-    //   // it's possible that we get msg = null, but that's because partitionConcurrency
-    //   // exceeds the number of partitions containing messages. So in this case,
-    //   // we should not call for new fetches, rather, try to focus on what we have left.
-    //   return null;
-    // }
+    /* It's possible that we get msg = null, but that's because partitionConcurrency
+     * exceeds the number of partitions containing messages. So in this case,
+     * we should not call for new fetches, rather, try to focus on what we have left.
+     */
+    if (!msg && this.#messageCache.pendingSize() !== 0) {
+      return null;
+    }
 
     return new Promise((resolve, reject) => {
       this.#internalClient.consume(this.#messageCache.maxSize, (err, messages) => {
+
        if (err) {
          reject(createKafkaJsErrorFromLibRdKafkaError(err));
          return;
@@ -721,9 +740,6 @@ class Consumer {
     }
 
     const rdKafkaConfig = this.#config();
-    const maxPollInterval = rdKafkaConfig['max.poll.interval.ms'] ?? 300000;
-    this.#messageCache = new MessageCache(Math.floor(maxPollInterval * 0.8), 1);
-
     this.#state = ConsumerState.CONNECTING;
     this.#internalClient = new RdKafka.KafkaConsumer(rdKafkaConfig);
     this.#internalClient.on('ready', this.#readyCb.bind(this));
@@ -815,10 +831,6 @@ class Consumer {
       throw new error.KafkaJSError(CompatibilityErrorMessages.runOptionsAutoCommitThreshold(), { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
     }
 
-    if (Object.hasOwn(config, 'partitionsConsumedConcurrently')) {
-      throw new error.KafkaJSError(CompatibilityErrorMessages.runOptionsPartitionsConsumedConcurrently(), { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
-    }
-
     if (this.#running) {
       throw new error.KafkaJSError('Consumer is already running.', { code: error.ErrorCodes.ERR__STATE });
     }
@@ -829,6 +841,14 @@ class Consumer {
       config.eachBatchAutoResolve = true;
     }
 
+    if (!Object.hasOwn(config, 'partitionsConsumedConcurrently')) {
+      config.partitionsConsumedConcurrently = 1;
+    }
+
+    const rdKafkaConfig = this.#config();
+    const maxPollInterval = rdKafkaConfig['max.poll.interval.ms'] ?? 300000;
+    this.#messageCache = new MessageCache(Math.floor(maxPollInterval * 0.8), config.partitionsConsumedConcurrently);
+
     /* We deliberately don't await this. */
     if (config.eachMessage) {
       this.#runInternalEachMessage(config);
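Note: with the compatibility error removed, run() now accepts partitionsConsumedConcurrently (defaulting to 1), and the message cache is sized from it at run() time rather than at connect() time, with an expiry of 0.8 × max.poll.interval.ms (240000 ms for the 300000 ms default). A hedged usage sketch, assuming `consumer` is an already connected and subscribed instance of this KafkaJS-compatible consumer:

    // Usage sketch (illustrative only; `consumer` is assumed to exist and be connected).
    await consumer.run({
      partitionsConsumedConcurrently: 3, // omit to fall back to the default of 1 set above
      eachMessage: async ({ topic, partition, message }) => {
        console.log(`${topic}[${partition}] @ ${message.offset}`);
      },
    });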
@@ -837,126 +857,173 @@ class Consumer {
       this.#runInternalEachBatch(config);
     }
   }
+  /**
+   * Processes a single message.
+   *
+   * @param m Message as obtained from #consumeSingleCached.
+   * @param config Config as passed to run().
+   * @returns {Promise<number>} the cache index of the message that was processed.
+   */
+  async #messageProcessor(m, config) {
+    let eachMessageProcessed = false;
+    const payload = this.#createPayload(m);
+
+    try {
+      await config.eachMessage(payload);
+      eachMessageProcessed = true;
+    } catch (e) {
+      /* It's not only possible, but expected that an error will be thrown by eachMessage.
+       * This is especially true since the pattern of pause() followed by throwing an error
+       * is encouraged. To meet the API contract, we seek one offset backward (which
+       * means seeking to the message offset).
+       * However, we don't do this inside the catch, but just outside it. This is because throwing an
+       * error is not the only case where we might want to seek back.
+       *
+       * So - do nothing but a debug log, but at this point eachMessageProcessed is false.
+       */
+      this.#logger.debug(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
+    }
+
+    /* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
+    if (!eachMessageProcessed) {
+      await this.seek({
+        topic: m.topic,
+        partition: m.partition,
+        offset: m.offset,
+      });
+    }
+
+    /* Store the offsets we need to store, or at least record them for cache invalidation reasons. */
+    if (eachMessageProcessed) {
+      try {
+        if (!this.#userManagedStores) {
+          this.#internalClient.offsetsStore([{
+            topic: m.topic, partition: m.partition, offset: Number(m.offset) + 1, leaderEpoch: m.leaderEpoch
+          }]);
+        }
+        this.#lastConsumedOffsets.set(partitionKey(m), Number(m.offset) + 1);
+      } catch (e) {
+        /* Not much we can do, except log the error. */
+        if (this.#logger)
+          this.#logger.error(`Consumer encountered error while storing offset. Error details: ${JSON.stringify(e)}`);
+      }
+    }
+
+
+    /* Force an immediate seek here. It's possible that there are no more messages to be passed to the user,
+     * but the user seeked in the call to eachMessage, or else we encountered the error catch block.
+     * In that case, the results of that seek will never be reflected unless we do this.
+     * TODO: this block can probably be common and not per message. */
+    if (this.#checkPendingSeeks)
+      await this.#seekInternal();
+
+    return m.index;
+  }
+
+  /**
+   * Awaits the completion of a single message's processing.
+   *
+   * @returns {Promise<number>} the cache index of the message in the cache that was processed.
+   */
+  async waitOne() {
+    const savedIndex = await Promise.any(this.#runningPromises);
+    const promiseIndex = this.#savedIndexToPromiseIndex.findIndex(p => p === savedIndex);
+    if (promiseIndex === -1) {
+      console.error("Promise not found in runningPromises");
+      throw new Error("Promise not found in runningPromises");
+    }
+    this.#runningPromises[promiseIndex] = this.#runningPromises[this.#runningPromises.length - 1];
+    this.#savedIndexToPromiseIndex[promiseIndex] = this.#savedIndexToPromiseIndex[this.#savedIndexToPromiseIndex.length - 1];
+    this.#runningPromises.pop();
+    this.#savedIndexToPromiseIndex.pop();
+
+    return savedIndex;
+  }
+
+  /**
+   * Awaits the completion of all messages that are being processed.
+   *
+   * @returns {Promise<number[]>} a list of cache indices of the messages that were processed.
+   */
+  async waitAll() {
+    const indices = await Promise.all(this.#runningPromises);
+    this.#runningPromises = [];
+    this.#savedIndexToPromiseIndex = [];
+    return indices;
+  }
+
   /* Internal polling loop.
    * It accepts the same config object that `run` accepts, but config.eachMessage must be set. */
   async #runInternalEachMessage(config) {
-    let savedIdx = -1;
-    while (this.#state === ConsumerState.CONNECTED) {
+    const concurrency = config.partitionsConsumedConcurrently;
+    let nextIdx = -1;
+    while (!(await acquireOrLog(this.#lock, this.#logger)));
 
-      /* We need to acquire a lock here, because we need to ensure that we don't
-       * disconnect while in the middle of processing a message. */
-      if (!(await acquireOrLog(this.#lock, this.#logger)))
-        continue;
+    while (this.#state === ConsumerState.CONNECTED) {
+      /* Release lock and cleanup if we intend to disconnect. */
+      if (this.#disconnectStarted) {
+        const indices = await this.waitAll();
+        indices.forEach(idx => this.#messageCache.return(idx));
+        if (nextIdx !== -1) {
+          this.#messageCache.return(nextIdx);
+        }
+        nextIdx = -1;
+        this.#lock.release();
+        break;
+      }
 
       /* Invalidate the message cache if needed */
       const locallyStale = this.#messageCache.popLocallyStale();
       if (this.#messageCache.isStale()) { /* global staleness */
-        // TODO: await all concurrent promises for eachMessage here.
+        const indices = await this.waitAll();
+        indices.forEach(idx => this.#messageCache.return(idx));
+        if (nextIdx !== -1) {
+          this.#messageCache.return(nextIdx);
+        }
+        nextIdx = -1;
         await this.#clearCacheAndResetPositions();
-        await this.#lock.release();
         continue;
       } else if (locallyStale.length !== 0) { /* local staleness */
         // TODO: is it correct to await some concurrent promises for eachMessage here?
         // to be safe we can do it, but I don't think we really need to do that for
-        // correctness.
+        // any correctness reason.
        await this.#clearCacheAndResetPositions(locallyStale);
-        await this.#lock.release();
        continue;
      }
 
-      const m = await this.#consumeSingleCached(savedIdx).catch(e => {
+      const m = await this.#consumeSingleCached(nextIdx).catch(e => {
        /* Since this error cannot be exposed to the user in the current situation, just log and retry.
         * This is due to restartOnFailure being set to always true. */
        if (this.#logger)
          this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${e}: ${e.stack}`);
      });
 
+      nextIdx = -1;
+
       if (!m) {
-        // await all concurrency related promises right here if this is null, if any such promise exists.
+        // await any concurrency related promises right here if this is null, if any such promise exists.
        // see note in consumeSingleCached
-        savedIdx = -1;
-        await this.#lock.release();
-        continue;
-      }
-      savedIdx = m.index;
-
-      /* TODO: add partitionsConsumedConcurrently-based concurrency here.
-       * If we maintain a map of topic partitions to promises, and a counter,
-       * we can probably achieve it with the correct guarantees of ordering
-       * though to maximize performance, we need to consume only from partitions for which
-       * an eachMessage call is not already going.
-       * It's risky to consume, and then store the message in something like an
-       * array/list until it can be processed, because librdkafka marks it as
-       * 'stored'... but anyway - we can implement something like this.
-       */
-
-      /* Make pending seeks 'concrete'. */
-      if (this.#checkPendingSeeks) {
-        const invalidateMessage = await this.#seekInternal({ topic: m.topic, partition: m.partition });
-        if (invalidateMessage) {
-          /* Don't pass this message on to the user if this topic partition was seeked to. */
-          this.#lock.release();
-          continue;
+        if (this.#runningPromises.length) {
+          nextIdx = await this.waitOne();
         }
+        continue;
       }
 
-      let eachMessageProcessed = false;
-      const payload = this.#createPayload(m);
-      try {
-        await config.eachMessage(payload);
-        eachMessageProcessed = true;
-      } catch (e) {
-        /* It's not only possible, but expected that an error will be thrown by eachMessage.
-         * This is especially true since the pattern of pause() followed by throwing an error
-         * is encouraged. To meet the API contract, we seek one offset backward (which
-         * means seeking to the message offset).
-         * However, we don't do this inside the catch, but just outside it. This is because throwing an
-         * error is not the only case where we might want to seek back.
-         *
-         * So - do nothing but a debug log, but at this point eachMessageProcessed is false.
-         */
-        this.#logger.debug(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
-      }
-
-      /* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
-      if (!eachMessageProcessed) {
-        await this.seek({
-          topic: m.topic,
-          partition: m.partition,
-          offset: m.offset,
-        });
-      }
+      const p = this.#messageProcessor(m, config);
+      this.#runningPromises.push(p);
+      this.#savedIndexToPromiseIndex.push(m.index);
 
-      /* Store the offsets we need to store, or at least record them for cache invalidation reasons. */
-      if (eachMessageProcessed) {
-        try {
-          if (!this.#userManagedStores) {
-            this.#internalClient.offsetsStore([{
-              topic: m.topic, partition: m.partition, offset: Number(m.offset) + 1, leaderEpoch: m.leaderEpoch
-            }]);
-          }
-          this.#lastConsumedOffsets.set(partitionKey(m), Number(m.offset) + 1);
-        } catch (e) {
-          /* Not much we can do, except log the error. */
-          if (this.#logger)
-            this.#logger.error(`Consumer encountered error while storing offset. Error details: ${JSON.stringify(e)}`);
-        }
+      if (this.#runningPromises.length < concurrency) {
+        continue;
       }
 
-      /* Force a immediate seek here. It's possible that there are no more messages to be passed to the user,
-       * but the user seeked in the call to eachMessage, or else we encountered the error catch block.
-       * In that case, the results of that seek will never be reflected unless we do this. */
-      if (this.#checkPendingSeeks)
-        await this.#seekInternal();
+      nextIdx = await this.waitOne();
 
       /* TODO: another check we need to do here is to see how kafkaJS is handling
        * commits. Are they commmitting after a message is _processed_?
       * In that case we need to turn off librdkafka's auto-commit, and commit
       * inside this function.
       */
-
-      /* Release the lock so that any pending disconnect can go through. */
-      await this.#lock.release();
    }
  }
 
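Note: waitOne() depends on each #messageProcessor promise resolving to the cache index of the message it handled, so Promise.any() identifies which slot finished rather than merely signalling that something finished. A small illustrative sketch of that pattern, standalone and separate from the class's code:

    // Each simulated worker resolves to its own cache index.
    const processMessage = (cacheIndex, ms) =>
      new Promise(resolve => setTimeout(() => resolve(cacheIndex), ms));

    async function demo() {
      const running = [processMessage(7, 50), processMessage(3, 10)];

      const firstDone = await Promise.any(running); // -> 3: waitOne-style, this cache slot can be reused next
      const allDone = await Promise.all(running);   // -> [7, 3]: waitAll-style drain, e.g. before disconnect

      console.log(firstDone, allDone);
    }

    demo();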
@@ -1497,6 +1564,7 @@ class Consumer {
       return;
     }
 
+    this.#disconnectStarted = true;
     while (!(await acquireOrLog(this.#lock, this.#logger))); /* Just retry... */
 
     this.#state = ConsumerState.DISCONNECTING;