Skip to content

Commit f44e414

Browse files
committed
Add latency percentiles calculation
1 parent 7532d59 commit f44e414

File tree

5 files changed

+147
-8
lines changed

5 files changed

+147
-8
lines changed

ci/tests/run_perf_test.js

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,19 @@ async function main() {
8787

8888
if (concurrentRun) {
8989
console.log(`Running ${modeLabel} Producer/Consumer test (concurrently)...`);
90-
const INITIAL_DELAY_MS = 2000;
90+
const INITIAL_DELAY_MS = 10000;
9191
const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
9292
// Wait INITIAL_DELAY_MS more to see if all lag is caught up, start earlier than the producer to check
9393
// E2E latencies more accurately.
94-
const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + INITIAL_DELAY_MS * 2;
94+
const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + INITIAL_DELAY_MS + 2000;
95+
const TERMINATE_TIMEOUT_MS_LAG_MONITORING = TERMINATE_TIMEOUT_MS + 1000;
9596

9697
await runCommand(`MODE=${mode} node performance-consolidated.js --create-topics`);
98+
99+
console.log(`Waiting 10s ms after topic creation before starting producer and consumers...`);
100+
await new Promise(resolve => setTimeout(resolve, 10000));
101+
102+
console.log(`Starting producer and consumers...`);
97103
const allPromises = [];
98104
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} INITIAL_DELAY_MS=${INITIAL_DELAY_MS} node performance-consolidated.js --producer`));
99105
if (consumerModeAll || consumerModeEachMessage) {
@@ -103,10 +109,10 @@ async function main() {
103109
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_BATCH=${groupIdEachBatch} node performance-consolidated.js --consumer-each-batch ${produceToSecondTopicParam}`));
104110
}
105111
if (consumerModeAll || consumerModeEachMessage) {
106-
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachMessage} node performance-consolidated.js --monitor-lag`));
112+
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=${INITIAL_DELAY_MS} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_LAG_MONITORING} GROUPID_MONITOR=${groupIdEachMessage} node performance-consolidated.js --monitor-lag`));
107113
}
108114
if (consumerModeAll || consumerModeEachBatch) {
109-
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`));
115+
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=${INITIAL_DELAY_MS} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_LAG_MONITORING} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`));
110116
}
111117
const results = await Promise.allSettled(allPromises);
112118
return results.map(r => r.status === 'fulfilled' ? r.value : '').join('\n');

examples/performance/performance-consolidated.js

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
1717
const securityProtocol = process.env.SECURITY_PROTOCOL;
1818
const saslUsername = process.env.SASL_USERNAME;
1919
const saslPassword = process.env.SASL_PASSWORD;
20-
const topic = process.env.KAFKA_TOPIC || 'test-topic';
21-
const topic2 = process.env.KAFKA_TOPIC2 || 'test-topic2';
20+
const topic = process.env.KAFKA_TOPIC || `test-topic-${mode}`;
21+
const topic2 = process.env.KAFKA_TOPIC2 || `test-topic2-${mode}`;
2222
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
2323
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
2424
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
@@ -53,6 +53,13 @@ function logParameters(parameters) {
5353
}
5454
}
5555

56+
/**
 * Logs one "=== Consumer P<n> E2E latency ..." line per percentile entry.
 *
 * @param {Iterable<{percentile: number, value: number, count: number, total: number}>} percentiles
 *   Entries to print; `value` is formatted with two decimals and reported in ms,
 *   `count`/`total` are printed as "(count/total)".
 * @param {string} type Label appended after "E2E latency", e.g. 'T0-T1 (eachMessage)'.
 * @returns {void}
 */
function printPercentiles(percentiles, type) {
  // The stats object may not carry percentile data (missing field, or a value
  // that is not iterable); skip the report instead of throwing from for...of.
  if (percentiles == null || typeof percentiles[Symbol.iterator] !== 'function') {
    return;
  }
  for (const { percentile, value, count, total } of percentiles) {
    // Right-align the label so P50 and P99.99 line up in the log output.
    const percentileStr = `P${percentile}`.padStart(6, ' ');
    console.log(`=== Consumer ${percentileStr} E2E latency ${type}: ${value.toFixed(2)} ms (${count}/${total})`);
  }
}
62+
5663
(async function () {
5764
const producer = process.argv.includes('--producer');
5865
const consumer = process.argv.includes('--consumer');
@@ -169,10 +176,11 @@ function logParameters(parameters) {
169176
endTrackingMemory('consumer-each-message', `consumer-memory-message-${mode}.json`);
170177
console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate);
171178
console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate);
172-
console.log("=== Consumer average E2E latency T0-T1 (eachMessage): ", stats.avgLatencyT0T1);
179+
printPercentiles(stats.percentilesTOT1, 'T0-T1 (eachMessage)');
173180
console.log("=== Consumer max E2E latency T0-T1 (eachMessage): ", stats.maxLatencyT0T1);
174181
if (produceToSecondTopic) {
175182
console.log("=== Consumer average E2E latency T0-T2 (eachMessage): ", stats.avgLatencyT0T2);
183+
printPercentiles(stats.percentilesTOT2, 'T0-T2 (eachMessage)');
176184
console.log("=== Consumer max E2E latency T0-T2 (eachMessage): ", stats.maxLatencyT0T2);
177185
}
178186
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
@@ -197,9 +205,11 @@ function logParameters(parameters) {
197205
console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
198206
console.log("=== Average eachBatch size: ", stats.averageBatchSize);
199207
console.log("=== Consumer average E2E latency T0-T1 (eachBatch): ", stats.avgLatencyT0T1);
208+
printPercentiles(stats.percentilesTOT1, 'T0-T1 (eachBatch)');
200209
console.log("=== Consumer max E2E latency T0-T1 (eachBatch): ", stats.maxLatencyT0T1);
201210
if (produceToSecondTopic) {
202211
console.log("=== Consumer average E2E latency T0-T2 (eachBatch): ", stats.avgLatencyT0T2);
212+
printPercentiles(stats.percentilesTOT2, 'T0-T2 (eachBatch)');
203213
console.log("=== Consumer max E2E latency T0-T2 (eachBatch): ", stats.maxLatencyT0T2);
204214
}
205215
console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);

examples/performance/performance-primitives-common.js

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const { hrtime } = require('process');
22
const { randomBytes } = require('crypto');
3+
const PERCENTILES = [50, 75, 90, 95, 99, 99.9, 99.99, 100];
34

45
const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
56
const AUTO_COMMIT = process.env.AUTO_COMMIT || 'false';
@@ -58,8 +59,86 @@ function genericProduceToTopic(producer, topic, messages) {
5859
});
5960
}
6061

62+
63+
// We use a simple count-sketch for latency percentiles to avoid storing all latencies in memory,
64+
// because we're also measuring the memory usage of the consumer as part of the performance tests.
65+
/**
 * Fixed-size latency histogram with geometrically growing bucket boundaries.
 *
 * Instead of keeping every sample, each latency is counted in the bucket whose
 * upper boundary is the first boundary >= the latency. Consecutive boundaries
 * differ by a factor of (1 + error), so a reported percentile is the upper
 * boundary of the bucket the percentile falls in.
 *
 * Boundary layout (`#buckets`):
 *   [0]               = 0
 *   [1 .. n]          = maxValue / (1 + error)^(n - i), so [n] = maxValue
 *   [n + 1]           = +Infinity
 * where n = ceil(log(maxValue / minValue) / log(1 + error)).
 * `#counts[k]` counts samples in (boundary[k], boundary[k + 1]]; `#counts[0]`
 * also receives every sample <= 0, and `#counts[n]` every sample > maxValue.
 */
class LatencyCountSketch {
  #numBuckets;
  #minValue;
  #maxValue;
  #buckets;
  #counts;
  #changeBaseLogarithm;
  #totalCount = 0;
  #base;

  /**
   * @param {object} [options] May be omitted entirely; every field has a default.
   * @param {number} [options.error=0.01] Relative width of a bucket (1% by default).
   * @param {number} [options.minValue=0.01] Requested smallest boundary (10μs by default);
   *   the effective value is recomputed from maxValue and may differ slightly.
   * @param {number} [options.maxValue=60000] Largest finite boundary (60s by default).
   */
  constructor({
    error = 0.01, // 1% error
    minValue = 0.01, // min 10μs latency
    maxValue = 60000, // max 60s latency
  } = {}) {
    this.#base = 1 + error;
    // Divisor used to turn a natural logarithm into a logarithm in base #base.
    this.#changeBaseLogarithm = Math.log(this.#base);
    this.#numBuckets = Math.ceil(Math.log(maxValue / minValue) / this.#changeBaseLogarithm);
    this.#maxValue = maxValue;

    this.#buckets = new Array(this.#numBuckets + 2).fill(0);
    this.#buckets[this.#numBuckets + 1] = Number.POSITIVE_INFINITY;
    this.#buckets[this.#numBuckets] = this.#maxValue;
    this.#buckets[0] = 0;
    // Fill boundaries downwards from maxValue, dividing by the base at each step.
    let currentValue = maxValue;
    for (let i = this.#numBuckets - 1; i >= 1; i--) {
      currentValue /= this.#base;
      this.#buckets[i] = currentValue;
    }
    // The effective minimum is the smallest positive boundary actually generated.
    this.#minValue = this.#buckets[1];
    this.#counts = new Array(this.#numBuckets + 2).fill(0);
  }

  /**
   * Records one latency sample.
   *
   * @param {number} latency Sample to record. Values that are not > 0 go to
   *   bucket 0; values above maxValue are clamped into the last counted bucket.
   * @returns {void}
   */
  add(latency) {
    let idx = 0;
    if (latency > 0) {
      idx = Math.ceil(Math.log(latency / this.#minValue) / this.#changeBaseLogarithm);
    }
    // Clamp to the counted range [0, numBuckets].
    const lastIdx = this.#buckets.length - 2;
    if (idx < 0) {
      idx = 0;
    } else if (idx > lastIdx) {
      idx = lastIdx;
    }

    this.#counts[idx]++;
    this.#totalCount++;
  }

  /**
   * Computes the requested percentiles from the recorded samples.
   *
   * @param {number[]} percentilesArray Percentiles in the 0-100 range. They must be
   *   in ascending order: the bucket cursor only moves forward between entries.
   * @returns {Array<[number, number, number]>} One `[value, count, total]` tuple per
   *   requested percentile: the upper boundary of the bucket where the percentile
   *   is reached (capped at maxValue), the cumulative sample count up to and
   *   including that bucket, and the total number of samples recorded.
   */
  percentiles(percentilesArray) {
    const percentileCounts = percentilesArray.map(p => Math.ceil(this.#totalCount * p / 100));
    const percentileResults = new Array(percentilesArray.length);
    let totalCountSoFar = 0;
    let j = 0;
    for (let i = 0; i < percentileCounts.length; i++) {
      while ((totalCountSoFar < percentileCounts[i]) && (j < this.#counts.length - 1)) {
        totalCountSoFar += this.#counts[j];
        j++;
      }
      // j is one past the last bucket consumed, i.e. the index of its upper
      // boundary; never report the +Infinity sentinel, cap at maxValue instead.
      const bucketIndex = (j < this.#counts.length - 1) ? j : this.#counts.length - 2;
      percentileResults[i] = [this.#buckets[bucketIndex], totalCountSoFar, this.#totalCount];
    }
    return percentileResults;
  }
}
135+
61136
async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) {
62137
const handlers = installHandlers(totalMessageCnt === -1);
138+
if (stats) {
139+
stats.percentilesTOT1 = new LatencyCountSketch({});
140+
stats.percentilesTOT2 = new LatencyCountSketch({});
141+
}
63142
while (true) {
64143
try {
65144
await consumer.connect();
@@ -96,7 +175,17 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
96175
return;
97176

98177
const sentAt = Number(decoder.decode(message.value.slice(0, 13)));
99-
const latency = receivedAt - sentAt;
178+
let latency = receivedAt - sentAt;
179+
180+
if (isNaN(latency)) {
181+
console.log(`WARN: NaN latency received message timestamp: ${message.value.slice(0, 13)}`);
182+
return;
183+
} else if (latency < 0) {
184+
console.log(`WARN: negative latency ${latency} sentAt ${sentAt} receivedAt ${receivedAt}`);
185+
latency = 0;
186+
} else if (latency > 60000) {
187+
console.log(`WARN: received large latency ${latency} sentAt ${sentAt} receivedAt ${receivedAt}`);
188+
}
100189

101190
if (!isT0T2) {
102191
if (!stats.maxLatencyT0T1) {
@@ -106,6 +195,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
106195
stats.maxLatencyT0T1 = Math.max(stats.maxLatencyT0T1, latency);
107196
stats.avgLatencyT0T1 = ((stats.avgLatencyT0T1 * (numMessages - 1)) + latency) / numMessages;
108197
}
198+
stats.percentilesTOT1.add(latency);
109199
} else {
110200
if (!stats.maxLatencyT0T2) {
111201
stats.maxLatencyT0T2 = latency;
@@ -114,6 +204,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
114204
stats.maxLatencyT0T2 = Math.max(stats.maxLatencyT0T2, latency);
115205
stats.avgLatencyT0T2 = ((stats.avgLatencyT0T2 * (numMessages - 1)) + latency) / numMessages;
116206
}
207+
stats.percentilesTOT2.add(latency);
117208
}
118209
};
119210

@@ -257,6 +348,18 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
257348
stats.messageRate = durationSeconds > 0 ?
258349
(messagesMeasured / durationSeconds) : Infinity;
259350
stats.durationSeconds = durationSeconds;
351+
stats.percentilesTOT1 = stats.percentilesTOT1.percentiles(PERCENTILES).map((value, index) => ({
352+
percentile: PERCENTILES[index],
353+
value: value[0],
354+
count: value[1],
355+
total: value[2],
356+
}));
357+
stats.percentilesTOT2 = stats.percentilesTOT2.percentiles(PERCENTILES).map((value, index) => ({
358+
percentile: PERCENTILES[index],
359+
value: value[0],
360+
count: value[1],
361+
total: value[2],
362+
}));
260363
}
261364
removeHandlers(handlers);
262365
return rate;

examples/performance/performance-primitives-kafkajs.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ module.exports = {
2020
runProducerConsumerTogether,
2121
};
2222

23+
const IS_HIGHER_LATENCY_CLUSTER = process.env.IS_HIGHER_LATENCY_CLUSTER === 'true';
24+
2325
function baseConfiguration(parameters) {
2426
let ret = {
2527
clientId: 'kafka-test-performance',
@@ -147,13 +149,17 @@ class CompatibleConsumer {
147149

148150
function newCompatibleConsumer(parameters, eachBatch) {
149151
const kafka = new Kafka(baseConfiguration(parameters));
152+
const higherLatencyClusterOpts = IS_HIGHER_LATENCY_CLUSTER ? {
153+
maxBytesPerPartition: 8388608
154+
} : {};
150155

151156
let groupId = eachBatch ? process.env.GROUPID_BATCH : process.env.GROUPID_MESSAGE;
152157
if (!groupId) {
153158
groupId = 'test-group' + Math.random();
154159
}
155160
console.log(`New KafkaJS group id: ${groupId}`);
156161
const consumer = kafka.consumer({
162+
...higherLatencyClusterOpts,
157163
groupId,
158164
});
159165
return new CompatibleConsumer(consumer);

examples/performance/performance-primitives.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ module.exports = {
1919
newCompatibleProducer,
2020
};
2121

22+
23+
const IS_HIGHER_LATENCY_CLUSTER = process.env.IS_HIGHER_LATENCY_CLUSTER === 'true';
24+
2225
function baseConfiguration(parameters) {
2326
let ret = {
2427
'client.id': 'kafka-test-performance',
@@ -97,9 +100,16 @@ class CompatibleProducer {
97100
}
98101
}
99102
/**
 * Creates a CompatibleProducer wrapping a producer obtained from a new Kafka client.
 *
 * The client configuration is baseConfiguration(parameters), optionally
 * overlaid with larger linger/batch settings when IS_HIGHER_LATENCY_CLUSTER is
 * set, and finally the compression codec looked up in CompressionTypes.
 *
 * @param {object} parameters Passed through to baseConfiguration().
 * @param {string} compression Key into CompressionTypes selecting 'compression.codec'.
 * @returns {CompatibleProducer} Wrapper around the newly created producer.
 */
function newCompatibleProducer(parameters, compression) {
  // Extra producer settings used only for higher-latency clusters.
  let latencyTuning = {};
  if (IS_HIGHER_LATENCY_CLUSTER) {
    latencyTuning = {
      'linger.ms': '200',
      'sticky.partitioning.linger.ms': '200',
      'message.max.bytes': '2148352',
      'batch.size': '2097152',
    };
  }

  // Later spreads win, so the tuning overrides the base configuration.
  const clientConfig = {
    ...baseConfiguration(parameters),
    ...latencyTuning,
    'compression.codec': CompressionTypes[compression],
  };
  const client = new Kafka(clientConfig);
  return new CompatibleProducer(client.producer());
}
@@ -146,6 +156,9 @@ function newCompatibleConsumer(parameters, eachBatch) {
146156
const autoCommitOpts = autoCommit > 0 ?
147157
{ 'enable.auto.commit': true, 'auto.commit.interval.ms': autoCommit } :
148158
{ 'enable.auto.commit': false };
159+
const higherLatencyClusterOpts = IS_HIGHER_LATENCY_CLUSTER ? {
160+
'max.partition.fetch.bytes': '8388608'
161+
} : {};
149162

150163
let groupId = eachBatch ? process.env.GROUPID_BATCH : process.env.GROUPID_MESSAGE;
151164
if (!groupId) {
@@ -157,6 +170,7 @@ function newCompatibleConsumer(parameters, eachBatch) {
157170
'auto.offset.reset': 'earliest',
158171
'fetch.queue.backoff.ms': '100',
159172
...autoCommitOpts,
173+
...higherLatencyClusterOpts,
160174
});
161175
return new CompatibleConsumer(consumer);
162176
}

0 commit comments

Comments
 (0)