Skip to content

Commit cf504d2

Browse files
committed
Consume rate in msg/s, average batch size, different messages depending on randomness
1 parent fec8bc3 commit cf504d2

File tree

4 files changed

+36
-15
lines changed

4 files changed

+36
-15
lines changed

examples/performance/performance-consolidated.js

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 10
2020
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
2121
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
2222
const compression = process.env.COMPRESSION || 'None';
23+
// Between 0 and 1, fraction of random bytes in each message
24+
const randomness = process.env.RANDOMNESS ? +process.env.RANDOMNESS : 0.5;
2325
const numPartitions = process.env.PARTITIONS ? +process.env.PARTITIONS : 3;
2426
const partitionsConsumedConcurrently = process.env.PARTITIONS_CONSUMED_CONCURRENTLY ? +process.env.PARTITIONS_CONSUMED_CONCURRENTLY : 1;
2527
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
@@ -110,7 +112,8 @@ function logParameters(parameters) {
110112
console.log(` Compression: ${compression}`);
111113
console.log(` Warmup Messages: ${warmupMessages}`);
112114
startTrackingMemory();
113-
const producerRate = await runProducer(parameters, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
115+
const producerRate = await runProducer(parameters, topic, batchSize,
116+
warmupMessages, messageCount, messageSize, compression, randomness);
114117
endTrackingMemory(`producer-memory-${mode}.json`);
115118
console.log("=== Producer Rate: ", producerRate);
116119
}
@@ -121,10 +124,14 @@ function logParameters(parameters) {
121124
logParameters(parameters);
122125
console.log(` Topic: ${topic}`);
123126
console.log(` Message Count: ${messageCount}`);
127+
console.log(` Partitions consumed concurrently: ${partitionsConsumedConcurrently}`);
124128
startTrackingMemory();
125-
const consumerRate = await runConsumer(parameters, topic, warmupMessages, messageCount, false, partitionsConsumedConcurrently, stats);
129+
const consumerRate = await runConsumer(parameters, topic,
130+
warmupMessages, messageCount,
131+
false, partitionsConsumedConcurrently, stats);
126132
endTrackingMemory(`consumer-memory-message-${mode}.json`);
127-
console.log("=== Consumer Rate (eachMessage): ", consumerRate);
133+
console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate);
134+
console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate);
128135
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
129136
}
130137

@@ -134,12 +141,17 @@ function logParameters(parameters) {
134141
logParameters(parameters);
135142
console.log(` Topic: ${topic}`);
136143
console.log(` Message Count: ${messageCount}`);
144+
console.log(` Partitions consumed concurrently: ${partitionsConsumedConcurrently}`);
137145
startTrackingMemory();
138-
const consumerRate = await runConsumer(parameters, topic, warmupMessages, messageCount, true, partitionsConsumedConcurrently, stats);
146+
const consumerRate = await runConsumer(parameters, topic,
147+
warmupMessages, messageCount,
148+
true, partitionsConsumedConcurrently, stats);
139149
endTrackingMemory(`consumer-memory-batch-${mode}.json`);
140-
console.log("=== Consumer Rate (eachBatch): ", consumerRate);
150+
console.log("=== Consumer Rate MB/s (eachBatch): ", consumerRate);
151+
console.log("=== Consumer Rate msg/s (eachBatch): ", stats.messageRate);
141152
console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);
142153
console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
154+
console.log("=== Average eachBatch size: ", stats.averageBatchSize);
143155
console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);
144156
}
145157

examples/performance/performance-primitives-common.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,10 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
9191
if (eachBatch) {
9292
stats.averageOffsetLag = totalBatches > 0 ? (totalOffsetLag / totalBatches) : 0;
9393
stats.maxOffsetLag = maxOffsetLag;
94+
stats.averageBatchSize = totalBatches > 0 ? (messagesMeasured / totalBatches) : 0;
9495
}
96+
stats.messageRate = durationSeconds > 0 ?
97+
(messagesMeasured / durationSeconds) : Infinity;
9598
stats.durationSeconds = durationSeconds;
9699
}
97100
return rate;

examples/performance/performance-primitives-kafkajs.js

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,19 @@ async function runCreateTopics(parameters, topic, topic2, numPartitions) {
6161
await admin.disconnect();
6262
}
6363

64-
async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression) {
64+
async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness) {
6565
let totalMessagesSent = 0;
6666
let totalBytesSent = 0;
6767

68-
const message = {
69-
value: randomBytes(msgSize),
68+
const messages = Array(batchSize);
69+
let staticValue = randomBytes(Math.floor(msgSize * (1 - randomness)));
70+
for (let i = 0; i < batchSize; i++) {
71+
/* Generate a different random value for each message */
72+
messages[i] = {
73+
value: Buffer.concat([staticValue, randomBytes(msgSize - staticValue.length)]),
74+
};
7075
}
7176

72-
const messages = Array(batchSize).fill(message);
73-
7477
const kafka = new Kafka(baseConfiguration(parameters));
7578

7679
const producer = kafka.producer();

examples/performance/performance-primitives.js

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,19 @@ async function runCreateTopics(parameters, topic, topic2, numPartitions) {
6060
await admin.disconnect();
6161
}
6262

63-
async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression) {
63+
async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness) {
6464
let totalMessagesSent = 0;
6565
let totalBytesSent = 0;
6666

67-
const message = {
68-
value: randomBytes(msgSize),
67+
const messages = Array(batchSize);
68+
let staticValue = randomBytes(Math.floor(msgSize * (1 - randomness)));
69+
for (let i = 0; i < batchSize; i++) {
70+
/* Generate a different random value for each message */
71+
messages[i] = {
72+
value: Buffer.concat([staticValue, randomBytes(msgSize - staticValue.length)]),
73+
};
6974
}
7075

71-
const messages = Array(batchSize).fill(message);
72-
7376
const kafka = new Kafka({
7477
...baseConfiguration(parameters),
7578
'compression.codec': CompressionTypes[compression],

0 commit comments

Comments
 (0)