Skip to content

Commit 3acec88

Browse files
committed
Use same producer and different E2E latencies
1 parent cd8d7ac commit 3acec88

File tree

5 files changed

+117
-50
lines changed

5 files changed

+117
-50
lines changed

ci/tests/run_perf_test.js

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,11 @@ async function main() {
106106
}
107107

108108
const runProducerConsumer = async () => {
109+
console.log(`Running Producer/Consumer tests with Confluent Kafka JS at ${new Date().toISOString()}`);
109110
outputConfluentProducerConsumer = await runProducerConsumerMode('confluent');
111+
console.log(`Running Producer/Consumer tests with KafkaJS at ${new Date().toISOString()}`);
110112
outputKjsProducerConsumer = await runProducerConsumerMode('kafkajs');
113+
console.log(`Producer/Consumer tests completed at ${new Date().toISOString()}`);
111114
}
112115

113116
await runProducerConsumer();
@@ -126,8 +129,10 @@ async function main() {
126129
let ctpConfluent, ctpKjs;
127130
let consumerConfluentMessage;
128131
let consumerConfluentMessageRate;
129-
let consumerConfluentMessageAvgLatency;
130-
let consumerConfluentMessageMaxLatency;
132+
let consumerConfluentMessageAvgLatencyT0T1;
133+
let consumerConfluentMessageMaxLatencyT0T1;
134+
let consumerConfluentMessageAvgLatencyT0T2;
135+
let consumerConfluentMessageMaxLatencyT0T2;
131136
let consumerConfluentTime;
132137
let consumerConfluentMessageAverageRSS;
133138
let consumerConfluentMessageMaxRSS;
@@ -137,8 +142,10 @@ async function main() {
137142

138143
let consumerConfluentBatch;
139144
let consumerConfluentBatchRate;
140-
let consumerConfluentBatchAvgLatency;
141-
let consumerConfluentBatchMaxLatency;
145+
let consumerConfluentBatchAvgLatencyT0T1;
146+
let consumerConfluentBatchMaxLatencyT0T1;
147+
let consumerConfluentBatchAvgLatencyT0T2;
148+
let consumerConfluentBatchMaxLatencyT0T2;
142149
let consumerConfluentBatchTime;
143150
let consumerConfluentBatchAverageLag;
144151
let consumerConfluentBatchMaxLag;
@@ -155,8 +162,10 @@ async function main() {
155162
if (consumerModeAll || consumerModeEachMessage) {
156163
consumerConfluentMessage = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachMessage):');
157164
consumerConfluentMessageRate = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate msg/s (eachMessage):');
158-
consumerConfluentMessageAvgLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency (eachMessage):');
159-
consumerConfluentMessageMaxLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency (eachMessage):');
165+
consumerConfluentMessageAvgLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachMessage):');
166+
consumerConfluentMessageMaxLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):');
167+
consumerConfluentMessageAvgLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachMessage):');
168+
consumerConfluentMessageMaxLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):');
160169
consumerConfluentTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachMessage):');
161170
consumerConfluentMessageAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Average consumer-each-message RSS KB:');
162171
consumerConfluentMessageMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max consumer-each-message RSS KB:');
@@ -167,8 +176,10 @@ async function main() {
167176
if (consumerModeAll || consumerModeEachBatch) {
168177
consumerConfluentBatch = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
169178
consumerConfluentBatchRate = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate msg/s (eachBatch):');
170-
consumerConfluentBatchAvgLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency (eachBatch):');
171-
consumerConfluentBatchMaxLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency (eachBatch):');
179+
consumerConfluentBatchAvgLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):');
180+
consumerConfluentBatchMaxLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachBatch):');
181+
consumerConfluentBatchAvgLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachBatch):');
182+
consumerConfluentBatchMaxLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachBatch):');
172183
consumerConfluentBatchTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachBatch):');
173184
consumerConfluentBatchAverageLag = extractValue(outputConfluentProducerConsumer, '=== Average eachBatch lag:');
174185
consumerConfluentBatchMaxLag = extractValue(outputConfluentProducerConsumer, '=== Max eachBatch lag:');
@@ -188,8 +199,8 @@ async function main() {
188199
// Extract KafkaJS results
189200
let consumerKjsMessage;
190201
let consumerKjsMessageRate;
191-
let consumerKjsMessageAvgLatency;
192-
let consumerKjsMessageMaxLatency;
202+
let consumerKjsMessageAvgLatencyT0T1;
203+
let consumerKjsMessageMaxLatencyT0T1;
193204
let consumerKjsTime;
194205
let consumerKjsMessageAverageRSS;
195206
let consumerKjsMessageMaxRSS;
@@ -199,8 +210,8 @@ async function main() {
199210

200211
let consumerKjsBatch;
201212
let consumerKjsBatchRate;
202-
let consumerKjsBatchAvgLatency;
203-
let consumerKjsBatchMaxLatency;
213+
let consumerKjsBatchAvgLatencyT0T1;
214+
let consumerKjsBatchMaxLatencyT0T1;
204215
let consumerKjsBatchTime;
205216
let consumerKjsBatchAverageLag;
206217
let consumerKjsBatchMaxLag;
@@ -217,8 +228,10 @@ async function main() {
217228
if (consumerModeAll || consumerModeEachMessage) {
218229
consumerKjsMessage = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachMessage):');
219230
consumerKjsMessageRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachMessage):');
220-
consumerKjsMessageAvgLatency = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency (eachMessage):');
221-
consumerKjsMessageMaxLatency = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency (eachMessage):');
231+
consumerKjsMessageAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachMessage):');
232+
consumerKjsMessageMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):');
233+
consumerKjsMessageAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachMessage):');
234+
consumerKjsMessageMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):');
222235
consumerKjsMessageAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average consumer-each-message RSS KB:');
223236
consumerKjsMessageMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max consumer-each-message RSS KB:');
224237
consumerKjsMessageAverageBrokerLag = extractValue(outputKjsProducerConsumer, `=== Average broker lag (${groupIdEachMessageKafkaJS}):`);
@@ -229,8 +242,10 @@ async function main() {
229242
consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
230243
consumerKjsBatch = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
231244
consumerKjsBatchRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachBatch):');
232-
consumerKjsBatchAvgLatency = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency (eachBatch):');
233-
consumerKjsBatchMaxLatency = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency (eachBatch):');
245+
consumerKjsBatchAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):');
246+
consumerKjsBatchMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachBatch):');
247+
consumerKjsBatchAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachBatch):');
248+
consumerKjsBatchMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachBatch):');
234249
consumerKjsBatchTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachBatch):');
235250
consumerKjsBatchAverageLag = extractValue(outputKjsProducerConsumer, '=== Average eachBatch lag:');
236251
consumerKjsBatchMaxLag = extractValue(outputKjsProducerConsumer, '=== Max eachBatch lag:');
@@ -254,8 +269,12 @@ async function main() {
254269
if (consumerModeAll || consumerModeEachMessage) {
255270
console.log(`Consumer rates MB/s (eachMessage): confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`);
256271
console.log(`Consumer rates msg/s (eachMessage): confluent ${consumerConfluentMessageRate}, kafkajs ${consumerKjsMessageRate}`);
257-
console.log(`Consumer average E2E latency (eachMessage): confluent ${consumerConfluentMessageAvgLatency}, kafkajs ${consumerKjsMessageAvgLatency}`);
258-
console.log(`Consumer max E2E latency (eachMessage): confluent ${consumerConfluentMessageMaxLatency}, kafkajs ${consumerKjsMessageMaxLatency}`);
272+
console.log(`Consumer average E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T1}, kafkajs ${consumerKjsMessageAvgLatencyT0T1}`);
273+
console.log(`Consumer max E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T1}, kafkajs ${consumerKjsMessageMaxLatencyT0T1}`);
274+
if (produceToSecondTopic) {
275+
console.log(`Consumer average E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T2}, kafkajs ${consumerKjsMessageAvgLatencyT0T2}`);
276+
console.log(`Consumer max E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T2}, kafkajs ${consumerKjsMessageMaxLatencyT0T2}`);
277+
}
259278
console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
260279
console.log(`Average RSS (eachMessage): confluent ${consumerConfluentMessageAverageRSS}, kafkajs ${consumerKjsMessageAverageRSS}`);
261280
console.log(`Max RSS (eachMessage): confluent ${consumerConfluentMessageMaxRSS}, kafkajs ${consumerKjsMessageMaxRSS}`);
@@ -266,8 +285,12 @@ async function main() {
266285
if (consumerModeAll || consumerModeEachBatch) {
267286
console.log(`Consumer rates MB/s (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
268287
console.log(`Consumer rates msg/s (eachBatch): confluent ${consumerConfluentBatchRate}, kafkajs ${consumerKjsBatchRate}`);
269-
console.log(`Consumer average E2E latency (eachBatch): confluent ${consumerConfluentBatchAvgLatency}, kafkajs ${consumerKjsBatchAvgLatency}`);
270-
console.log(`Consumer max E2E latency (eachBatch): confluent ${consumerConfluentBatchMaxLatency}, kafkajs ${consumerKjsBatchMaxLatency}`);
288+
console.log(`Consumer average E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T1}, kafkajs ${consumerKjsBatchAvgLatencyT0T1}`);
289+
console.log(`Consumer max E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T1}, kafkajs ${consumerKjsBatchMaxLatencyT0T1}`);
290+
if (produceToSecondTopic) {
291+
console.log(`Consumer average E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T2}, kafkajs ${consumerKjsBatchAvgLatencyT0T2}`);
292+
console.log(`Consumer max E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T2}, kafkajs ${consumerKjsBatchMaxLatencyT0T2}`);
293+
}
271294
console.log(`Consumption time (eachBatch): confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`);
272295
console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`);
273296
console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`);

examples/performance/performance-consolidated.js

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
const fs = require('fs');
22
const mode = process.env.MODE ? process.env.MODE : 'confluent';
33

4-
let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether;
4+
let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether, runProducerCKJS;
55
if (mode === 'confluent') {
66
({ runProducer, runConsumer, runConsumeTransformProduce,
77
runCreateTopics, runLagMonitoring,
88
runProducerConsumerTogether } = require('./performance-primitives'));
9+
runProducerCKJS = runProducer;
910
} else {
1011
({ runProducer, runConsumer, runConsumeTransformProduce, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
1112
/* createTopics is more reliable in CKJS */
12-
({ runCreateTopics, runLagMonitoring } = require('./performance-primitives'));
13+
({ runCreateTopics, runLagMonitoring, runProducer: runProducerCKJS } = require('./performance-primitives'));
1314
}
1415

1516
const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
@@ -32,6 +33,7 @@ const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +proc
3233
const consumerProcessingTime = process.env.CONSUMER_PROCESSING_TIME ? +process.env.CONSUMER_PROCESSING_TIME : 100;
3334
const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.env.PRODUCER_PROCESSING_TIME : 100;
3435
const limitRPS = process.env.LIMIT_RPS ? +process.env.LIMIT_RPS : null;
36+
const useCKJSProducerEverywhere = process.env.USE_CKJS_PRODUCER_EVERYWHERE === 'true';
3537
const parameters = {
3638
brokers,
3739
securityProtocol,
@@ -134,8 +136,13 @@ function logParameters(parameters) {
134136
console.log(` Compression: ${compression}`);
135137
console.log(` Limit RPS: ${limitRPS}`);
136138
console.log(` Warmup Messages: ${warmupMessages}`);
139+
console.log(` Use CKJS Producer Everywhere: ${useCKJSProducerEverywhere}`);
137140
startTrackingMemory();
138-
const producerRate = await runProducer(parameters, topic, batchSize,
141+
let runProducerFunction = runProducer;
142+
if (useCKJSProducerEverywhere) {
143+
runProducerFunction = runProducerCKJS;
144+
}
145+
const producerRate = await runProducerFunction(parameters, topic, batchSize,
139146
warmupMessages, messageCount, messageSize, compression,
140147
randomness, limitRPS);
141148
endTrackingMemory('producer', `producer-memory-${mode}.json`);
@@ -153,12 +160,16 @@ function logParameters(parameters) {
153160
const consumerRate = await runConsumer(parameters, topic,
154161
warmupMessages, messageCount,
155162
false, partitionsConsumedConcurrently, stats,
156-
produceToSecondTopic ? topic2 : null, compression);
163+
produceToSecondTopic ? topic2 : null, compression, useCKJSProducerEverywhere);
157164
endTrackingMemory('consumer-each-message', `consumer-memory-message-${mode}.json`);
158165
console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate);
159166
console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate);
160-
console.log("=== Consumer average E2E latency (eachMessage): ", stats.avgLatency);
161-
console.log("=== Consumer max E2E latency (eachMessage): ", stats.maxLatency);
167+
console.log("=== Consumer average E2E latency T0-T1 (eachMessage): ", stats.avgLatencyT0T1);
168+
console.log("=== Consumer max E2E latency T0-T1 (eachMessage): ", stats.maxLatencyT0T1);
169+
if (produceToSecondTopic) {
170+
console.log("=== Consumer average E2E latency T0-T2 (eachMessage): ", stats.avgLatencyT0T2);
171+
console.log("=== Consumer max E2E latency T0-T2 (eachMessage): ", stats.maxLatencyT0T2);
172+
}
162173
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
163174
}
164175

@@ -173,15 +184,19 @@ function logParameters(parameters) {
173184
const consumerRate = await runConsumer(parameters, topic,
174185
warmupMessages, messageCount,
175186
true, partitionsConsumedConcurrently, stats,
176-
produceToSecondTopic ? topic2 : null, compression);
187+
produceToSecondTopic ? topic2 : null, compression, useCKJSProducerEverywhere);
177188
endTrackingMemory('consumer-each-batch', `consumer-memory-batch-${mode}.json`);
178189
console.log("=== Consumer Rate MB/s (eachBatch): ", consumerRate);
179190
console.log("=== Consumer Rate msg/s (eachBatch): ", stats.messageRate);
180191
console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);
181192
console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
182193
console.log("=== Average eachBatch size: ", stats.averageBatchSize);
183-
console.log("=== Consumer average E2E latency (eachBatch): ", stats.avgLatency);
184-
console.log("=== Consumer max E2E latency (eachBatch): ", stats.maxLatency);
194+
console.log("=== Consumer average E2E latency T0-T1 (eachBatch): ", stats.avgLatencyT0T1);
195+
console.log("=== Consumer max E2E latency T0-T1 (eachBatch): ", stats.maxLatencyT0T1);
196+
if (produceToSecondTopic) {
197+
console.log("=== Consumer average E2E latency T0-T2 (eachBatch): ", stats.avgLatencyT0T2);
198+
console.log("=== Consumer max E2E latency T0-T2 (eachBatch): ", stats.maxLatencyT0T2);
199+
}
185200
console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);
186201
}
187202

0 commit comments

Comments
 (0)