Skip to content

Commit 86f9ebf

Browse files
committed
Add broker lag monitoring
1 parent 9a5db79 commit 86f9ebf

File tree

4 files changed

+133
-4
lines changed

4 files changed

+133
-4
lines changed

ci/tests/run_perf_test.js

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,24 @@ async function main() {
7979

8080
if (concurrentRun) {
8181
console.log(`Running ${modeLabel} Producer/Consumer test (concurrently)...`);
82+
const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
83+
// Wait 2s more to see if all lag is caught up
84+
const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + 2000;
8285

8386
await runCommand(`MODE=${mode} node performance-consolidated.js --create-topics`);
8487
const allPromises = [];
8588
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} node performance-consolidated.js --producer`));
8689
if (consumerModeAll || consumerModeEachMessage) {
87-
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} GROUPID_MESSAGE=${groupIdEachMessage} node performance-consolidated.js --consumer-each-message ${produceToSecondTopicParam}`));
90+
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MESSAGE=${groupIdEachMessage} node performance-consolidated.js --consumer-each-message ${produceToSecondTopicParam}`));
8891
}
8992
if (consumerModeAll || consumerModeEachBatch) {
90-
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} GROUPID_BATCH=${groupIdEachBatch} node performance-consolidated.js --consumer-each-batch ${produceToSecondTopicParam}`));
93+
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_BATCH=${groupIdEachBatch} node performance-consolidated.js --consumer-each-batch ${produceToSecondTopicParam}`));
94+
}
95+
if (consumerModeAll || consumerModeEachMessage) {
96+
allPromises.push(runCommand(`MODE=${mode} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachMessage} node performance-consolidated.js --monitor-lag`));
97+
}
98+
if (consumerModeAll || consumerModeEachBatch) {
99+
allPromises.push(runCommand(`MODE=${mode} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`));
91100
}
92101
const results = await Promise.allSettled(allPromises);
93102
return results.map(r => r.status === 'fulfilled' ? r.value : '').join('\n');
@@ -123,6 +132,8 @@ async function main() {
123132
let consumerConfluentTime;
124133
let consumerConfluentMessageAverageRSS;
125134
let consumerConfluentMessageMaxRSS;
135+
let consumerConfluentMessageAverageBrokerLag;
136+
let consumerConfluentMessageMaxBrokerLag;
126137
let consumerConfluentBatch;
127138
let consumerConfluentBatchRate;
128139
let consumerConfluentBatchAvgLatency;
@@ -133,6 +144,8 @@ async function main() {
133144
let consumerConfluentBatchAverageSize;
134145
let consumerConfluentBatchAverageRSS;
135146
let consumerConfluentBatchMaxRSS;
147+
let consumerConfluentBatchAverageBrokerLag;
148+
let consumerConfluentBatchMaxBrokerLag;
136149

137150
const producerConfluent = extractValue(outputConfluentProducerConsumer, '=== Producer Rate:');
138151
const producerConfluentAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Average producer RSS KB:');
@@ -145,6 +158,8 @@ async function main() {
145158
consumerConfluentTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachMessage):');
146159
consumerConfluentMessageAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Average consumer-each-message RSS KB:');
147160
consumerConfluentMessageMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max consumer-each-message RSS KB:');
161+
consumerConfluentMessageAverageBrokerLag = extractValue(outputConfluentProducerConsumer, `=== Average broker lag (${groupIdEachMessageConfluent}):`);
162+
consumerConfluentMessageMaxBrokerLag = extractValue(outputConfluentProducerConsumer, `=== Max broker lag (${groupIdEachMessageConfluent}):`);
148163
}
149164
if (consumerModeAll || consumerModeEachBatch) {
150165
consumerConfluentBatch = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
@@ -157,6 +172,8 @@ async function main() {
157172
consumerConfluentBatchAverageSize = extractValue(outputConfluentProducerConsumer, '=== Average eachBatch size:');
158173
consumerConfluentBatchAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Average consumer-each-batch RSS KB:');
159174
consumerConfluentBatchMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max consumer-each-batch RSS KB:');
175+
consumerConfluentBatchAverageBrokerLag = extractValue(outputConfluentProducerConsumer, `=== Average broker lag (${groupIdEachBatchConfluent}):`);
176+
consumerConfluentBatchMaxBrokerLag = extractValue(outputConfluentProducerConsumer, `=== Max broker lag (${groupIdEachBatchConfluent}):`);
160177
}
161178
const consumerConfluentAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Max Average RSS across tests:');
162179
const consumerConfluentMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max RSS across tests:');
@@ -193,6 +210,8 @@ async function main() {
193210
consumerKjsMessageMaxLatency = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency (eachMessage):');
194211
consumerKjsMessageAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average consumer-each-message RSS KB:');
195212
consumerKjsMessageMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max consumer-each-message RSS KB:');
213+
consumerKjsMessageAverageBrokerLag = extractValue(outputKjsProducerConsumer, `=== Average broker lag (${groupIdEachMessageKafkaJS}):`);
214+
consumerKjsMessageMaxBrokerLag = extractValue(outputKjsProducerConsumer, `=== Max broker lag (${groupIdEachMessageKafkaJS}):`);
196215
}
197216
if (consumerModeAll || consumerModeEachBatch) {
198217
consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
@@ -206,6 +225,8 @@ async function main() {
206225
consumerKjsBatchAverageSize = extractValue(outputKjsProducerConsumer, '=== Average eachBatch size:');
207226
consumerKjsBatchAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average consumer-each-batch RSS KB:');
208227
consumerKjsBatchMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max consumer-each-batch RSS KB:');
228+
consumerKjsBatchAverageBrokerLag = extractValue(outputKjsProducerConsumer, `=== Average broker lag (${groupIdEachBatchKafkaJS}):`);
229+
consumerKjsBatchMaxBrokerLag = extractValue(outputKjsProducerConsumer, `=== Max broker lag (${groupIdEachBatchKafkaJS}):`);
209230
}
210231
const consumerKjsAverageRSS = extractValue(outputKjsProducerConsumer, '=== Max Average RSS across tests:');
211232
const consumerKjsMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max RSS across tests:');
@@ -225,6 +246,8 @@ async function main() {
225246
console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
226247
console.log(`Average RSS (eachMessage): confluent ${consumerConfluentMessageAverageRSS}, kafkajs ${consumerKjsMessageAverageRSS}`);
227248
console.log(`Max RSS (eachMessage): confluent ${consumerConfluentMessageMaxRSS}, kafkajs ${consumerKjsMessageMaxRSS}`);
249+
console.log(`Average broker lag (eachMessage): confluent ${consumerConfluentMessageAverageBrokerLag}, kafkajs ${consumerKjsMessageAverageBrokerLag}`);
250+
console.log(`Max broker lag (eachMessage): confluent ${consumerConfluentMessageMaxBrokerLag}, kafkajs ${consumerKjsMessageMaxBrokerLag}`);
228251
}
229252
if (consumerModeAll || consumerModeEachBatch) {
230253
console.log(`Consumer rates MB/s (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
@@ -237,6 +260,8 @@ async function main() {
237260
console.log(`Average eachBatch size: confluent ${consumerConfluentBatchAverageSize}, kafkajs ${consumerKjsBatchAverageSize}`);
238261
console.log(`Average RSS (eachBatch): confluent ${consumerConfluentBatchAverageRSS}, kafkajs ${consumerKjsBatchAverageRSS}`);
239262
console.log(`Max RSS (eachBatch): confluent ${consumerConfluentBatchMaxRSS}, kafkajs ${consumerKjsBatchMaxRSS}`);
263+
console.log(`Average broker lag (eachBatch): confluent ${consumerConfluentBatchAverageBrokerLag}, kafkajs ${consumerKjsBatchAverageBrokerLag}`);
264+
console.log(`Max broker lag (eachBatch): confluent ${consumerConfluentBatchMaxBrokerLag}, kafkajs ${consumerKjsBatchMaxBrokerLag}`);
240265
}
241266
if (!concurrentRun) {
242267
console.log(`Average RSS: confluent ${consumerConfluentAverageRSS}, kafkajs ${consumerKjsAverageRSS}`);

examples/performance/performance-consolidated.js

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ const mode = process.env.MODE ? process.env.MODE : 'confluent';
33

44
let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether;
55
if (mode === 'confluent') {
6-
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives'));
6+
({ runProducer, runConsumer, runConsumeTransformProduce,
7+
runCreateTopics, runLagMonitoring,
8+
runProducerConsumerTogether } = require('./performance-primitives'));
79
} else {
810
({ runProducer, runConsumer, runConsumeTransformProduce, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
911
/* createTopics is more reliable in CKJS */
10-
({ runCreateTopics } = require('./performance-primitives'));
12+
({ runCreateTopics, runLagMonitoring } = require('./performance-primitives'));
1113
}
1214

1315
const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
@@ -58,6 +60,7 @@ function logParameters(parameters) {
5860
const produceConsumeLatency = process.argv.includes('--latency');
5961
const all = process.argv.includes('--all');
6062
const createTopics = process.argv.includes('--create-topics');
63+
const monitorLag = process.argv.includes('--monitor-lag');
6164
let maxAverageRSSKB, maxMaxRSSKB;
6265
const stats = {};
6366

@@ -106,6 +109,21 @@ function logParameters(parameters) {
106109
await runCreateTopics(parameters, topic, topic2, numPartitions);
107110
}
108111

112+
if (monitorLag) {
113+
console.log("=== Starting Lag Monitoring:");
114+
logParameters(parameters);
115+
console.log(` Topic: ${topic}`);
116+
const {
117+
averageLag,
118+
maxLag,
119+
totalMeasurements
120+
} = await runLagMonitoring(parameters, topic);
121+
const monitoredGroupId = process.env.GROUPID_MONITOR;
122+
console.log(`=== Average broker lag (${monitoredGroupId}): `, averageLag);
123+
console.log(`=== Max broker lag (${monitoredGroupId}): `, maxLag);
124+
console.log(`=== Total broker lag measurements (${monitoredGroupId}): `, totalMeasurements);
125+
}
126+
109127
if (producer || all) {
110128
console.log("=== Running Basic Producer Performance Test:")
111129
logParameters(parameters);

examples/performance/performance-primitives-common.js

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,9 +306,86 @@ async function runProducer(producer, topic, batchSize, warmupMessages, totalMess
306306
return rate;
307307
}
308308

309+
async function runLagMonitoring(admin, topic) {
310+
const handlers = installHandlers();
311+
let groupId = process.env.GROUPID_MONITOR;
312+
if (!groupId) {
313+
throw new Error("GROUPID_MONITOR environment variable not set");
314+
}
315+
316+
await admin.connect();
317+
318+
const fetchTotalLag = async () => {
319+
const partitionCompleteLag = {};
320+
const partitionHWM = {};
321+
const partitionLag = {};
322+
let totalLag = 0n;
323+
const operations = [
324+
admin.fetchTopicOffsets(topic, { timeout: 30000 }),
325+
admin.fetchOffsets({ groupId, topics: [topic], timeout: 30000 }),
326+
]
327+
let [topicOffsets, groupOffsets] = await Promise.all(operations);
328+
groupOffsets = groupOffsets[0];
329+
330+
for (const partitionOffset of topicOffsets) {
331+
partitionHWM[partitionOffset.partition] = BigInt(partitionOffset.high);
332+
partitionCompleteLag[partitionOffset.partition] = BigInt(partitionOffset.high) - BigInt(partitionOffset.low);
333+
}
334+
335+
if (groupOffsets && groupOffsets.partitions) {
336+
for (const partitionOffset of groupOffsets.partitions) {
337+
const partition = partitionOffset.partition;
338+
const hwm = partitionHWM[partition];
339+
if (hwm && partitionOffset.offset && hwm >= BigInt(partitionOffset.offset)) {
340+
const currentLag = hwm - BigInt(partitionOffset.offset);
341+
partitionLag[partition] = currentLag;
342+
totalLag += currentLag;
343+
}
344+
}
345+
} else {
346+
throw new Error(`No offsets found for group ${groupId} on topic ${topic}`);
347+
}
348+
for (const partition of Object.keys(partitionHWM)) {
349+
if (partitionLag[partition] === undefined) {
350+
const currentLag = partitionCompleteLag[partition];
351+
partitionLag[partition] = currentLag;
352+
totalLag += currentLag;
353+
}
354+
}
355+
return totalLag;
356+
}
357+
358+
let totalAverageLag = 0n;
359+
let maxLag = 0n;
360+
let totalMeasurements = 0;
361+
362+
while (!handlers.terminationRequested) {
363+
try {
364+
const lag = await fetchTotalLag();
365+
totalAverageLag += lag;
366+
maxLag = lag > maxLag ? lag : maxLag;
367+
totalMeasurements++;
368+
} catch (e) {
369+
console.error(`Error fetching lag: ${e}`);
370+
}
371+
await new Promise(resolve => setTimeout(resolve, 100));
372+
}
373+
const averageLag = totalMeasurements > 0 ? (Number(totalAverageLag) / totalMeasurements) : NaN;
374+
maxLag = Number(maxLag);
375+
376+
await admin.disconnect();
377+
removeHandlers(handlers);
378+
return {
379+
averageLag,
380+
maxLag,
381+
totalMeasurements
382+
};
383+
}
384+
309385
module.exports = {
310386
runConsumer,
311387
runProducer,
388+
runLagMonitoring,
312389
genericProduceToTopic,
313390
getAutoCommit,
314391
};

examples/performance/performance-primitives.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const { hrtime } = require('process');
44
const {
55
runConsumer: runConsumerCommon,
66
runProducer: runProducerCommon,
7+
runLagMonitoring: runLagMonitoringCommon,
78
genericProduceToTopic,
89
getAutoCommit,
910
} = require('./performance-primitives-common');
@@ -14,6 +15,7 @@ module.exports = {
1415
runConsumeTransformProduce,
1516
runCreateTopics,
1617
runProducerConsumerTogether,
18+
runLagMonitoring,
1719
};
1820

1921
function baseConfiguration(parameters) {
@@ -65,6 +67,13 @@ async function runCreateTopics(parameters, topic, topic2, numPartitions) {
6567
await admin.disconnect();
6668
}
6769

70+
function runLagMonitoring(parameters, topic) {
71+
const kafka = new Kafka(baseConfiguration(parameters));
72+
const admin = kafka.admin();
73+
74+
return runLagMonitoringCommon(admin, topic);
75+
}
76+
6877
class CompatibleProducer {
6978
constructor(producer) {
7079
this.producer = producer;

0 commit comments

Comments
 (0)