@@ -79,15 +79,24 @@ async function main() {
7979
8080 if ( concurrentRun ) {
8181 console . log ( `Running ${ modeLabel } Producer/Consumer test (concurrently)...` ) ;
82+ const TERMINATE_TIMEOUT_MS = process . env . TERMINATE_TIMEOUT_MS ? + process . env . TERMINATE_TIMEOUT_MS : 600000 ;
83+ // Wait 2s more to see if all lag is caught up
84+ const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + 2000 ;
8285
8386 await runCommand ( `MODE=${ mode } node performance-consolidated.js --create-topics` ) ;
8487 const allPromises = [ ] ;
8588 allPromises . push ( runCommand ( `MODE=${ mode } MESSAGE_COUNT=${ messageCount } node performance-consolidated.js --producer` ) ) ;
8689 if ( consumerModeAll || consumerModeEachMessage ) {
87- allPromises . push ( runCommand ( `MODE=${ mode } MESSAGE_COUNT=${ messageCount } GROUPID_MESSAGE=${ groupIdEachMessage } node performance-consolidated.js --consumer-each-message ${ produceToSecondTopicParam } ` ) ) ;
90+ allPromises . push ( runCommand ( `MODE=${ mode } MESSAGE_COUNT=${ messageCount } TERMINATE_TIMEOUT_MS= ${ TERMINATE_TIMEOUT_MS_CONSUMERS } GROUPID_MESSAGE=${ groupIdEachMessage } node performance-consolidated.js --consumer-each-message ${ produceToSecondTopicParam } ` ) ) ;
8891 }
8992 if ( consumerModeAll || consumerModeEachBatch ) {
90- allPromises . push ( runCommand ( `MODE=${ mode } MESSAGE_COUNT=${ messageCount } GROUPID_BATCH=${ groupIdEachBatch } node performance-consolidated.js --consumer-each-batch ${ produceToSecondTopicParam } ` ) ) ;
93+ allPromises . push ( runCommand ( `MODE=${ mode } MESSAGE_COUNT=${ messageCount } TERMINATE_TIMEOUT_MS=${ TERMINATE_TIMEOUT_MS_CONSUMERS } GROUPID_BATCH=${ groupIdEachBatch } node performance-consolidated.js --consumer-each-batch ${ produceToSecondTopicParam } ` ) ) ;
94+ }
95+ if ( consumerModeAll || consumerModeEachMessage ) {
96+ allPromises . push ( runCommand ( `MODE=${ mode } TERMINATE_TIMEOUT_MS=${ TERMINATE_TIMEOUT_MS_CONSUMERS } GROUPID_MONITOR=${ groupIdEachMessage } node performance-consolidated.js --monitor-lag` ) ) ;
97+ }
98+ if ( consumerModeAll || consumerModeEachBatch ) {
99+ allPromises . push ( runCommand ( `MODE=${ mode } TERMINATE_TIMEOUT_MS=${ TERMINATE_TIMEOUT_MS_CONSUMERS } GROUPID_MONITOR=${ groupIdEachBatch } node performance-consolidated.js --monitor-lag` ) ) ;
91100 }
92101 const results = await Promise . allSettled ( allPromises ) ;
93102 return results . map ( r => r . status === 'fulfilled' ? r . value : '' ) . join ( '\n' ) ;
@@ -123,6 +132,10 @@ async function main() {
123132 let consumerConfluentTime ;
124133 let consumerConfluentMessageAverageRSS ;
125134 let consumerConfluentMessageMaxRSS ;
135+ let consumerConfluentMessageAverageBrokerLag ;
136+ let consumerConfluentMessageMaxBrokerLag ;
137+ let consumerConfluentMessageTotalLagMeasurements ;
138+
126139 let consumerConfluentBatch ;
127140 let consumerConfluentBatchRate ;
128141 let consumerConfluentBatchAvgLatency ;
@@ -133,6 +146,9 @@ async function main() {
133146 let consumerConfluentBatchAverageSize ;
134147 let consumerConfluentBatchAverageRSS ;
135148 let consumerConfluentBatchMaxRSS ;
149+ let consumerConfluentBatchAverageBrokerLag ;
150+ let consumerConfluentBatchMaxBrokerLag ;
151+ let consumerConfluentBatchTotalLagMeasurements ;
136152
137153 const producerConfluent = extractValue ( outputConfluentProducerConsumer , '=== Producer Rate:' ) ;
138154 const producerConfluentAverageRSS = extractValue ( outputConfluentProducerConsumer , '=== Average producer RSS KB:' ) ;
@@ -145,6 +161,9 @@ async function main() {
145161 consumerConfluentTime = extractValue ( outputConfluentProducerConsumer , '=== Consumption time (eachMessage):' ) ;
146162 consumerConfluentMessageAverageRSS = extractValue ( outputConfluentProducerConsumer , '=== Average consumer-each-message RSS KB:' ) ;
147163 consumerConfluentMessageMaxRSS = extractValue ( outputConfluentProducerConsumer , '=== Max consumer-each-message RSS KB:' ) ;
164+ consumerConfluentMessageAverageBrokerLag = extractValue ( outputConfluentProducerConsumer , `=== Average broker lag (${ groupIdEachMessageConfluent } ):` ) ;
165+ consumerConfluentMessageMaxBrokerLag = extractValue ( outputConfluentProducerConsumer , `=== Max broker lag (${ groupIdEachMessageConfluent } ):` ) ;
166+ consumerConfluentMessageTotalLagMeasurements = extractValue ( outputConfluentProducerConsumer , `=== Total broker lag measurements (${ groupIdEachMessageConfluent } ):` ) ;
148167 }
149168 if ( consumerModeAll || consumerModeEachBatch ) {
150169 consumerConfluentBatch = extractValue ( outputConfluentProducerConsumer , '=== Consumer Rate MB/s (eachBatch):' ) ;
@@ -157,6 +176,9 @@ async function main() {
157176 consumerConfluentBatchAverageSize = extractValue ( outputConfluentProducerConsumer , '=== Average eachBatch size:' ) ;
158177 consumerConfluentBatchAverageRSS = extractValue ( outputConfluentProducerConsumer , '=== Average consumer-each-batch RSS KB:' ) ;
159178 consumerConfluentBatchMaxRSS = extractValue ( outputConfluentProducerConsumer , '=== Max consumer-each-batch RSS KB:' ) ;
179+ consumerConfluentBatchAverageBrokerLag = extractValue ( outputConfluentProducerConsumer , `=== Average broker lag (${ groupIdEachBatchConfluent } ):` ) ;
180+ consumerConfluentBatchMaxBrokerLag = extractValue ( outputConfluentProducerConsumer , `=== Max broker lag (${ groupIdEachBatchConfluent } ):` ) ;
181+ consumerConfluentBatchTotalLagMeasurements = extractValue ( outputConfluentProducerConsumer , `=== Total broker lag measurements (${ groupIdEachBatchConfluent } ):` ) ;
160182 }
161183 const consumerConfluentAverageRSS = extractValue ( outputConfluentProducerConsumer , '=== Max Average RSS across tests:' ) ;
162184 const consumerConfluentMaxRSS = extractValue ( outputConfluentProducerConsumer , '=== Max RSS across tests:' ) ;
@@ -172,6 +194,10 @@ async function main() {
172194 let consumerKjsTime ;
173195 let consumerKjsMessageAverageRSS ;
174196 let consumerKjsMessageMaxRSS ;
197+ let consumerKjsMessageAverageBrokerLag ;
198+ let consumerKjsMessageMaxBrokerLag ;
199+ let consumerKjsMessageTotalLagMeasurements ;
200+
175201 let consumerKjsBatch ;
176202 let consumerKjsBatchRate ;
177203 let consumerKjsBatchAvgLatency ;
@@ -182,6 +208,9 @@ async function main() {
182208 let consumerKjsBatchAverageSize ;
183209 let consumerKjsBatchAverageRSS ;
184210 let consumerKjsBatchMaxRSS ;
211+ let consumerKjsBatchAverageBrokerLag ;
212+ let consumerKjsBatchMaxBrokerLag ;
213+ let consumerKjsBatchTotalLagMeasurements ;
185214
186215 const producerKjs = extractValue ( outputKjsProducerConsumer , '=== Producer Rate:' ) ;
187216 const producerKjsAverageRSS = extractValue ( outputKjsProducerConsumer , '=== Average producer RSS KB:' ) ;
@@ -193,6 +222,9 @@ async function main() {
193222 consumerKjsMessageMaxLatency = extractValue ( outputKjsProducerConsumer , '=== Consumer max E2E latency (eachMessage):' ) ;
194223 consumerKjsMessageAverageRSS = extractValue ( outputKjsProducerConsumer , '=== Average consumer-each-message RSS KB:' ) ;
195224 consumerKjsMessageMaxRSS = extractValue ( outputKjsProducerConsumer , '=== Max consumer-each-message RSS KB:' ) ;
225+ consumerKjsMessageAverageBrokerLag = extractValue ( outputKjsProducerConsumer , `=== Average broker lag (${ groupIdEachMessageKafkaJS } ):` ) ;
226+ consumerKjsMessageMaxBrokerLag = extractValue ( outputKjsProducerConsumer , `=== Max broker lag (${ groupIdEachMessageKafkaJS } ):` ) ;
227+ consumerKjsMessageTotalLagMeasurements = extractValue ( outputKjsProducerConsumer , `=== Total broker lag measurements (${ groupIdEachMessageKafkaJS } ):` ) ;
196228 }
197229 if ( consumerModeAll || consumerModeEachBatch ) {
198230 consumerKjsTime = extractValue ( outputKjsProducerConsumer , '=== Consumption time (eachMessage):' ) ;
@@ -206,6 +238,9 @@ async function main() {
206238 consumerKjsBatchAverageSize = extractValue ( outputKjsProducerConsumer , '=== Average eachBatch size:' ) ;
207239 consumerKjsBatchAverageRSS = extractValue ( outputKjsProducerConsumer , '=== Average consumer-each-batch RSS KB:' ) ;
208240 consumerKjsBatchMaxRSS = extractValue ( outputKjsProducerConsumer , '=== Max consumer-each-batch RSS KB:' ) ;
241+ consumerKjsBatchAverageBrokerLag = extractValue ( outputKjsProducerConsumer , `=== Average broker lag (${ groupIdEachBatchKafkaJS } ):` ) ;
242+ consumerKjsBatchMaxBrokerLag = extractValue ( outputKjsProducerConsumer , `=== Max broker lag (${ groupIdEachBatchKafkaJS } ):` ) ;
243+ consumerKjsBatchTotalLagMeasurements = extractValue ( outputKjsProducerConsumer , `=== Total broker lag measurements (${ groupIdEachBatchKafkaJS } ):` ) ;
209244 }
210245 const consumerKjsAverageRSS = extractValue ( outputKjsProducerConsumer , '=== Max Average RSS across tests:' ) ;
211246 const consumerKjsMaxRSS = extractValue ( outputKjsProducerConsumer , '=== Max RSS across tests:' ) ;
@@ -225,6 +260,9 @@ async function main() {
225260 console . log ( `Consumption time (eachMessage): confluent ${ consumerConfluentTime } , kafkajs ${ consumerKjsTime } ` ) ;
226261 console . log ( `Average RSS (eachMessage): confluent ${ consumerConfluentMessageAverageRSS } , kafkajs ${ consumerKjsMessageAverageRSS } ` ) ;
227262 console . log ( `Max RSS (eachMessage): confluent ${ consumerConfluentMessageMaxRSS } , kafkajs ${ consumerKjsMessageMaxRSS } ` ) ;
263+ console . log ( `Average broker lag (eachMessage): confluent ${ consumerConfluentMessageAverageBrokerLag } , kafkajs ${ consumerKjsMessageAverageBrokerLag } ` ) ;
264+ console . log ( `Max broker lag (eachMessage): confluent ${ consumerConfluentMessageMaxBrokerLag } , kafkajs ${ consumerKjsMessageMaxBrokerLag } ` ) ;
265+ console . log ( `Total broker lag measurements (eachMessage): confluent ${ consumerConfluentMessageTotalLagMeasurements } , kafkajs ${ consumerKjsMessageTotalLagMeasurements } ` ) ;
228266 }
229267 if ( consumerModeAll || consumerModeEachBatch ) {
230268 console . log ( `Consumer rates MB/s (eachBatch): confluent ${ consumerConfluentBatch } , kafkajs ${ consumerKjsBatch } ` ) ;
@@ -237,6 +275,9 @@ async function main() {
237275 console . log ( `Average eachBatch size: confluent ${ consumerConfluentBatchAverageSize } , kafkajs ${ consumerKjsBatchAverageSize } ` ) ;
238276 console . log ( `Average RSS (eachBatch): confluent ${ consumerConfluentBatchAverageRSS } , kafkajs ${ consumerKjsBatchAverageRSS } ` ) ;
239277 console . log ( `Max RSS (eachBatch): confluent ${ consumerConfluentBatchMaxRSS } , kafkajs ${ consumerKjsBatchMaxRSS } ` ) ;
278+ console . log ( `Average broker lag (eachBatch): confluent ${ consumerConfluentBatchAverageBrokerLag } , kafkajs ${ consumerKjsBatchAverageBrokerLag } ` ) ;
279+ console . log ( `Max broker lag (eachBatch): confluent ${ consumerConfluentBatchMaxBrokerLag } , kafkajs ${ consumerKjsBatchMaxBrokerLag } ` ) ;
280+ console . log ( `Total broker lag measurements (eachBatch): confluent ${ consumerConfluentBatchTotalLagMeasurements } , kafkajs ${ consumerKjsBatchTotalLagMeasurements } ` ) ;
240281 }
241282 if ( ! concurrentRun ) {
242283 console . log ( `Average RSS: confluent ${ consumerConfluentAverageRSS } , kafkajs ${ consumerKjsAverageRSS } ` ) ;
0 commit comments