Skip to content

Commit ed7226b

Browse files
committed
Add example to measure performance
1 parent 08ecfc3 commit ed7226b

File tree

1 file changed

+149
-0
lines changed

1 file changed

+149
-0
lines changed
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
const { Kafka, CompressionTypes, ErrorCodes } = require('../../').KafkaJS;
2+
const { randomBytes } = require('crypto');
3+
const { hrtime } = require('process');
4+
5+
async function runProducer(brokers, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression) {
6+
let totalMessagesSent = 0;
7+
let totalBytesSent = 0;
8+
9+
const message = {
10+
value: randomBytes(msgSize),
11+
}
12+
13+
const messages = Array(batchSize).fill(message);
14+
15+
const kafka = new Kafka({
16+
'client.id': 'kafka-test-performance',
17+
'metadata.broker.list': brokers,
18+
'compression.codec': compression,
19+
});
20+
21+
const producer = kafka.producer();
22+
await producer.connect();
23+
24+
console.log('Sending ' + warmupMessages + ' warmup messages.');
25+
while (warmupMessages > 0) {
26+
await producer.send({
27+
topic,
28+
messages,
29+
});
30+
warmupMessages -= batchSize;
31+
}
32+
console.log('Sent warmup messages');
33+
34+
// Now that warmup is done, start measuring...
35+
let startTime;
36+
let promises = [];
37+
startTime = hrtime();
38+
let messagesDispatched = 0;
39+
40+
// The double while-loop allows us to send a bunch of messages and then
41+
// await them all at once. We need the second while loop to keep sending
42+
// in case of queue full errors, which surface only on awaiting.
43+
while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
44+
while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
45+
promises.push(producer.send({
46+
topic,
47+
messages,
48+
}).then(() => {
49+
totalMessagesSent += batchSize;
50+
totalBytesSent += batchSize * msgSize;
51+
}).catch((err) => {
52+
if (err.code === ErrorCodes.ERR__QUEUE_FULL) {
53+
/* do nothing, just send them again */
54+
messagesDispatched -= batchSize;
55+
} else {
56+
console.error(err);
57+
throw err;
58+
}
59+
}));
60+
messagesDispatched += batchSize;
61+
}
62+
await Promise.all(promises);
63+
}
64+
console.log({messagesDispatched, totalMessageCnt})
65+
let elapsed = hrtime(startTime);
66+
let durationNanos = elapsed[0] * 1e9 + elapsed[1];
67+
let rate = (totalBytesSent / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
68+
console.log(`Sent ${totalMessagesSent} messages, ${totalBytesSent} bytes; rate is ${rate} MB/s`);
69+
70+
await producer.disconnect();
71+
return rate;
72+
}
73+
74+
async function runConsumer(brokers, topic, totalMessageCnt) {
75+
const kafka = new Kafka({
76+
'client.id': 'kafka-test-performance',
77+
'metadata.broker.list': brokers,
78+
});
79+
80+
const consumer = kafka.consumer({
81+
'group.id': 'test-group' + Math.random(),
82+
'enable.auto.commit': false,
83+
'auto.offset.reset': 'earliest',
84+
});
85+
await consumer.connect();
86+
await consumer.subscribe({ topic });
87+
88+
let messagesReceived = 0;
89+
let totalMessageSize = 0;
90+
let startTime;
91+
let rate;
92+
consumer.run({
93+
eachMessage: async ({ topic, partition, message }) => {
94+
messagesReceived++;
95+
totalMessageSize += message.value.length;
96+
if (messagesReceived === 1) {
97+
consumer.pause([{ topic }]);
98+
} else if (messagesReceived === totalMessageCnt) {
99+
let elapsed = hrtime(startTime);
100+
let durationNanos = elapsed[0] * 1e9 + elapsed[1];
101+
rate = (totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
102+
console.log(`Recvd ${messagesReceived} messages, ${totalMessageSize} bytes; rate is ${rate} MB/s`);
103+
consumer.pause([{ topic }]);
104+
// } else if (messagesReceived % 100 == 0) {
105+
// console.log(`Recvd ${messagesReceived} messages, ${totalMessageSize} bytes`);
106+
}
107+
}
108+
});
109+
110+
// Wait until the first message is received
111+
await new Promise((resolve) => {
112+
let interval = setInterval(() => {
113+
if (messagesReceived > 0) {
114+
clearInterval(interval);
115+
resolve();
116+
}
117+
}, 100);
118+
});
119+
120+
console.log("Starting consumer.")
121+
122+
totalMessageSize = 0;
123+
startTime = hrtime();
124+
consumer.resume([{ topic }]);
125+
await new Promise((resolve) => {
126+
let interval = setInterval(() => {
127+
if (messagesReceived >= totalMessageCnt) {
128+
clearInterval(interval);
129+
resolve();
130+
}
131+
}, 1000);
132+
});
133+
134+
await consumer.disconnect();
135+
return rate;
136+
}
137+
138+
const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
139+
const topic = process.env.KAFKA_TOPIC || 'test-topic';
140+
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
141+
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
142+
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
143+
const compression = process.env.COMPRESSION || CompressionTypes.NONE;
144+
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
145+
146+
runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression).then(async (producerRate) => {
147+
const consumerRate = await runConsumer(brokers, topic, messageCount);
148+
console.log(producerRate, consumerRate);
149+
});

0 commit comments

Comments
 (0)