Skip to content

Commit b71e51b

Browse files
committed
Improve KafkaJS producing, more reliable createTopics, configurable message count and partition number
1 parent 45c4097 commit b71e51b

File tree

4 files changed

+39
-16
lines changed

4 files changed

+39
-16
lines changed

ci/tests/run_perf_test.js

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,22 @@ function belowTarget(value, target) {
4242

4343
// Run performance tests and store outputs in memory
4444
console.log('Running Confluent Producer/Consumer test...');
45-
const outputConfluentProducerConsumer = runCommand('MODE=confluent MESSAGE_COUNT=50000 node performance-consolidated.js --create-topics --consumer --producer');
45+
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 50000;
46+
const skipCtpTest = process.env.SKIP_CTP_TEST ? process.env.SKIP_CTP_TEST === 'true' : false;
47+
const outputConfluentProducerConsumer = runCommand(`MODE=confluent MESSAGE_COUNT=${messageCount} node performance-consolidated.js --create-topics --consumer --producer`);
4648

4749
console.log('Running KafkaJS Producer/Consumer test...');
48-
const outputKjsProducerConsumer = runCommand('MODE=kafkajs MESSAGE_COUNT=50000 node performance-consolidated.js --create-topics --consumer --producer');
50+
const outputKjsProducerConsumer = runCommand(`MODE=kafkajs MESSAGE_COUNT=${messageCount} node performance-consolidated.js --create-topics --consumer --producer`);
4951

5052
console.log('Running Confluent CTP test...');
5153
const outputConfluentCtp = runCommand('MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');
5254

5355
console.log('Running KafkaJS CTP test...');
54-
const outputKjsCtp = runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');
56+
const outputKjsCtp = skipCtpTest ? '' :
57+
runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');
5558

5659
// Extract Confluent results
60+
let ctpConfluent, ctpKjs;
5761
const producerConfluent = extractValue(outputConfluentProducerConsumer, '=== Producer Rate:');
5862
const consumerConfluentMessage = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate (eachMessage):');
5963
const consumerConfluentTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachMessage):');
@@ -63,7 +67,9 @@ const consumerConfluentBatchAverageLag = extractValue(outputConfluentProducerCon
6367
const consumerConfluentBatchMaxLag = extractValue(outputConfluentProducerConsumer, '=== Max eachBatch lag:');
6468
const consumerConfluentAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Max Average RSS across tests:');
6569
const consumerConfluentMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max RSS across tests:');
66-
const ctpConfluent = extractValue(outputConfluentCtp, '=== Consume-Transform-Produce Rate:');
70+
if (!skipCtpTest) {
71+
ctpConfluent = extractValue(outputConfluentCtp, '=== Consume-Transform-Produce Rate:');
72+
}
6773

6874
// Extract KafkaJS results
6975
const producerKjs = extractValue(outputKjsProducerConsumer, '=== Producer Rate:');
@@ -75,7 +81,9 @@ const consumerKjsBatchAverageLag = extractValue(outputKjsProducerConsumer, '===
7581
const consumerKjsBatchMaxLag = extractValue(outputKjsProducerConsumer, '=== Max eachBatch lag:');
7682
const consumerKjsAverageRSS = extractValue(outputKjsProducerConsumer, '=== Max Average RSS across tests:');
7783
const consumerKjsMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max RSS across tests:');
78-
const ctpKjs = extractValue(outputKjsCtp, '=== Consume-Transform-Produce Rate:');
84+
if (!skipCtpTest) {
85+
ctpKjs = extractValue(outputKjsCtp, '=== Consume-Transform-Produce Rate:');
86+
}
7987

8088
// Print results
8189
console.log(`Producer rates: confluent ${producerConfluent}, kafkajs ${producerKjs}`);
@@ -87,7 +95,9 @@ console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag
8795
console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`);
8896
console.log(`Average RSS: confluent ${consumerConfluentAverageRSS}, kafkajs ${consumerKjsAverageRSS}`);
8997
console.log(`Max RSS: confluent ${consumerConfluentMaxRSS}, kafkajs ${consumerKjsMaxRSS}`);
90-
console.log(`CTP rates: confluent ${ctpConfluent}, kafkajs ${ctpKjs}`);
98+
if (!skipCtpTest) {
99+
console.log(`CTP rates: confluent ${ctpConfluent}, kafkajs ${ctpKjs}`);
100+
}
91101

92102
let errcode = 0;
93103
const maxPerformanceDifference = 0.7;
@@ -121,7 +131,7 @@ if (belowThreshold(consumerKjsBatchTime, consumerConfluentBatchTime, maxPerforma
121131
errcode = 0;
122132
}
123133

124-
if (belowThreshold(ctpConfluent, ctpKjs, maxPerformanceDifference)) {
134+
if (!skipCtpTest && belowThreshold(ctpConfluent, ctpKjs, maxPerformanceDifference)) {
125135
console.log(`CTP rates differ by more than 30%: confluent ${ctpConfluent}, kafkajs ${ctpKjs}`);
126136
errcode = 1;
127137
}
@@ -141,7 +151,7 @@ if (belowTarget(consumerConfluentMessage, TARGET_CONSUME)) {
141151
errcode = 1;
142152
}
143153

144-
if (belowTarget(ctpConfluent, TARGET_CTP)) {
154+
if (!skipCtpTest && belowTarget(ctpConfluent, TARGET_CTP)) {
145155
console.log(`Confluent CTP rate is below target: ${ctpConfluent}`);
146156
errcode = 1;
147157
}

examples/performance/performance-consolidated.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runPr
55
if (mode === 'confluent') {
66
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives'));
77
} else {
8-
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
8+
({ runProducer, runConsumer, runConsumeTransformProduce, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
9+
/* createTopics is more reliable in CKJS */
10+
({ runCreateTopics } = require('./performance-primitives'));
911
}
1012

1113
const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
@@ -18,6 +20,7 @@ const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 10
1820
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
1921
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
2022
const compression = process.env.COMPRESSION || 'None';
23+
const numPartitions = process.env.PARTITIONS ? +process.env.PARTITIONS : 3;
2124
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
2225
const messageProcessTimeMs = process.env.MESSAGE_PROCESS_TIME_MS ? +process.env.MESSAGE_PROCESS_TIME_MS : 5;
2326
const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY : 1;
@@ -92,7 +95,7 @@ function logParameters(parameters) {
9295
logParameters(parameters);
9396
console.log(` Topic: ${topic}`);
9497
console.log(` Topic2: ${topic2}`);
95-
await runCreateTopics(parameters, topic, topic2);
98+
await runCreateTopics(parameters, topic, topic2, numPartitions);
9699
}
97100

98101
if (producer || all) {

examples/performance/performance-primitives-kafkajs.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@ function baseConfiguration(parameters) {
3232
return ret;
3333
}
3434

35-
async function runCreateTopics(parameters, topic, topic2) {
35+
async function runCreateTopics(parameters, topic, topic2, numPartitions) {
3636
const kafka = new Kafka(baseConfiguration(parameters));
3737

3838
const admin = kafka.admin();
3939
await admin.connect();
4040

4141
for (let t of [topic, topic2]) {
4242
let topicCreated = await admin.createTopics({
43-
topics: [{ topic: t, numPartitions: 3 }],
43+
topics: [{ topic: t, numPartitions }],
4444
}).catch(console.error);
4545
if (topicCreated) {
4646
console.log(`Created topic ${t}`);
@@ -52,7 +52,7 @@ async function runCreateTopics(parameters, topic, topic2) {
5252
await new Promise(resolve => setTimeout(resolve, 1000)); /* Propagate. */
5353
await admin.createTopics({
5454
topics: [
55-
{ topic: t, numPartitions: 3 },
55+
{ topic: t, numPartitions },
5656
],
5757
}).catch(console.error);
5858
console.log(`Created topic ${t}`);
@@ -97,6 +97,7 @@ async function runProducer(parameters, topic, batchSize, warmupMessages, totalMe
9797
// await them all at once. We need the second while loop to keep sending
9898
// in case of queue full errors, which surface only on awaiting.
9999
while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
100+
let messagesNotAwaited = 0;
100101
while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
101102
promises.push(producer.send({
102103
topic,
@@ -110,8 +111,12 @@ async function runProducer(parameters, topic, batchSize, warmupMessages, totalMe
110111
throw err;
111112
}));
112113
messagesDispatched += batchSize;
114+
messagesNotAwaited += batchSize;
115+
if (messagesNotAwaited >= 10000)
116+
break;
113117
}
114118
await Promise.all(promises);
119+
promises = [];
115120
}
116121
let elapsed = hrtime(startTime);
117122
let durationNanos = elapsed[0] * 1e9 + elapsed[1];

examples/performance/performance-primitives.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ function baseConfiguration(parameters) {
3030
return ret;
3131
}
3232

33-
async function runCreateTopics(parameters, topic, topic2) {
33+
async function runCreateTopics(parameters, topic, topic2, numPartitions) {
3434
const kafka = new Kafka(baseConfiguration(parameters));
3535

3636
const admin = kafka.admin();
3737
await admin.connect();
3838

3939
for (let t of [topic, topic2]) {
4040
let topicCreated = await admin.createTopics({
41-
topics: [{ topic: t, numPartitions: 3 }],
41+
topics: [{ topic: t, numPartitions }],
4242
}).catch(console.error);
4343
if (topicCreated) {
4444
console.log(`Created topic ${t}`);
@@ -50,7 +50,7 @@ async function runCreateTopics(parameters, topic, topic2) {
5050
await new Promise(resolve => setTimeout(resolve, 1000)); /* Propagate. */
5151
await admin.createTopics({
5252
topics: [
53-
{ topic: t, numPartitions: 3 },
53+
{ topic: t, numPartitions },
5454
],
5555
}).catch(console.error);
5656
console.log(`Created topic ${t}`);
@@ -98,6 +98,7 @@ async function runProducer(parameters, topic, batchSize, warmupMessages, totalMe
9898
// await them all at once. We need the second while loop to keep sending
9999
// in case of queue full errors, which surface only on awaiting.
100100
while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
101+
let messagesNotAwaited = 0;
101102
while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
102103
promises.push(producer.send({
103104
topic,
@@ -115,8 +116,12 @@ async function runProducer(parameters, topic, batchSize, warmupMessages, totalMe
115116
}
116117
}));
117118
messagesDispatched += batchSize;
119+
messagesNotAwaited += batchSize;
120+
if (messagesNotAwaited >= 10000)
121+
break;
118122
}
119123
await Promise.all(promises);
124+
promises = [];
120125
}
121126
let elapsed = hrtime(startTime);
122127
let durationNanos = elapsed[0] * 1e9 + elapsed[1];

0 commit comments

Comments
 (0)