1+ #!/usr/bin/env node
2+
3+ const { execSync } = require ( 'child_process' ) ;
4+
5+ function runCommand ( command ) {
6+ try {
7+ const output = execSync ( command , { encoding : 'utf8' , stdio : 'pipe' } ) ;
8+ console . log ( output ) ;
9+ return output ;
10+ } catch ( error ) {
11+ const errorOutput = error . stdout || error . stderr || error . message ;
12+ console . log ( errorOutput ) ;
13+ return errorOutput ;
14+ }
15+ }
16+
17+ function extractValue ( content , pattern ) {
18+ try {
19+ const lines = content . split ( '\n' ) ;
20+ const matchingLine = lines . find ( line => line . includes ( pattern ) ) ;
21+ if ( matchingLine ) {
22+ const value = matchingLine . split ( ':' ) [ 1 ] ?. trim ( ) ;
23+ return Number ( value || '' ) ;
24+ }
25+ return NaN ;
26+ } catch ( error ) {
27+ return NaN ;
28+ }
29+ }
30+
31+ function belowThreshold ( value , target , threshold = 0.7 ) {
32+ if ( isNaN ( value ) || isNaN ( target ) )
33+ throw new Error ( `Invalid number comparison: value=${ value } , target=${ target } ` ) ;
34+ return value < ( target * threshold ) ;
35+ }
36+
37+ function belowTarget ( value , target ) {
38+ return belowThreshold ( value , target , 1 ) ;
39+ }
40+
41+ // Run performance tests and store outputs in memory
42+ console . log ( 'Running Confluent Producer/Consumer test...' ) ;
43+ const outputConfluentProducerConsumer = runCommand ( 'MODE=confluent MESSAGE_COUNT=50000 node performance-consolidated.js --create-topics --consumer --producer' ) ;
44+
45+ console . log ( 'Running KafkaJS Producer/Consumer test...' ) ;
46+ const outputKjsProducerConsumer = runCommand ( 'MODE=kafkajs MESSAGE_COUNT=50000 node performance-consolidated.js --create-topics --consumer --producer' ) ;
47+
48+ console . log ( 'Running Confluent CTP test...' ) ;
49+ const outputConfluentCtp = runCommand ( 'MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp' ) ;
50+
51+ console . log ( 'Running KafkaJS CTP test...' ) ;
52+ const outputKjsCtp = runCommand ( 'MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp' ) ;
53+
54+ // Extract Confluent results
55+ const producerConfluent = extractValue ( outputConfluentProducerConsumer , '=== Producer Rate:' ) ;
56+ const consumerConfluentMessage = extractValue ( outputConfluentProducerConsumer , '=== Consumer Rate (eachMessage):' ) ;
57+ const consumerConfluentTime = extractValue ( outputConfluentProducerConsumer , '=== Consumption time (eachMessage):' ) ;
58+ const consumerConfluentBatch = extractValue ( outputConfluentProducerConsumer , '=== Consumer Rate (eachBatch):' ) ;
59+ const consumerConfluentBatchTime = extractValue ( outputConfluentProducerConsumer , '=== Consumption time (eachBatch):' ) ;
60+ const consumerConfluentBatchAverageLag = extractValue ( outputConfluentProducerConsumer , '=== Average eachBatch lag:' ) ;
61+ const consumerConfluentBatchMaxLag = extractValue ( outputConfluentProducerConsumer , '=== Max eachBatch lag:' ) ;
62+ const consumerConfluentAverageRSS = extractValue ( outputConfluentProducerConsumer , '=== Max Average RSS across tests:' ) ;
63+ const consumerConfluentMaxRSS = extractValue ( outputConfluentProducerConsumer , '=== Max RSS across tests:' ) ;
64+ const ctpConfluent = extractValue ( outputConfluentCtp , '=== Consume-Transform-Produce Rate:' ) ;
65+
66+ // Extract KafkaJS results
67+ const producerKjs = extractValue ( outputKjsProducerConsumer , '=== Producer Rate:' ) ;
68+ const consumerKjsMessage = extractValue ( outputKjsProducerConsumer , '=== Consumer Rate (eachMessage):' ) ;
69+ const consumerKjsTime = extractValue ( outputKjsProducerConsumer , '=== Consumption time (eachMessage):' ) ;
70+ const consumerKjsBatch = extractValue ( outputKjsProducerConsumer , '=== Consumer Rate (eachBatch):' ) ;
71+ const consumerKjsBatchTime = extractValue ( outputKjsProducerConsumer , '=== Consumption time (eachBatch):' ) ;
72+ const consumerKjsBatchAverageLag = extractValue ( outputKjsProducerConsumer , '=== Average eachBatch lag:' ) ;
73+ const consumerKjsBatchMaxLag = extractValue ( outputKjsProducerConsumer , '=== Max eachBatch lag:' ) ;
74+ const consumerKjsAverageRSS = extractValue ( outputKjsProducerConsumer , '=== Max Average RSS across tests:' ) ;
75+ const consumerKjsMaxRSS = extractValue ( outputKjsProducerConsumer , '=== Max RSS across tests:' ) ;
76+ const ctpKjs = extractValue ( outputKjsCtp , '=== Consume-Transform-Produce Rate:' ) ;
77+
78+ // Print results
79+ console . log ( `Producer rates: confluent ${ producerConfluent } , kafkajs ${ producerKjs } ` ) ;
80+ console . log ( `Consumer rates (eachMessage): confluent ${ consumerConfluentMessage } , kafkajs ${ consumerKjsMessage } ` ) ;
81+ console . log ( `Consumption time (eachMessage): confluent ${ consumerConfluentTime } , kafkajs ${ consumerKjsTime } ` ) ;
82+ console . log ( `Consumer rates (eachBatch): confluent ${ consumerConfluentBatch } , kafkajs ${ consumerKjsBatch } ` ) ;
83+ console . log ( `Consumption time (eachBatch): confluent ${ consumerConfluentBatchTime } , kafkajs ${ consumerKjsBatchTime } ` ) ;
84+ console . log ( `Average eachBatch lag: confluent ${ consumerConfluentBatchAverageLag } , kafkajs ${ consumerKjsBatchAverageLag } ` ) ;
85+ console . log ( `Max eachBatch lag: confluent ${ consumerConfluentBatchMaxLag } , kafkajs ${ consumerKjsBatchMaxLag } ` ) ;
86+ console . log ( `Average RSS: confluent ${ consumerConfluentAverageRSS } , kafkajs ${ consumerKjsAverageRSS } ` ) ;
87+ console . log ( `Max RSS: confluent ${ consumerConfluentMaxRSS } , kafkajs ${ consumerKjsMaxRSS } ` ) ;
88+ console . log ( `CTP rates: confluent ${ ctpConfluent } , kafkajs ${ ctpKjs } ` ) ;
89+
90+ let errcode = 0 ;
91+ const maxPerformanceDifference = 0.7 ;
92+
93+ // Compare against KJS (30% threshold)
94+ if ( belowThreshold ( producerConfluent , producerKjs , maxPerformanceDifference ) ) {
95+ console . log ( `Producer rates differ by more than 30%: confluent ${ producerConfluent } , kafkajs ${ producerKjs } ` ) ;
96+ errcode = 1 ;
97+ }
98+
99+ if ( belowThreshold ( consumerConfluentMessage , consumerKjsMessage , maxPerformanceDifference ) ) {
100+ console . log ( `Consumer rates (eachMessage) differ by more than 30%: confluent ${ consumerConfluentMessage } , kafkajs ${ consumerKjsMessage } ` ) ;
101+ // FIXME: improve consumer performance at least to KafkaJS level
102+ errcode = 0 ;
103+ }
104+
105+ // Lower is better for time
106+ if ( belowThreshold ( consumerKjsTime , consumerConfluentTime , maxPerformanceDifference ) ) {
107+ console . log ( `Consumption time (eachMessage) differ by more than 30%: confluent ${ consumerConfluentTime } , kafkajs ${ consumerKjsTime } ` ) ;
108+ errcode = 0 ;
109+ }
110+
111+ if ( belowThreshold ( consumerConfluentBatch , consumerKjsBatch , maxPerformanceDifference ) ) {
112+ console . log ( `Consumer rates (eachBatch) differ by more than 30%: confluent ${ consumerConfluentBatch } , kafkajs ${ consumerKjsBatch } ` ) ;
113+ errcode = 0 ;
114+ }
115+
116+ // Lower is better for time
117+ if ( belowThreshold ( consumerKjsBatchTime , consumerConfluentBatchTime , maxPerformanceDifference ) ) {
118+ console . log ( `Consumption time (eachBatch) differ by more than 30%: confluent ${ consumerConfluentBatchTime } , kafkajs ${ consumerKjsBatchTime } ` ) ;
119+ errcode = 0 ;
120+ }
121+
122+ if ( belowThreshold ( ctpConfluent , ctpKjs , maxPerformanceDifference ) ) {
123+ console . log ( `CTP rates differ by more than 30%: confluent ${ ctpConfluent } , kafkajs ${ ctpKjs } ` ) ;
124+ errcode = 1 ;
125+ }
126+
127+ // Compare against target numbers
128+ const TARGET_PRODUCE = process . env . TARGET_PRODUCE_PERFORMANCE || '35' ;
129+ const TARGET_CONSUME = process . env . TARGET_CONSUME_PERFORMANCE || '18' ;
130+ const TARGET_CTP = process . env . TARGET_CTP_PERFORMANCE || '0.02' ;
131+
132+ if ( belowTarget ( producerConfluent , TARGET_PRODUCE ) ) {
133+ console . log ( `Confluent producer rate is below target: ${ producerConfluent } ` ) ;
134+ errcode = 1 ;
135+ }
136+
137+ if ( belowTarget ( consumerConfluentMessage , TARGET_CONSUME ) ) {
138+ console . log ( `Confluent consumer rate is below target: ${ consumerConfluentMessage } ` ) ;
139+ errcode = 1 ;
140+ }
141+
142+ if ( belowTarget ( ctpConfluent , TARGET_CTP ) ) {
143+ console . log ( `Confluent CTP rate is below target: ${ ctpConfluent } ` ) ;
144+ errcode = 1 ;
145+ }
146+
147+ process . exit ( errcode ) ;
0 commit comments