@@ -21,6 +21,7 @@ const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
2121const batchSize = process . env . BATCH_SIZE ? + process . env . BATCH_SIZE : 100 ;
2222const compression = process . env . COMPRESSION || 'None' ;
2323const numPartitions = process . env . PARTITIONS ? + process . env . PARTITIONS : 3 ;
24+ const partitionsConsumedConcurrently = process . env . PARTITIONS_CONSUMED_CONCURRENTLY ? + process . env . PARTITIONS_CONSUMED_CONCURRENTLY : 1 ;
2425const warmupMessages = process . env . WARMUP_MESSAGES ? + process . env . WARMUP_MESSAGES : ( batchSize * 10 ) ;
2526const messageProcessTimeMs = process . env . MESSAGE_PROCESS_TIME_MS ? + process . env . MESSAGE_PROCESS_TIME_MS : 5 ;
2627const ctpConcurrency = process . env . CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? + process . env . CONSUME_TRANSFORM_PRODUCE_CONCURRENCY : 1 ;
@@ -121,7 +122,7 @@ function logParameters(parameters) {
121122 console . log ( ` Topic: ${ topic } ` ) ;
122123 console . log ( ` Message Count: ${ messageCount } ` ) ;
123124 startTrackingMemory ( ) ;
124- const consumerRate = await runConsumer ( parameters , topic , warmupMessages , messageCount , false , stats ) ;
125+ const consumerRate = await runConsumer ( parameters , topic , warmupMessages , messageCount , false , partitionsConsumedConcurrently , stats ) ;
125126 endTrackingMemory ( `consumer-memory-message-${ mode } .json` ) ;
126127 console . log ( "=== Consumer Rate (eachMessage): " , consumerRate ) ;
127128 console . log ( "=== Consumption time (eachMessage): " , stats . durationSeconds ) ;
@@ -134,7 +135,7 @@ function logParameters(parameters) {
134135 console . log ( ` Topic: ${ topic } ` ) ;
135136 console . log ( ` Message Count: ${ messageCount } ` ) ;
136137 startTrackingMemory ( ) ;
137- const consumerRate = await runConsumer ( parameters , topic , warmupMessages , messageCount , true , stats ) ;
138+ const consumerRate = await runConsumer ( parameters , topic , warmupMessages , messageCount , true , partitionsConsumedConcurrently , stats ) ;
138139 endTrackingMemory ( `consumer-memory-batch-${ mode } .json` ) ;
139140 console . log ( "=== Consumer Rate (eachBatch): " , consumerRate ) ;
140141 console . log ( "=== Average eachBatch lag: " , stats . averageOffsetLag ) ;
0 commit comments