Skip to content

Commit 1735d04

Browse files
committed
Average and max E2E latency
1 parent 81ade90 commit 1735d04

File tree

5 files changed

+85
-13
lines changed

5 files changed

+85
-13
lines changed

ci/tests/run_perf_test.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,13 @@ let ctpConfluent, ctpKjs;
6262
const producerConfluent = extractValue(outputConfluentProducerConsumer, '=== Producer Rate:');
6363
const consumerConfluentMessage = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachMessage):');
6464
const consumerConfluentMessageRate = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate msg/s (eachMessage):');
65+
const consumerConfluentMessageAvgLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency (eachMessage):');
66+
const consumerConfluentMessageMaxLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency (eachMessage):');
6567
const consumerConfluentTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachMessage):');
6668
const consumerConfluentBatch = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
6769
const consumerConfluentBatchRate = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate msg/s (eachBatch):');
70+
const consumerConfluentBatchAvgLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency (eachBatch):');
71+
const consumerConfluentBatchMaxLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency (eachBatch):');
6872
const consumerConfluentBatchTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachBatch):');
6973
const consumerConfluentBatchAverageLag = extractValue(outputConfluentProducerConsumer, '=== Average eachBatch lag:');
7074
const consumerConfluentBatchMaxLag = extractValue(outputConfluentProducerConsumer, '=== Max eachBatch lag:');
@@ -78,9 +82,13 @@ if (!skipCtpTest) {
7882
const producerKjs = extractValue(outputKjsProducerConsumer, '=== Producer Rate:');
7983
const consumerKjsMessage = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachMessage):');
8084
const consumerKjsMessageRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachMessage):');
85+
const consumerKjsMessageAvgLatency = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency (eachMessage):');
86+
const consumerKjsMessageMaxLatency = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency (eachMessage):');
8187
const consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
8288
const consumerKjsBatch = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
8389
const consumerKjsBatchRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachBatch):');
90+
const consumerKjsBatchAvgLatency = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency (eachBatch):');
91+
const consumerKjsBatchMaxLatency = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency (eachBatch):');
8492
const consumerKjsBatchTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachBatch):');
8593
const consumerKjsBatchAverageLag = extractValue(outputKjsProducerConsumer, '=== Average eachBatch lag:');
8694
const consumerKjsBatchMaxLag = extractValue(outputKjsProducerConsumer, '=== Max eachBatch lag:');
@@ -94,9 +102,13 @@ if (!skipCtpTest) {
94102
console.log(`Producer rates: confluent ${producerConfluent}, kafkajs ${producerKjs}`);
95103
console.log(`Consumer rates MB/s (eachMessage): confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`);
96104
console.log(`Consumer rates msg/s (eachMessage): confluent ${consumerConfluentMessageRate}, kafkajs ${consumerKjsMessageRate}`);
105+
console.log(`Consumer average E2E latency (eachMessage): confluent ${consumerConfluentMessageAvgLatency}, kafkajs ${consumerKjsMessageAvgLatency}`);
106+
console.log(`Consumer max E2E latency (eachMessage): confluent ${consumerConfluentMessageMaxLatency}, kafkajs ${consumerKjsMessageMaxLatency}`);
97107
console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
98108
console.log(`Consumer rates MB/s (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
99109
console.log(`Consumer rates msg/s (eachBatch): confluent ${consumerConfluentBatchRate}, kafkajs ${consumerKjsBatchRate}`);
110+
console.log(`Consumer average E2E latency (eachBatch): confluent ${consumerConfluentBatchAvgLatency}, kafkajs ${consumerKjsBatchAvgLatency}`);
111+
console.log(`Consumer max E2E latency (eachBatch): confluent ${consumerConfluentBatchMaxLatency}, kafkajs ${consumerKjsBatchMaxLatency}`);
100112
console.log(`Consumption time (eachBatch): confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`);
101113
console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`);
102114
console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`);

examples/performance/performance-consolidated.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ function logParameters(parameters) {
132132
endTrackingMemory(`consumer-memory-message-${mode}.json`);
133133
console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate);
134134
console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate);
135+
console.log("=== Consumer average E2E latency (eachMessage): ", stats.avgLatency);
136+
console.log("=== Consumer max E2E latency (eachMessage): ", stats.maxLatency);
135137
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
136138
}
137139

@@ -152,6 +154,8 @@ function logParameters(parameters) {
152154
console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);
153155
console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
154156
console.log("=== Average eachBatch size: ", stats.averageBatchSize);
157+
console.log("=== Consumer average E2E latency (eachBatch): ", stats.avgLatency);
158+
console.log("=== Consumer max E2E latency (eachBatch): ", stats.maxLatency);
155159
console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);
156160
}
157161

examples/performance/performance-primitives-common.js

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,23 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
1414
let startTime;
1515
let rate;
1616
const skippedMessages = warmupMessages;
17+
const decoder = new TextDecoder('utf-8');
18+
19+
const updateLatency = (receivedAt, numMessages, message) => {
20+
if (!stats)
21+
return;
22+
23+
const sentAt = Number(decoder.decode(message.value.slice(0, 13)));
24+
const latency = receivedAt - sentAt;
25+
26+
if (stats.maxLatency === undefined) {
27+
stats.maxLatency = latency;
28+
stats.avgLatency = latency;
29+
} else {
30+
stats.maxLatency = Math.max(stats.maxLatency, latency);
31+
stats.avgLatency = ((stats.avgLatency * (numMessages - 1)) + latency) / numMessages;
32+
}
33+
};
1734

1835
console.log("Starting consumer.");
1936
let consumeMethod = {
@@ -24,6 +41,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
2441
if (messagesReceived >= skippedMessages) {
2542
messagesMeasured++;
2643
totalMessageSize += message.value.length;
44+
updateLatency(Date.now(), messagesMeasured, message);
2745

2846
if (messagesReceived === skippedMessages) {
2947
startTime = hrtime.bigint();
@@ -54,8 +72,14 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
5472
if (messagesBeforeBatch < skippedMessages) {
5573
messages = messages.slice(messages.length - messagesMeasured);
5674
}
57-
for (const message of messages)
75+
const now = Date.now();
76+
const messagesBase = messagesMeasured - messages.length;
77+
let i = 1;
78+
for (const message of messages) {
5879
totalMessageSize += message.value.length;
80+
updateLatency(now, messagesBase + i, message);
81+
i++;
82+
}
5983

6084
if (!startTime) {
6185
startTime = hrtime.bigint();

examples/performance/performance-primitives-kafkajs.js

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,22 @@ async function runProducer(parameters, topic, batchSize, warmupMessages, totalMe
6565
let totalMessagesSent = 0;
6666
let totalBytesSent = 0;
6767

68-
const messages = Array(batchSize);
69-
let staticValue = randomBytes(Math.floor(msgSize * (1 - randomness)));
70-
for (let i = 0; i < batchSize; i++) {
68+
const messages = Array(totalMessageCnt);
69+
const encoder = new TextEncoder();
70+
let staticValueLength = Math.floor(msgSize * (1 - randomness));
71+
if (staticValueLength < 13)
72+
staticValueLength = 13;
73+
const staticValueRemainder = staticValueLength - 13;
74+
if (staticValueRemainder > 0) {
75+
staticValueRemainder = randomBytes(staticValueRemainder);
76+
} else {
77+
staticValueRemainder = Buffer.alloc(0);
78+
}
79+
80+
for (let i = 0; i < totalMessageCnt; i++) {
7181
/* Generate a different random value for each message */
7282
messages[i] = {
73-
value: Buffer.concat([staticValue, randomBytes(msgSize - staticValue.length)]),
83+
value: Buffer.concat([staticValueRemainder, randomBytes(msgSize - staticValueLength)]),
7484
};
7585
}
7686

@@ -83,7 +93,7 @@ async function runProducer(parameters, topic, batchSize, warmupMessages, totalMe
8393
while (warmupMessages > 0) {
8494
await producer.send({
8595
topic,
86-
messages,
96+
messages: messages.slice(0, batchSize),
8797
compression: CompressionTypes[compression],
8898
});
8999
warmupMessages -= batchSize;
@@ -102,9 +112,15 @@ async function runProducer(parameters, topic, batchSize, warmupMessages, totalMe
102112
while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
103113
let messagesNotAwaited = 0;
104114
while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
115+
const modifiedMessages = [];
116+
for (const msg of messages.slice(messagesDispatched, messagesDispatched + batchSize)) {
117+
modifiedMessages.push({
118+
value: Buffer.concat([encoder.encode(Date.now().toString()), msg.value])
119+
});
120+
}
105121
promises.push(producer.send({
106122
topic,
107-
messages,
123+
messages: modifiedMessages,
108124
compression: CompressionTypes[compression],
109125
}).then(() => {
110126
totalMessagesSent += batchSize;

examples/performance/performance-primitives.js

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,22 @@ async function runProducer(parameters, topic, batchSize, warmupMessages, totalMe
6464
let totalMessagesSent = 0;
6565
let totalBytesSent = 0;
6666

67-
const messages = Array(batchSize);
68-
let staticValue = randomBytes(Math.floor(msgSize * (1 - randomness)));
69-
for (let i = 0; i < batchSize; i++) {
67+
const messages = Array(totalMessageCnt);
68+
const encoder = new TextEncoder();
69+
let staticValueLength = Math.floor(msgSize * (1 - randomness));
70+
if (staticValueLength < 13)
71+
staticValueLength = 13;
72+
const staticValueRemainder = staticValueLength - 13;
73+
if (staticValueRemainder > 0) {
74+
staticValueRemainder = randomBytes(staticValueRemainder);
75+
} else {
76+
staticValueRemainder = Buffer.alloc(0);
77+
}
78+
79+
for (let i = 0; i < totalMessageCnt; i++) {
7080
/* Generate a different random value for each message */
7181
messages[i] = {
72-
value: Buffer.concat([staticValue, randomBytes(msgSize - staticValue.length)]),
82+
value: Buffer.concat([staticValueRemainder, randomBytes(msgSize - staticValueLength)]),
7383
};
7484
}
7585

@@ -85,7 +95,7 @@ async function runProducer(parameters, topic, batchSize, warmupMessages, totalMe
8595
while (warmupMessages > 0) {
8696
await producer.send({
8797
topic,
88-
messages,
98+
messages: messages.slice(0, batchSize),
8999
});
90100
warmupMessages -= batchSize;
91101
}
@@ -103,9 +113,15 @@ async function runProducer(parameters, topic, batchSize, warmupMessages, totalMe
103113
while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
104114
let messagesNotAwaited = 0;
105115
while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
116+
const modifiedMessages = [];
117+
for (const msg of messages.slice(messagesDispatched, messagesDispatched + batchSize)) {
118+
modifiedMessages.push({
119+
value: Buffer.concat([encoder.encode(Date.now().toString()), msg.value])
120+
});
121+
}
106122
promises.push(producer.send({
107123
topic,
108-
messages,
124+
messages: modifiedMessages,
109125
}).then(() => {
110126
totalMessagesSent += batchSize;
111127
totalBytesSent += batchSize * msgSize;

0 commit comments

Comments
 (0)