Skip to content

Commit 195cf73

Browse files
committed
USE_KEYS and timestamp in headers
1 parent 89f4ea4 commit 195cf73

File tree

2 files changed

+22
-13
lines changed

2 files changed

+22
-13
lines changed

examples/performance/performance-primitives-common.js

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ const PERCENTILES = [50, 75, 90, 95, 99, 99.9, 99.99, 100];
55
const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
66
const AUTO_COMMIT = process.env.AUTO_COMMIT || 'false';
77
const AUTO_COMMIT_ON_BATCH_END = process.env.AUTO_COMMIT_ON_BATCH_END === 'true';
8+
const USE_KEYS = process.env.USE_KEYS === 'true';
9+
810
let autoCommit;
911
if (AUTO_COMMIT && AUTO_COMMIT === 'false')
1012
autoCommit = null;
@@ -168,13 +170,16 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
168170
let consumptionStopped = false;
169171
let lastMessageReceivedAt;
170172
const skippedMessages = warmupMessages;
171-
const decoder = new TextDecoder('utf-8');
172173

173174
const updateLatency = (receivedAt, numMessages, message, isT0T2) => {
174175
if (!stats)
175176
return;
176177

177-
const sentAt = Number(decoder.decode(message.value.slice(0, 13)));
178+
if (!message.headers || !message.headers['timestamp']) {
179+
console.log('WARN: message without timestamp header received, cannot measure latency');
180+
return;
181+
}
182+
const sentAt = Number(message.headers['timestamp']);
178183
let latency = receivedAt - sentAt;
179184

180185
if (isNaN(latency)) {
@@ -370,15 +375,11 @@ async function runProducer(producer, topic, batchSize, warmupMessages, totalMess
370375
let totalMessagesSent = 0;
371376
let totalBytesSent = 0;
372377

373-
const encoder = new TextEncoder();
374378
let staticValueLength = Math.floor(msgSize * (1 - randomness));
375-
if (staticValueLength < 13)
376-
staticValueLength = 13;
377-
let staticValueRemainder = staticValueLength - 13;
378-
if (staticValueRemainder > 0) {
379-
staticValueRemainder = randomBytes(staticValueRemainder);
379+
if (staticValueLength > 0) {
380+
staticValueBytes = randomBytes(staticValueLength);
380381
} else {
381-
staticValueRemainder = Buffer.alloc(0);
382+
staticValueBytes = Buffer.alloc(0);
382383
}
383384

384385
let messageCnt = totalMessageCnt;
@@ -389,8 +390,11 @@ async function runProducer(producer, topic, batchSize, warmupMessages, totalMess
389390
for (let i = 0; i < messageCnt; i++) {
390391
/* Generate a different random value for each message */
391392
messages[i] = {
392-
value: Buffer.concat([staticValueRemainder, randomBytes(msgSize - staticValueLength)]),
393+
value: Buffer.concat([staticValueBytes, randomBytes(msgSize - staticValueLength)]),
393394
};
395+
if (USE_KEYS) {
396+
messages[i].key = randomBytes(8);
397+
}
394398
}
395399

396400
await producer.connect();
@@ -422,8 +426,12 @@ async function runProducer(producer, topic, batchSize, warmupMessages, totalMess
422426
const modifiedMessages = [];
423427
const batchStart = messagesDispatched % messageCnt;
424428
for (const msg of messages.slice(batchStart, batchStart + batchSize)) {
425-
modifiedMessages.push({
426-
value: Buffer.concat([encoder.encode(Date.now().toString()), msg.value])
429+
modifiedMessages.push({
430+
key: msg.key,
431+
value: msg.value,
432+
headers: {
433+
'timestamp': Date.now().toString(),
434+
}
427435
});
428436
}
429437
promises.push(producer.send({

examples/performance/performance-primitives-kafkajs.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const { Kafka, CompressionTypes } = require('kafkajs');
1+
const { Kafka, CompressionTypes, logLevel } = require('kafkajs');
22
const { randomBytes } = require('crypto');
33
const { hrtime } = require('process');
44
const {
@@ -26,6 +26,7 @@ function baseConfiguration(parameters) {
2626
let ret = {
2727
clientId: 'kafka-test-performance',
2828
brokers: parameters.brokers.split(','),
29+
logLevel: logLevel.WARN,
2930
};
3031
if (parameters.securityProtocol &&
3132
parameters.saslUsername &&

0 commit comments

Comments
 (0)