@@ -42,18 +42,22 @@ function belowTarget(value, target) {
4242
4343// Run performance tests and store outputs in memory
4444console . log ( 'Running Confluent Producer/Consumer test...' ) ;
45- const outputConfluentProducerConsumer = runCommand ( 'MODE=confluent MESSAGE_COUNT=50000 node performance-consolidated.js --create-topics --consumer --producer' ) ;
45+ const messageCount = process . env . MESSAGE_COUNT ? + process . env . MESSAGE_COUNT : 50000 ;
46+ const skipCtpTest = process . env . SKIP_CTP_TEST ? process . env . SKIP_CTP_TEST === 'true' : false ;
47+ const outputConfluentProducerConsumer = runCommand ( `MODE=confluent MESSAGE_COUNT=${ messageCount } node performance-consolidated.js --create-topics --consumer --producer` ) ;
4648
4749console . log ( 'Running KafkaJS Producer/Consumer test...' ) ;
48- const outputKjsProducerConsumer = runCommand ( ' MODE=kafkajs MESSAGE_COUNT=50000 node performance-consolidated.js --create-topics --consumer --producer' ) ;
50+ const outputKjsProducerConsumer = runCommand ( ` MODE=kafkajs MESSAGE_COUNT=${ messageCount } node performance-consolidated.js --create-topics --consumer --producer` ) ;
4951
5052console . log ( 'Running Confluent CTP test...' ) ;
5153const outputConfluentCtp = runCommand ( 'MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp' ) ;
5254
5355console . log ( 'Running KafkaJS CTP test...' ) ;
54- const outputKjsCtp = runCommand ( 'MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp' ) ;
56+ const outputKjsCtp = skipCtpTest ? '' :
57+ runCommand ( 'MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp' ) ;
5558
5659// Extract Confluent results
60+ let ctpConfluent , ctpKjs ;
5761const producerConfluent = extractValue ( outputConfluentProducerConsumer , '=== Producer Rate:' ) ;
5862const consumerConfluentMessage = extractValue ( outputConfluentProducerConsumer , '=== Consumer Rate (eachMessage):' ) ;
5963const consumerConfluentTime = extractValue ( outputConfluentProducerConsumer , '=== Consumption time (eachMessage):' ) ;
@@ -63,7 +67,9 @@ const consumerConfluentBatchAverageLag = extractValue(outputConfluentProducerCon
6367const consumerConfluentBatchMaxLag = extractValue ( outputConfluentProducerConsumer , '=== Max eachBatch lag:' ) ;
6468const consumerConfluentAverageRSS = extractValue ( outputConfluentProducerConsumer , '=== Max Average RSS across tests:' ) ;
6569const consumerConfluentMaxRSS = extractValue ( outputConfluentProducerConsumer , '=== Max RSS across tests:' ) ;
66- const ctpConfluent = extractValue ( outputConfluentCtp , '=== Consume-Transform-Produce Rate:' ) ;
70+ if ( ! skipCtpTest ) {
71+ ctpConfluent = extractValue ( outputConfluentCtp , '=== Consume-Transform-Produce Rate:' ) ;
72+ }
6773
6874// Extract KafkaJS results
6975const producerKjs = extractValue ( outputKjsProducerConsumer , '=== Producer Rate:' ) ;
@@ -75,7 +81,9 @@ const consumerKjsBatchAverageLag = extractValue(outputKjsProducerConsumer, '===
7581const consumerKjsBatchMaxLag = extractValue ( outputKjsProducerConsumer , '=== Max eachBatch lag:' ) ;
7682const consumerKjsAverageRSS = extractValue ( outputKjsProducerConsumer , '=== Max Average RSS across tests:' ) ;
7783const consumerKjsMaxRSS = extractValue ( outputKjsProducerConsumer , '=== Max RSS across tests:' ) ;
78- const ctpKjs = extractValue ( outputKjsCtp , '=== Consume-Transform-Produce Rate:' ) ;
84+ if ( ! skipCtpTest ) {
85+ ctpKjs = extractValue ( outputKjsCtp , '=== Consume-Transform-Produce Rate:' ) ;
86+ }
7987
8088// Print results
8189console . log ( `Producer rates: confluent ${ producerConfluent } , kafkajs ${ producerKjs } ` ) ;
@@ -87,7 +95,9 @@ console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag
8795console . log ( `Max eachBatch lag: confluent ${ consumerConfluentBatchMaxLag } , kafkajs ${ consumerKjsBatchMaxLag } ` ) ;
8896console . log ( `Average RSS: confluent ${ consumerConfluentAverageRSS } , kafkajs ${ consumerKjsAverageRSS } ` ) ;
8997console . log ( `Max RSS: confluent ${ consumerConfluentMaxRSS } , kafkajs ${ consumerKjsMaxRSS } ` ) ;
90- console . log ( `CTP rates: confluent ${ ctpConfluent } , kafkajs ${ ctpKjs } ` ) ;
98+ if ( ! skipCtpTest ) {
99+ console . log ( `CTP rates: confluent ${ ctpConfluent } , kafkajs ${ ctpKjs } ` ) ;
100+ }
91101
92102let errcode = 0 ;
93103const maxPerformanceDifference = 0.7 ;
@@ -121,7 +131,7 @@ if (belowThreshold(consumerKjsBatchTime, consumerConfluentBatchTime, maxPerforma
121131 errcode = 0 ;
122132}
123133
124- if ( belowThreshold ( ctpConfluent , ctpKjs , maxPerformanceDifference ) ) {
134+ if ( ! skipCtpTest && belowThreshold ( ctpConfluent , ctpKjs , maxPerformanceDifference ) ) {
125135 console . log ( `CTP rates differ by more than 30%: confluent ${ ctpConfluent } , kafkajs ${ ctpKjs } ` ) ;
126136 errcode = 1 ;
127137}
@@ -141,7 +151,7 @@ if (belowTarget(consumerConfluentMessage, TARGET_CONSUME)) {
141151 errcode = 1 ;
142152}
143153
144- if ( belowTarget ( ctpConfluent , TARGET_CTP ) ) {
154+ if ( ! skipCtpTest && belowTarget ( ctpConfluent , TARGET_CTP ) ) {
145155 console . log ( `Confluent CTP rate is below target: ${ ctpConfluent } ` ) ;
146156 errcode = 1 ;
147157}
0 commit comments