Skip to content

Commit e332111

Browse files
committed
Configurable partitions consumed concurrently
1 parent 33bd5ec commit e332111

File tree

4 files changed

+10
-7
lines changed

4 files changed

+10
-7
lines changed

examples/performance/performance-consolidated.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
2121
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
2222
const compression = process.env.COMPRESSION || 'None';
2323
const numPartitions = process.env.PARTITIONS ? +process.env.PARTITIONS : 3;
24+
const partitionsConsumedConcurrently = process.env.PARTITIONS_CONSUMED_CONCURRENTLY ? +process.env.PARTITIONS_CONSUMED_CONCURRENTLY : 1;
2425
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
2526
const messageProcessTimeMs = process.env.MESSAGE_PROCESS_TIME_MS ? +process.env.MESSAGE_PROCESS_TIME_MS : 5;
2627
const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY : 1;
@@ -121,7 +122,7 @@ function logParameters(parameters) {
121122
console.log(` Topic: ${topic}`);
122123
console.log(` Message Count: ${messageCount}`);
123124
startTrackingMemory();
124-
const consumerRate = await runConsumer(parameters, topic, warmupMessages, messageCount, false, stats);
125+
const consumerRate = await runConsumer(parameters, topic, warmupMessages, messageCount, false, partitionsConsumedConcurrently, stats);
125126
endTrackingMemory(`consumer-memory-message-${mode}.json`);
126127
console.log("=== Consumer Rate (eachMessage): ", consumerRate);
127128
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
@@ -134,7 +135,7 @@ function logParameters(parameters) {
134135
console.log(` Topic: ${topic}`);
135136
console.log(` Message Count: ${messageCount}`);
136137
startTrackingMemory();
137-
const consumerRate = await runConsumer(parameters, topic, warmupMessages, messageCount, true, stats);
138+
const consumerRate = await runConsumer(parameters, topic, warmupMessages, messageCount, true, partitionsConsumedConcurrently, stats);
138139
endTrackingMemory(`consumer-memory-batch-${mode}.json`);
139140
console.log("=== Consumer Rate (eachBatch): ", consumerRate);
140141
console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);

examples/performance/performance-primitives-common.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const { hrtime } = require('process');
22

3-
async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, stats) {
3+
async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats) {
44
await consumer.connect();
55
await consumer.subscribe({ topic });
66

@@ -17,6 +17,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
1717

1818
console.log("Starting consumer.");
1919
let consumeMethod = {
20+
partitionsConsumedConcurrently,
2021
eachMessage: async ({ topic, partition, message }) => {
2122
messagesReceived++;
2223

@@ -38,6 +39,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
3839
}
3940
if (eachBatch) {
4041
consumeMethod = {
42+
partitionsConsumedConcurrently,
4143
eachBatch: async ({ batch }) => {
4244
const messagesBeforeBatch = messagesReceived;
4345
const topic = batch.topic;

examples/performance/performance-primitives-kafkajs.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ function newCompatibleConsumer(parameters) {
168168
return new CompatibleConsumer(consumer);
169169
}
170170

171-
async function runConsumer(parameters, topic, warmupMessages, totalMessageCnt, eachBatch, stats) {
172-
return runConsumerCommon(newCompatibleConsumer(parameters), topic, warmupMessages, totalMessageCnt, eachBatch, stats);
171+
async function runConsumer(parameters, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats) {
172+
return runConsumerCommon(newCompatibleConsumer(parameters), topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats);
173173
}
174174

175175
async function runConsumeTransformProduce(parameters, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {

examples/performance/performance-primitives.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ function newCompatibleConsumer(parameters) {
172172
return new CompatibleConsumer(consumer);
173173
}
174174

175-
async function runConsumer(parameters, topic, warmupMessages, totalMessageCnt, eachBatch, stats) {
176-
return runConsumerCommon(newCompatibleConsumer(parameters), topic, warmupMessages, totalMessageCnt, eachBatch, stats);
175+
async function runConsumer(parameters, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats) {
176+
return runConsumerCommon(newCompatibleConsumer(parameters), topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats);
177177
}
178178

179179
async function runConsumeTransformProduce(parameters, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {

0 commit comments

Comments
 (0)