Skip to content

Commit 7532d59

Browse files
committed
More accurate latency and throughput measurement
1 parent a8306ad commit 7532d59

File tree

3 files changed

+42
-12
lines changed

3 files changed

+42
-12
lines changed

ci/tests/run_perf_test.js

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,24 +87,26 @@ async function main() {
8787

8888
if (concurrentRun) {
8989
console.log(`Running ${modeLabel} Producer/Consumer test (concurrently)...`);
90+
const INITIAL_DELAY_MS = 2000;
9091
const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
91-
// Wait 2s more to see if all lag is caught up
92-
const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + 2000;
92+
// Wait INITIAL_DELAY_MS more to see if all lag is caught up, start earlier than the producer to check
93+
// E2E latencies more accurately.
94+
const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + INITIAL_DELAY_MS * 2;
9395

9496
await runCommand(`MODE=${mode} node performance-consolidated.js --create-topics`);
9597
const allPromises = [];
96-
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} node performance-consolidated.js --producer`));
98+
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} INITIAL_DELAY_MS=${INITIAL_DELAY_MS} node performance-consolidated.js --producer`));
9799
if (consumerModeAll || consumerModeEachMessage) {
98-
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MESSAGE=${groupIdEachMessage} node performance-consolidated.js --consumer-each-message ${produceToSecondTopicParam}`));
100+
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MESSAGE=${groupIdEachMessage} node performance-consolidated.js --consumer-each-message ${produceToSecondTopicParam}`));
99101
}
100102
if (consumerModeAll || consumerModeEachBatch) {
101-
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_BATCH=${groupIdEachBatch} node performance-consolidated.js --consumer-each-batch ${produceToSecondTopicParam}`));
103+
allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_BATCH=${groupIdEachBatch} node performance-consolidated.js --consumer-each-batch ${produceToSecondTopicParam}`));
102104
}
103105
if (consumerModeAll || consumerModeEachMessage) {
104-
allPromises.push(runCommand(`MODE=${mode} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachMessage} node performance-consolidated.js --monitor-lag`));
106+
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachMessage} node performance-consolidated.js --monitor-lag`));
105107
}
106108
if (consumerModeAll || consumerModeEachBatch) {
107-
allPromises.push(runCommand(`MODE=${mode} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`));
109+
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`));
108110
}
109111
const results = await Promise.allSettled(allPromises);
110112
return results.map(r => r.status === 'fulfilled' ? r.value : '').join('\n');

examples/performance/performance-consolidated.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
2525
const compression = process.env.COMPRESSION || 'None';
2626
// Between 0 and 1, percentage of random bytes in each message
2727
const randomness = process.env.RANDOMNESS ? +process.env.RANDOMNESS : 0.5;
28+
const initialDelayMs = process.env.INITIAL_DELAY_MS ? +process.env.INITIAL_DELAY_MS : 0;
2829
const numPartitions = process.env.PARTITIONS ? +process.env.PARTITIONS : 3;
2930
const partitionsConsumedConcurrently = process.env.PARTITIONS_CONSUMED_CONCURRENTLY ? +process.env.PARTITIONS_CONSUMED_CONCURRENTLY : 1;
3031
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
@@ -101,6 +102,10 @@ function logParameters(parameters) {
101102
}
102103

103104
console.log(`=== Starting Performance Tests - Mode ${mode} ===`);
105+
if (initialDelayMs > 0) {
106+
console.log(`=== Initial delay of ${initialDelayMs}ms before starting tests ===`);
107+
await new Promise(resolve => setTimeout(resolve, initialDelayMs));
108+
}
104109

105110
if (createTopics || all) {
106111
console.log("=== Creating Topics (deleting if they exist already):");

examples/performance/performance-primitives-common.js

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,22 @@ function genericProduceToTopic(producer, topic, messages) {
6060

6161
async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) {
6262
const handlers = installHandlers(totalMessageCnt === -1);
63-
await consumer.connect();
64-
await consumer.subscribe({ topic });
63+
while (true) {
64+
try {
65+
await consumer.connect();
66+
break;
67+
} catch (e) {
68+
console.error(`Error connecting consumer: ${e}`);
69+
}
70+
}
71+
while (true) {
72+
try {
73+
await consumer.subscribe({ topic });
74+
break;
75+
} catch (e) {
76+
console.error(`Error subscribing consumer: ${e}`);
77+
}
78+
}
6579

6680
let messagesReceived = 0;
6781
let messagesMeasured = 0;
@@ -73,6 +87,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
7387
let startTime;
7488
let rate;
7589
let consumptionStopped = false;
90+
let lastMessageReceivedAt;
7691
const skippedMessages = warmupMessages;
7792
const decoder = new TextDecoder('utf-8');
7893

@@ -111,11 +126,16 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
111126
if (consumptionStopped)
112127
return;
113128
consumptionStopped = true;
114-
let durationNanos = Number(hrtime.bigint() - startTime);
129+
const now = lastMessageReceivedAt || hrtime.bigint();
130+
let durationNanos = Number(now - startTime);
115131
durationSeconds = durationNanos / 1e9;
116132
rate = (totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
117133
console.log(`Recvd ${messagesMeasured} messages in ${durationSeconds} seconds, ${totalMessageSize} bytes; rate is ${rate} MB/s`);
118-
consumer.pause([{ topic }]);
134+
try {
135+
consumer.pause([{ topic }]);
136+
} catch (e) {
137+
console.error(`Error pausing consumer: ${e}`);
138+
}
119139
}
120140

121141
console.log("Starting consumer.");
@@ -153,6 +173,8 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
153173
consumeMethod = {
154174
partitionsConsumedConcurrently,
155175
eachBatch: async ({ batch }) => {
176+
if (!batch.messages)
177+
return;
156178
const messagesBeforeBatch = messagesReceived;
157179
const topic = batch.topic;
158180
const partition = batch.partition;
@@ -170,6 +192,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
170192
messages = messages.slice(messages.length - messagesMeasured);
171193
}
172194
const now = Date.now();
195+
lastMessageReceivedAt = hrtime.bigint();
173196
messagesBase = messagesMeasured - messages.length;
174197
let i = 1;
175198
for (const message of messages) {
@@ -187,7 +210,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
187210

188211
if (actionOnMessages) {
189212
await actionOnMessages(batch.messages);
190-
if (messagesMeasured > 0 && messages.length > 0) {
213+
if (messagesMeasured > 0 && messages && messages.length > 0) {
191214
let i = 1;
192215
const now = Date.now();
193216
for (const message of messages) {

0 commit comments

Comments
 (0)