Commits (27)
85547e2
Performance test improvements, measure eachBatch rate, time and lag, …
emasab Oct 8, 2025
8e75878
Run performance tests with SASL_PLAINTEXT or SASL_SSL and plain authe…
emasab Oct 9, 2025
0ff62eb
Fixes
emasab Oct 9, 2025
2c639f7
Improve KafkaJS producing, more reliable createTopics, configurable m…
emasab Oct 14, 2025
735ec4b
Log number of partitions
emasab Oct 14, 2025
22e6832
Fix skipCtpTest
emasab Oct 14, 2025
3f7c9b1
Configurable partitions consumed concurrently
emasab Oct 14, 2025
c1784c6
Consume rate in msg/s, average batch size, different messages dependi…
emasab Oct 14, 2025
8114681
fixup
emasab Oct 14, 2025
09416d3
Average and max E2E latency
emasab Oct 14, 2025
4e573a5
Log average and max size
emasab Oct 14, 2025
9d32c27
Changes to run the test for a longer time
emasab Oct 15, 2025
76e8cc2
Add broker lag monitoring
emasab Oct 15, 2025
4ad63b3
fixup
emasab Oct 16, 2025
3c39da3
Use same producer and different E2E latencies
emasab Oct 17, 2025
db39e68
Auto commit on batch end
emasab Oct 17, 2025
dd09384
fixup
emasab Oct 29, 2025
4460183
Fix CI performance run
emasab Oct 19, 2025
9955813
Two partitions consumed concurrently
emasab Oct 29, 2025
a8306ad
Remove E2E latency metrics for serial run
emasab Oct 19, 2025
7532d59
More accurate latency and throughput measurement
emasab Oct 23, 2025
f44e414
Add latency percentiles calculation
emasab Oct 24, 2025
4957a06
USE_KEYS and timestamp in headers
emasab Oct 29, 2025
6204ff9
Don't modify passed messages in sendOptions
emasab Oct 28, 2025
7e63cc1
CONSUMER_MAX_BATCH_SIZE in performance test
emasab Oct 29, 2025
1e5c310
Changelog entry for `producer.send` fix
emasab Oct 29, 2025
9dee07d
Fix to avoid counting the message size two times
emasab Oct 29, 2025
2 changes: 2 additions & 0 deletions .gitignore
@@ -21,3 +21,5 @@ coverage
.nyc_output/
*lcov.info
**/lcov-report
examples/performance/*.json
*.log
2 changes: 1 addition & 1 deletion .semaphore/semaphore.yml
@@ -191,7 +191,7 @@ blocks:
- export NODE_OPTIONS='--max-old-space-size=1536'
- cd examples/performance
- npm install
- bash -c '../../ci/tests/run_perf_test.sh'
- node '../../ci/tests/run_perf_test.js'
- rm -rf ./node_modules

- name: "Linux amd64: Release"
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ v1.6.1 is a maintenance release. It is supported for all usage.
property (#393).
2. Fix for at-least-once guarantee not ensured in case a seek happens on one
partition and there are messages being fetched about other partitions (#393).
3. Messages passed to the producer `send` method are no longer modified (#394).


# confluent-kafka-javascript 1.6.0
406 changes: 406 additions & 0 deletions ci/tests/run_perf_test.js

Large diffs are not rendered by default.
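
Since the new 406-line runner is not rendered here, the following is a rough, hypothetical sketch of its shape only: names, flags, and values below are illustrative, not the PR's actual code. It assumes the runner drives performance-consolidated.js per mode through environment variables and collects the `===`-prefixed metric lines that script prints:

const { execSync } = require('child_process');

// Run performance-consolidated.js in one mode and collect its metric lines.
// MODE, MESSAGE_COUNT, and the CLI flags are real knobs of that script; the
// orchestration itself is a guess at what ci/tests/run_perf_test.js does.
function runMode(mode) {
  const output = execSync(
    'node performance-consolidated.js --create-topics --producer --consumer-each-batch',
    { env: { ...process.env, MODE: mode, MESSAGE_COUNT: '100000' }, encoding: 'utf8' }
  );
  return output.split('\n').filter((line) => line.startsWith('==='));
}

for (const mode of ['confluent', 'kafkajs']) {
  console.log(runMode(mode).join('\n'));
}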

64 changes: 0 additions & 64 deletions ci/tests/run_perf_test.sh

This file was deleted.

209 changes: 188 additions & 21 deletions examples/performance/performance-consolidated.js
@@ -1,84 +1,246 @@
const fs = require('fs');
const mode = process.env.MODE ? process.env.MODE : 'confluent';

let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether;
let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether, runProducerCKJS;
if (mode === 'confluent') {
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives'));
({ runProducer, runConsumer, runConsumeTransformProduce,
runCreateTopics, runLagMonitoring,
runProducerConsumerTogether } = require('./performance-primitives'));
runProducerCKJS = runProducer;
} else {
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
({ runProducer, runConsumer, runConsumeTransformProduce, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
/* createTopics is more reliable in CKJS */
({ runCreateTopics, runLagMonitoring, runProducer: runProducerCKJS } = require('./performance-primitives'));
}

const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
const topic = process.env.KAFKA_TOPIC || 'test-topic';
const topic2 = process.env.KAFKA_TOPIC2 || 'test-topic2';
const securityProtocol = process.env.SECURITY_PROTOCOL;
const saslUsername = process.env.SASL_USERNAME;
const saslPassword = process.env.SASL_PASSWORD;
const topic = process.env.KAFKA_TOPIC || `test-topic-${mode}`;
const topic2 = process.env.KAFKA_TOPIC2 || `test-topic2-${mode}`;
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 4096;
const batchSize = process.env.PRODUCER_BATCH_SIZE ? +process.env.PRODUCER_BATCH_SIZE : 100;
const compression = process.env.COMPRESSION || 'None';
// Between 0 and 1, percentage of random bytes in each message
const randomness = process.env.RANDOMNESS ? +process.env.RANDOMNESS : 0.5;
const initialDelayMs = process.env.INITIAL_DELAY_MS ? +process.env.INITIAL_DELAY_MS : 0;
const numPartitions = process.env.PARTITIONS ? +process.env.PARTITIONS : 3;
const partitionsConsumedConcurrently = process.env.PARTITIONS_CONSUMED_CONCURRENTLY ? +process.env.PARTITIONS_CONSUMED_CONCURRENTLY : 1;
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
const messageProcessTimeMs = process.env.MESSAGE_PROCESS_TIME_MS ? +process.env.MESSAGE_PROCESS_TIME_MS : 5;
const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY : 1;
const consumerProcessingTime = process.env.CONSUMER_PROCESSING_TIME ? +process.env.CONSUMER_PROCESSING_TIME : 100;
const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.env.PRODUCER_PROCESSING_TIME : 100;
const limitRPS = process.env.LIMIT_RPS ? +process.env.LIMIT_RPS : null;
const useCKJSProducerEverywhere = process.env.USE_CKJS_PRODUCER_EVERYWHERE === 'true';
const parameters = {
brokers,
securityProtocol,
saslUsername,
saslPassword,
}
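
// --- Illustrative sketch, not part of this PR's diff ---
// One way the primitives might map `parameters` to a KafkaJS-style client
// config; the commit history mentions SASL_PLAINTEXT/SASL_SSL with plain
// authentication, but the actual mapping lives in the performance-primitives
// files and may differ.
function toClientConfig({ brokers, securityProtocol, saslUsername, saslPassword }) {
  const config = { brokers: brokers.split(',') };
  if (securityProtocol && saslUsername && saslPassword) {
    config.ssl = securityProtocol === 'SASL_SSL';
    config.sasl = { mechanism: 'plain', username: saslUsername, password: saslPassword };
  }
  return config;
}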

function logParameters(parameters) {
console.log(` Brokers: ${parameters.brokers}`);
if (parameters.securityProtocol && parameters.saslUsername && parameters.saslPassword) {
console.log(` Security Protocol: ${parameters.securityProtocol}`);
console.log(` SASL Username: ${parameters.saslUsername ? parameters.saslUsername : 'not set'}`);
console.log(` SASL Password: ${parameters.saslPassword ? '******' : 'not set'}`);
} else {
console.log(" No security protocol configured");
}
}

function printPercentiles(percentiles, type) {
for (const { percentile, value, count, total } of percentiles) {
const percentileStr = `P${percentile}`.padStart(6, ' ');
console.log(`=== Consumer ${percentileStr} E2E latency ${type}: ${value.toFixed(2)} ms (${count}/${total})`);
}
}
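
// --- Illustrative sketch, not part of this PR's diff ---
// printPercentiles above consumes { percentile, value, count, total } records;
// a nearest-rank computation over the collected E2E latencies could produce
// them like this (the real calculation is in the primitives files):
function computePercentiles(latenciesMs, wanted = [50, 90, 95, 99, 99.9]) {
  const sorted = [...latenciesMs].sort((a, b) => a - b);
  const total = sorted.length;
  return wanted.map((percentile) => {
    const count = Math.max(1, Math.ceil((percentile / 100) * total)); // nearest rank
    return { percentile, value: sorted[count - 1], count, total };
  });
}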

(async function () {
const producer = process.argv.includes('--producer');
const consumer = process.argv.includes('--consumer');
const consumerEachMessage = process.argv.includes('--consumer-each-message');
const consumerEachBatch = process.argv.includes('--consumer-each-batch');
const produceToSecondTopic = process.argv.includes('--produce-to-second-topic');
const ctp = process.argv.includes('--ctp');
const produceConsumeLatency = process.argv.includes('--latency');
const all = process.argv.includes('--all');
const createTopics = process.argv.includes('--create-topics');
const monitorLag = process.argv.includes('--monitor-lag');
let maxAverageRSSKB, maxMaxRSSKB;
const stats = {};

let measures = [];
let interval;
const startTrackingMemory = () => {
interval = setInterval(() => {
measures.push({ rss: process.memoryUsage().rss,
timestamp: Date.now() });
}, 100);
};

const datapointToJSON = (m) =>
({ rss: m.rss.toString(), timestamp: m.timestamp.toString() });

const endTrackingMemory = (name, fileName) => {
clearInterval(interval);
interval = null;
const averageRSS = measures.reduce((sum, m) => sum + m.rss, 0) / measures.length;
const averageRSSKB = averageRSS / 1024;
maxAverageRSSKB = !maxAverageRSSKB ? averageRSSKB : Math.max(averageRSSKB, maxAverageRSSKB);
console.log(`=== Average ${name} RSS KB: ${averageRSSKB}`);
const max = measures.reduce((prev, current) => (prev.rss > current.rss) ? prev : current);
const maxRSSKB = max.rss / 1024;
maxMaxRSSKB = !maxMaxRSSKB ? maxRSSKB : Math.max(maxRSSKB, maxMaxRSSKB);
console.log(`=== Max ${name} RSS KB: ${maxRSSKB}`);
if (fileName) {
const measuresJSON = JSON.stringify({
measures: measures.map(datapointToJSON),
averageRSS: averageRSS.toString(),
maxRSS: datapointToJSON(max)
}, null, 2);
fs.writeFileSync(fileName, measuresJSON);
}
measures = [];
}

console.log(`=== Starting Performance Tests - Mode ${mode} ===`);
if (initialDelayMs > 0) {
console.log(`=== Initial delay of ${initialDelayMs}ms before starting tests ===`);
await new Promise(resolve => setTimeout(resolve, initialDelayMs));
}

if (createTopics || all) {
console.log("=== Creating Topics (deleting if they exist already):");
console.log(` Brokers: ${brokers}`);
logParameters(parameters);
console.log(` Topic: ${topic}`);
console.log(` Topic2: ${topic2}`);
await runCreateTopics(brokers, topic, topic2);
console.log(` Partitions: ${numPartitions}`);
await runCreateTopics(parameters, topic, topic2, numPartitions);
}

if (monitorLag) {
console.log("=== Starting Lag Monitoring:");
logParameters(parameters);
console.log(` Topic: ${topic}`);
const {
averageLag,
maxLag,
totalMeasurements
} = await runLagMonitoring(parameters, topic);
const monitoredGroupId = process.env.GROUPID_MONITOR;
console.log(`=== Average broker lag (${monitoredGroupId}): `, averageLag);
console.log(`=== Max broker lag (${monitoredGroupId}): `, maxLag);
console.log(`=== Sample size for broker lag measurement (${monitoredGroupId}): `, totalMeasurements);
}
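
// --- Illustrative sketch, not part of this PR's diff ---
// One way runLagMonitoring could take a single lag sample, assuming the
// KafkaJS-style admin API (fetchOffsets / fetchTopicOffsets); the real
// implementation is in performance-primitives.js and may differ.
async function sampleBrokerLag(admin, groupId, topic) {
  const [committed] = await admin.fetchOffsets({ groupId, topics: [topic] });
  const watermarks = await admin.fetchTopicOffsets(topic);
  let lag = 0;
  for (const { partition, offset } of committed.partitions) {
    const hw = watermarks.find((w) => w.partition === partition);
    // A committed offset of -1 means nothing committed yet for that partition.
    lag += Number(hw.high) - Math.max(0, Number(offset));
  }
  return lag;
}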

if (producer || all) {
console.log("=== Running Basic Producer Performance Test:")
console.log(` Brokers: ${brokers}`);
logParameters(parameters);
console.log(` Topic: ${topic}`);
console.log(` Message Count: ${messageCount}`);
console.log(` Message Size: ${messageSize}`);
console.log(` Batch Size: ${batchSize}`);
console.log(` Compression: ${compression}`);
console.log(` Limit RPS: ${limitRPS}`);
console.log(` Warmup Messages: ${warmupMessages}`);
const producerRate = await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
console.log(` Use CKJS Producer Everywhere: ${useCKJSProducerEverywhere}`);
startTrackingMemory();
let runProducerFunction = runProducer;
if (useCKJSProducerEverywhere) {
runProducerFunction = runProducerCKJS;
}
const producerRate = await runProducerFunction(parameters, topic, batchSize,
warmupMessages, messageCount, messageSize, compression,
randomness, limitRPS);
endTrackingMemory('producer', `producer-memory-${mode}.json`);
console.log("=== Producer Rate: ", producerRate);
}

if (consumer || all) {
if (consumer || consumerEachMessage || all) {
// If user runs this without --producer then they are responsible for seeding the topic.
console.log("=== Running Basic Consumer Performance Test:")
console.log(` Brokers: ${brokers}`);
console.log("=== Running Basic Consumer Performance Test (eachMessage):")
logParameters(parameters);
console.log(` Topic: ${topic}`);
console.log(` Message Count: ${messageCount}`);
const consumerRate = await runConsumer(brokers, topic, messageCount);
console.log("=== Consumer Rate: ", consumerRate);
console.log(` Partitions consumed concurrently: ${partitionsConsumedConcurrently}`);
startTrackingMemory();
const consumerRate = await runConsumer(parameters, topic,
warmupMessages, messageCount,
false, partitionsConsumedConcurrently, stats,
produceToSecondTopic ? topic2 : null, compression, useCKJSProducerEverywhere);
endTrackingMemory('consumer-each-message', `consumer-memory-message-${mode}.json`);
console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate);
console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate);
printPercentiles(stats.percentilesTOT1, 'T0-T1 (eachMessage)');
console.log("=== Consumer max E2E latency T0-T1 (eachMessage): ", stats.maxLatencyT0T1);
if (produceToSecondTopic) {
console.log("=== Consumer average E2E latency T0-T2 (eachMessage): ", stats.avgLatencyT0T2);
printPercentiles(stats.percentilesTOT2, 'T0-T2 (eachMessage)');
console.log("=== Consumer max E2E latency T0-T2 (eachMessage): ", stats.maxLatencyT0T2);
}
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
}
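
// --- Illustrative sketch, not part of this PR's diff ---
// How the T0-T1 / T0-T2 latencies above could be derived, assuming the
// producer stamps each message's send time (T0) into a header (one commit is
// titled "USE_KEYS and timestamp in headers"); T1 is when the consumer sees
// the message, T2 is when the re-produce to the second topic completes. The
// real bookkeeping is in the primitives files and may differ.
function recordE2ELatencies(stats, message, t2 = null) {
  const t0 = Number(message.headers.timestamp.toString());
  const t1 = Date.now();
  stats.maxLatencyT0T1 = Math.max(stats.maxLatencyT0T1 ?? 0, t1 - t0);
  if (t2 !== null) {
    stats.maxLatencyT0T2 = Math.max(stats.maxLatencyT0T2 ?? 0, t2 - t0);
  }
}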

if (consumer || consumerEachBatch || all) {
// If user runs this without --producer then they are responsible for seeding the topic.
console.log("=== Running Basic Consumer Performance Test (eachBatch):")
logParameters(parameters);
console.log(` Topic: ${topic}`);
console.log(` Message Count: ${messageCount}`);
console.log(` Partitions consumed concurrently: ${partitionsConsumedConcurrently}`);
startTrackingMemory();
const consumerRate = await runConsumer(parameters, topic,
warmupMessages, messageCount,
true, partitionsConsumedConcurrently, stats,
produceToSecondTopic ? topic2 : null, compression, useCKJSProducerEverywhere);
endTrackingMemory('consumer-each-batch', `consumer-memory-batch-${mode}.json`);
console.log("=== Consumer Rate MB/s (eachBatch): ", consumerRate);
console.log("=== Consumer Rate msg/s (eachBatch): ", stats.messageRate);
console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);
console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
console.log("=== Average eachBatch size: ", stats.averageBatchSize);
console.log("=== Consumer average E2E latency T0-T1 (eachBatch): ", stats.avgLatencyT0T1);
printPercentiles(stats.percentilesTOT1, 'T0-T1 (eachBatch)');
console.log("=== Consumer max E2E latency T0-T1 (eachBatch): ", stats.maxLatencyT0T1);
if (produceToSecondTopic) {
console.log("=== Consumer average E2E latency T0-T2 (eachBatch): ", stats.avgLatencyT0T2);
printPercentiles(stats.percentilesTOT2, 'T0-T2 (eachBatch)');
console.log("=== Consumer max E2E latency T0-T2 (eachBatch): ", stats.maxLatencyT0T2);
}
console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);
}
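
// --- Illustrative sketch, not part of this PR's diff ---
// The eachBatch averages above could come from per-batch bookkeeping like
// this, assuming the KafkaJS-style batch interface (batch.messages,
// batch.offsetLag()); the real implementation is in the primitives files.
function recordBatchStats(stats, batch) {
  stats.batches = (stats.batches ?? 0) + 1;
  stats.messagesSeen = (stats.messagesSeen ?? 0) + batch.messages.length;
  const lag = Number(batch.offsetLag()); // offsets behind the partition high watermark
  stats.sumOffsetLag = (stats.sumOffsetLag ?? 0) + lag;
  stats.maxOffsetLag = Math.max(stats.maxOffsetLag ?? 0, lag);
  stats.averageOffsetLag = stats.sumOffsetLag / stats.batches;
  stats.averageBatchSize = stats.messagesSeen / stats.batches;
}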

if (ctp || all) {
console.log("=== Running Consume-Transform-Produce Performance Test:")
console.log(` Brokers: ${brokers}`);
logParameters(parameters);
console.log(` ConsumeTopic: ${topic}`);
console.log(` ProduceTopic: ${topic2}`);
console.log(` Message Count: ${messageCount}`);
// Seed the topic with messages
await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
const ctpRate = await runConsumeTransformProduce(brokers, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency);
await runProducerCKJS(parameters, topic, batchSize,
warmupMessages, messageCount, messageSize, compression,
randomness, limitRPS);
startTrackingMemory();
const ctpRate = await runConsumeTransformProduce(parameters, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency);
endTrackingMemory('consume-transform-produce', `consume-transform-produce-${mode}.json`);
console.log("=== Consume-Transform-Produce Rate: ", ctpRate);
}

if (produceConsumeLatency || all) {
console.log("=== Running Produce-To-Consume Latency Performance Test:")
console.log(` Brokers: ${brokers}`);
logParameters(parameters);
console.log(` Topic: ${topic}`);
console.log(` Message Count: ${messageCount}`);
console.log(` Consumer Processing Time: ${consumerProcessingTime}`);
console.log(` Producer Processing Time: ${producerProcessingTime}`);
const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(brokers, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime);
startTrackingMemory();
const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(parameters, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime);
endTrackingMemory('producer-consumer-together', `producer-consumer-together-${mode}.json`);
console.log(`=== Produce-To-Consume Latency (ms): Mean: ${mean}, P50: ${p50}, P90: ${p90}, P95: ${p95}`);

// The presence of outliers invalidates the mean measurement, and raises concerns as to why there are any.
@@ -87,4 +249,9 @@
console.log("=== Outliers (ms): ", outliers);
}
}

if (maxAverageRSSKB !== undefined && maxMaxRSSKB !== undefined) {
console.log(`=== Max Average RSS across tests: `, maxAverageRSSKB);
console.log(`=== Max RSS across tests: `, maxMaxRSSKB);
}
})();