Skip to content

Commit 4209743

Browse files
committed
Use same producer and different E2E latencies
1 parent 476a38e commit 4209743

File tree

5 files changed

+114
-50
lines changed

5 files changed

+114
-50
lines changed

ci/tests/run_perf_test.js

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ async function main() {
126126
let ctpConfluent, ctpKjs;
127127
let consumerConfluentMessage;
128128
let consumerConfluentMessageRate;
129-
let consumerConfluentMessageAvgLatency;
130-
let consumerConfluentMessageMaxLatency;
129+
let consumerConfluentMessageAvgLatencyT0T1;
130+
let consumerConfluentMessageMaxLatencyT0T1;
131+
let consumerConfluentMessageAvgLatencyT0T2;
132+
let consumerConfluentMessageMaxLatencyT0T2;
131133
let consumerConfluentTime;
132134
let consumerConfluentMessageAverageRSS;
133135
let consumerConfluentMessageMaxRSS;
@@ -137,8 +139,10 @@ async function main() {
137139

138140
let consumerConfluentBatch;
139141
let consumerConfluentBatchRate;
140-
let consumerConfluentBatchAvgLatency;
141-
let consumerConfluentBatchMaxLatency;
142+
let consumerConfluentBatchAvgLatencyT0T1;
143+
let consumerConfluentBatchMaxLatencyT0T1;
144+
let consumerConfluentBatchAvgLatencyT0T2;
145+
let consumerConfluentBatchMaxLatencyT0T2;
142146
let consumerConfluentBatchTime;
143147
let consumerConfluentBatchAverageLag;
144148
let consumerConfluentBatchMaxLag;
@@ -155,8 +159,10 @@ async function main() {
155159
if (consumerModeAll || consumerModeEachMessage) {
156160
consumerConfluentMessage = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachMessage):');
157161
consumerConfluentMessageRate = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate msg/s (eachMessage):');
158-
consumerConfluentMessageAvgLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency (eachMessage):');
159-
consumerConfluentMessageMaxLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency (eachMessage):');
162+
consumerConfluentMessageAvgLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachMessage):');
163+
consumerConfluentMessageMaxLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):');
164+
consumerConfluentMessageAvgLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachMessage):');
165+
consumerConfluentMessageMaxLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):');
160166
consumerConfluentTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachMessage):');
161167
consumerConfluentMessageAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Average consumer-each-message RSS KB:');
162168
consumerConfluentMessageMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max consumer-each-message RSS KB:');
@@ -167,8 +173,10 @@ async function main() {
167173
if (consumerModeAll || consumerModeEachBatch) {
168174
consumerConfluentBatch = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
169175
consumerConfluentBatchRate = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate msg/s (eachBatch):');
170-
consumerConfluentBatchAvgLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency (eachBatch):');
171-
consumerConfluentBatchMaxLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency (eachBatch):');
176+
consumerConfluentBatchAvgLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):');
177+
consumerConfluentBatchMaxLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachBatch):');
178+
consumerConfluentBatchAvgLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachBatch):');
179+
consumerConfluentBatchMaxLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachBatch):');
172180
consumerConfluentBatchTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachBatch):');
173181
consumerConfluentBatchAverageLag = extractValue(outputConfluentProducerConsumer, '=== Average eachBatch lag:');
174182
consumerConfluentBatchMaxLag = extractValue(outputConfluentProducerConsumer, '=== Max eachBatch lag:');
@@ -188,8 +196,8 @@ async function main() {
188196
// Extract KafkaJS results
189197
let consumerKjsMessage;
190198
let consumerKjsMessageRate;
191-
let consumerKjsMessageAvgLatency;
192-
let consumerKjsMessageMaxLatency;
199+
let consumerKjsMessageAvgLatencyT0T1;
200+
let consumerKjsMessageMaxLatencyT0T1;
193201
let consumerKjsTime;
194202
let consumerKjsMessageAverageRSS;
195203
let consumerKjsMessageMaxRSS;
@@ -199,8 +207,8 @@ async function main() {
199207

200208
let consumerKjsBatch;
201209
let consumerKjsBatchRate;
202-
let consumerKjsBatchAvgLatency;
203-
let consumerKjsBatchMaxLatency;
210+
let consumerKjsBatchAvgLatencyT0T1;
211+
let consumerKjsBatchMaxLatencyT0T1;
204212
let consumerKjsBatchTime;
205213
let consumerKjsBatchAverageLag;
206214
let consumerKjsBatchMaxLag;
@@ -217,8 +225,10 @@ async function main() {
217225
if (consumerModeAll || consumerModeEachMessage) {
218226
consumerKjsMessage = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachMessage):');
219227
consumerKjsMessageRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachMessage):');
220-
consumerKjsMessageAvgLatency = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency (eachMessage):');
221-
consumerKjsMessageMaxLatency = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency (eachMessage):');
228+
consumerKjsMessageAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachMessage):');
229+
consumerKjsMessageMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):');
230+
consumerKjsMessageAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachMessage):');
231+
consumerKjsMessageMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):');
222232
consumerKjsMessageAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average consumer-each-message RSS KB:');
223233
consumerKjsMessageMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max consumer-each-message RSS KB:');
224234
consumerKjsMessageAverageBrokerLag = extractValue(outputKjsProducerConsumer, `=== Average broker lag (${groupIdEachMessageKafkaJS}):`);
@@ -229,8 +239,10 @@ async function main() {
229239
consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
230240
consumerKjsBatch = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
231241
consumerKjsBatchRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachBatch):');
232-
consumerKjsBatchAvgLatency = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency (eachBatch):');
233-
consumerKjsBatchMaxLatency = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency (eachBatch):');
242+
consumerKjsBatchAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):');
243+
consumerKjsBatchMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachBatch):');
244+
consumerKjsBatchAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachBatch):');
245+
consumerKjsBatchMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachBatch):');
234246
consumerKjsBatchTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachBatch):');
235247
consumerKjsBatchAverageLag = extractValue(outputKjsProducerConsumer, '=== Average eachBatch lag:');
236248
consumerKjsBatchMaxLag = extractValue(outputKjsProducerConsumer, '=== Max eachBatch lag:');
@@ -254,8 +266,12 @@ async function main() {
254266
if (consumerModeAll || consumerModeEachMessage) {
255267
console.log(`Consumer rates MB/s (eachMessage): confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`);
256268
console.log(`Consumer rates msg/s (eachMessage): confluent ${consumerConfluentMessageRate}, kafkajs ${consumerKjsMessageRate}`);
257-
console.log(`Consumer average E2E latency (eachMessage): confluent ${consumerConfluentMessageAvgLatency}, kafkajs ${consumerKjsMessageAvgLatency}`);
258-
console.log(`Consumer max E2E latency (eachMessage): confluent ${consumerConfluentMessageMaxLatency}, kafkajs ${consumerKjsMessageMaxLatency}`);
269+
console.log(`Consumer average E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T1}, kafkajs ${consumerKjsMessageAvgLatencyT0T1}`);
270+
console.log(`Consumer max E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T1}, kafkajs ${consumerKjsMessageMaxLatencyT0T1}`);
271+
if (produceToSecondTopic) {
272+
console.log(`Consumer average E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T2}, kafkajs ${consumerKjsMessageAvgLatencyT0T2}`);
273+
console.log(`Consumer max E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T2}, kafkajs ${consumerKjsMessageMaxLatencyT0T2}`);
274+
}
259275
console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
260276
console.log(`Average RSS (eachMessage): confluent ${consumerConfluentMessageAverageRSS}, kafkajs ${consumerKjsMessageAverageRSS}`);
261277
console.log(`Max RSS (eachMessage): confluent ${consumerConfluentMessageMaxRSS}, kafkajs ${consumerKjsMessageMaxRSS}`);
@@ -266,8 +282,12 @@ async function main() {
266282
if (consumerModeAll || consumerModeEachBatch) {
267283
console.log(`Consumer rates MB/s (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
268284
console.log(`Consumer rates msg/s (eachBatch): confluent ${consumerConfluentBatchRate}, kafkajs ${consumerKjsBatchRate}`);
269-
console.log(`Consumer average E2E latency (eachBatch): confluent ${consumerConfluentBatchAvgLatency}, kafkajs ${consumerKjsBatchAvgLatency}`);
270-
console.log(`Consumer max E2E latency (eachBatch): confluent ${consumerConfluentBatchMaxLatency}, kafkajs ${consumerKjsBatchMaxLatency}`);
285+
console.log(`Consumer average E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T1}, kafkajs ${consumerKjsBatchAvgLatencyT0T1}`);
286+
console.log(`Consumer max E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T1}, kafkajs ${consumerKjsBatchMaxLatencyT0T1}`);
287+
if (produceToSecondTopic) {
288+
console.log(`Consumer average E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T2}, kafkajs ${consumerKjsBatchAvgLatencyT0T2}`);
289+
console.log(`Consumer max E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T2}, kafkajs ${consumerKjsBatchMaxLatencyT0T2}`);
290+
}
271291
console.log(`Consumption time (eachBatch): confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`);
272292
console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`);
273293
console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`);

examples/performance/performance-consolidated.js

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
const fs = require('fs');
22
const mode = process.env.MODE ? process.env.MODE : 'confluent';
33

4-
let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether;
4+
let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether, runProducerCKJS;
55
if (mode === 'confluent') {
66
({ runProducer, runConsumer, runConsumeTransformProduce,
77
runCreateTopics, runLagMonitoring,
88
runProducerConsumerTogether } = require('./performance-primitives'));
9+
runProducerCKJS = runProducer;
910
} else {
1011
({ runProducer, runConsumer, runConsumeTransformProduce, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
1112
/* createTopics is more reliable in CKJS */
12-
({ runCreateTopics, runLagMonitoring } = require('./performance-primitives'));
13+
({ runCreateTopics, runLagMonitoring, runProducer: runProducerCKJS } = require('./performance-primitives'));
1314
}
1415

1516
const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
@@ -32,6 +33,7 @@ const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +proc
3233
const consumerProcessingTime = process.env.CONSUMER_PROCESSING_TIME ? +process.env.CONSUMER_PROCESSING_TIME : 100;
3334
const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.env.PRODUCER_PROCESSING_TIME : 100;
3435
const limitRPS = process.env.LIMIT_RPS ? +process.env.LIMIT_RPS : null;
36+
const useCKJSProducerEverywhere = process.env.USE_CKJS_PRODUCER_EVERYWHERE === 'true';
3537
const parameters = {
3638
brokers,
3739
securityProtocol,
@@ -134,8 +136,13 @@ function logParameters(parameters) {
134136
console.log(` Compression: ${compression}`);
135137
console.log(` Limit RPS: ${limitRPS}`);
136138
console.log(` Warmup Messages: ${warmupMessages}`);
139+
console.log(` Use CKJS Producer Everywhere: ${useCKJSProducerEverywhere}`);
137140
startTrackingMemory();
138-
const producerRate = await runProducer(parameters, topic, batchSize,
141+
let runProducerFunction = runProducer;
142+
if (useCKJSProducerEverywhere) {
143+
runProducerFunction = runProducerCKJS;
144+
}
145+
const producerRate = await runProducerFunction(parameters, topic, batchSize,
139146
warmupMessages, messageCount, messageSize, compression,
140147
randomness, limitRPS);
141148
endTrackingMemory('producer', `producer-memory-${mode}.json`);
@@ -153,12 +160,16 @@ function logParameters(parameters) {
153160
const consumerRate = await runConsumer(parameters, topic,
154161
warmupMessages, messageCount,
155162
false, partitionsConsumedConcurrently, stats,
156-
produceToSecondTopic ? topic2 : null, compression);
163+
produceToSecondTopic ? topic2 : null, compression, useCKJSProducerEverywhere);
157164
endTrackingMemory('consumer-each-message', `consumer-memory-message-${mode}.json`);
158165
console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate);
159166
console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate);
160-
console.log("=== Consumer average E2E latency (eachMessage): ", stats.avgLatency);
161-
console.log("=== Consumer max E2E latency (eachMessage): ", stats.maxLatency);
167+
console.log("=== Consumer average E2E latency T0-T1 (eachMessage): ", stats.avgLatencyT0T1);
168+
console.log("=== Consumer max E2E latency T0-T1 (eachMessage): ", stats.maxLatencyT0T1);
169+
if (produceToSecondTopic) {
170+
console.log("=== Consumer average E2E latency T0-T2 (eachMessage): ", stats.avgLatencyT0T2);
171+
console.log("=== Consumer max E2E latency T0-T2 (eachMessage): ", stats.maxLatencyT0T2);
172+
}
162173
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
163174
}
164175

@@ -173,15 +184,19 @@ function logParameters(parameters) {
173184
const consumerRate = await runConsumer(parameters, topic,
174185
warmupMessages, messageCount,
175186
true, partitionsConsumedConcurrently, stats,
176-
produceToSecondTopic ? topic2 : null, compression);
187+
produceToSecondTopic ? topic2 : null, compression, useCKJSProducerEverywhere);
177188
endTrackingMemory('consumer-each-batch', `consumer-memory-batch-${mode}.json`);
178189
console.log("=== Consumer Rate MB/s (eachBatch): ", consumerRate);
179190
console.log("=== Consumer Rate msg/s (eachBatch): ", stats.messageRate);
180191
console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);
181192
console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
182193
console.log("=== Average eachBatch size: ", stats.averageBatchSize);
183-
console.log("=== Consumer average E2E latency (eachBatch): ", stats.avgLatency);
184-
console.log("=== Consumer max E2E latency (eachBatch): ", stats.maxLatency);
194+
console.log("=== Consumer average E2E latency T0-T1 (eachBatch): ", stats.avgLatencyT0T1);
195+
console.log("=== Consumer max E2E latency T0-T1 (eachBatch): ", stats.maxLatencyT0T1);
196+
if (produceToSecondTopic) {
197+
console.log("=== Consumer average E2E latency T0-T2 (eachBatch): ", stats.avgLatencyT0T2);
198+
console.log("=== Consumer max E2E latency T0-T2 (eachBatch): ", stats.maxLatencyT0T2);
199+
}
185200
console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);
186201
}
187202

0 commit comments

Comments
 (0)