@@ -20,6 +20,8 @@ const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 10
2020const messageSize = process . env . MESSAGE_SIZE ? + process . env . MESSAGE_SIZE : 256 ;
2121const batchSize = process . env . BATCH_SIZE ? + process . env . BATCH_SIZE : 100 ;
2222const compression = process . env . COMPRESSION || 'None' ;
23+ // Between 0 and 1, percentage of random bytes in each message
24+ const randomness = process . env . RANDOMNESS ? + process . env . RANDOMNESS : 0.5 ;
2325const numPartitions = process . env . PARTITIONS ? + process . env . PARTITIONS : 3 ;
2426const partitionsConsumedConcurrently = process . env . PARTITIONS_CONSUMED_CONCURRENTLY ? + process . env . PARTITIONS_CONSUMED_CONCURRENTLY : 1 ;
2527const warmupMessages = process . env . WARMUP_MESSAGES ? + process . env . WARMUP_MESSAGES : ( batchSize * 10 ) ;
@@ -110,7 +112,8 @@ function logParameters(parameters) {
110112 console . log ( ` Compression: ${ compression } ` ) ;
111113 console . log ( ` Warmup Messages: ${ warmupMessages } ` ) ;
112114 startTrackingMemory ( ) ;
113- const producerRate = await runProducer ( parameters , topic , batchSize , warmupMessages , messageCount , messageSize , compression ) ;
115+ const producerRate = await runProducer ( parameters , topic , batchSize ,
116+ warmupMessages , messageCount , messageSize , compression , randomness ) ;
114117 endTrackingMemory ( `producer-memory-${ mode } .json` ) ;
115118 console . log ( "=== Producer Rate: " , producerRate ) ;
116119 }
@@ -121,10 +124,14 @@ function logParameters(parameters) {
121124 logParameters ( parameters ) ;
122125 console . log ( ` Topic: ${ topic } ` ) ;
123126 console . log ( ` Message Count: ${ messageCount } ` ) ;
127+ console . log ( ` Partitions consumed concurrently: ${ partitionsConsumedConcurrently } ` ) ;
124128 startTrackingMemory ( ) ;
125- const consumerRate = await runConsumer ( parameters , topic , warmupMessages , messageCount , false , partitionsConsumedConcurrently , stats ) ;
129+ const consumerRate = await runConsumer ( parameters , topic ,
130+ warmupMessages , messageCount ,
131+ false , partitionsConsumedConcurrently , stats ) ;
126132 endTrackingMemory ( `consumer-memory-message-${ mode } .json` ) ;
127- console . log ( "=== Consumer Rate (eachMessage): " , consumerRate ) ;
133+ console . log ( "=== Consumer Rate MB/s (eachMessage): " , consumerRate ) ;
134+ console . log ( "=== Consumer Rate msg/s (eachMessage): " , stats . messageRate ) ;
128135 console . log ( "=== Consumption time (eachMessage): " , stats . durationSeconds ) ;
129136 }
130137
@@ -134,12 +141,17 @@ function logParameters(parameters) {
134141 logParameters ( parameters ) ;
135142 console . log ( ` Topic: ${ topic } ` ) ;
136143 console . log ( ` Message Count: ${ messageCount } ` ) ;
144+ console . log ( ` Partitions consumed concurrently: ${ partitionsConsumedConcurrently } ` ) ;
137145 startTrackingMemory ( ) ;
138- const consumerRate = await runConsumer ( parameters , topic , warmupMessages , messageCount , true , partitionsConsumedConcurrently , stats ) ;
146+ const consumerRate = await runConsumer ( parameters , topic ,
147+ warmupMessages , messageCount ,
148+ true , partitionsConsumedConcurrently , stats ) ;
139149 endTrackingMemory ( `consumer-memory-batch-${ mode } .json` ) ;
140- console . log ( "=== Consumer Rate (eachBatch): " , consumerRate ) ;
150+ console . log ( "=== Consumer Rate MB/s (eachBatch): " , consumerRate ) ;
151+ console . log ( "=== Consumer Rate msg/s (eachBatch): " , stats . messageRate ) ;
141152 console . log ( "=== Average eachBatch lag: " , stats . averageOffsetLag ) ;
142153 console . log ( "=== Max eachBatch lag: " , stats . maxOffsetLag ) ;
154+ console . log ( "=== Average eachBatch size: " , stats . averageBatchSize ) ;
143155 console . log ( "=== Consumption time (eachBatch): " , stats . durationSeconds ) ;
144156 }
145157
0 commit comments