Skip to content

Commit b357ee6

Browse files
committed
CONSUMER_MAX_BATCH_SIZE in performance test
1 parent c8c3130 commit b357ee6

File tree

3 files changed

+10
-2
lines changed

3 files changed

+10
-2
lines changed

ci/tests/run_perf_test.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ async function main() {
4747
if (concurrentRun) {
4848
skipCtpTest = true;
4949
}
50+
if (!process.env.CONSUMER_MAX_BATCH_SIZE) {
51+
process.env.CONSUMER_MAX_BATCH_SIZE = '-1';
52+
}
5053
if (!process.env.PARTITIONS_CONSUMED_CONCURRENTLY) {
5154
process.env.PARTITIONS_CONSUMED_CONCURRENTLY = '2';
5255
}

examples/performance/performance-consolidated.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ const saslPassword = process.env.SASL_PASSWORD;
2020
const topic = process.env.KAFKA_TOPIC || `test-topic-${mode}`;
2121
const topic2 = process.env.KAFKA_TOPIC2 || `test-topic2-${mode}`;
2222
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
23-
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
24-
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
23+
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 4096;
24+
const batchSize = process.env.PRODUCER_BATCH_SIZE ? +process.env.PRODUCER_BATCH_SIZE : 100;
2525
const compression = process.env.COMPRESSION || 'None';
2626
// Between 0 and 1, percentage of random bytes in each message
2727
const randomness = process.env.RANDOMNESS ? +process.env.RANDOMNESS : 0.5;

examples/performance/performance-primitives.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ module.exports = {
2020
};
2121

2222

23+
const CONSUMER_MAX_BATCH_SIZE = process.env.CONSUMER_MAX_BATCH_SIZE ? +process.env.CONSUMER_MAX_BATCH_SIZE : null;
2324
const IS_HIGHER_LATENCY_CLUSTER = process.env.IS_HIGHER_LATENCY_CLUSTER === 'true';
2425

2526
function baseConfiguration(parameters) {
@@ -156,6 +157,9 @@ function newCompatibleConsumer(parameters, eachBatch) {
156157
const autoCommitOpts = autoCommit > 0 ?
157158
{ 'enable.auto.commit': true, 'auto.commit.interval.ms': autoCommit } :
158159
{ 'enable.auto.commit': false };
160+
const jsOpts = (eachBatch && CONSUMER_MAX_BATCH_SIZE !== null) ? {
161+
'js.consumer.max.batch.size': CONSUMER_MAX_BATCH_SIZE
162+
} : {};
159163
const higherLatencyClusterOpts = IS_HIGHER_LATENCY_CLUSTER ? {
160164
'max.partition.fetch.bytes': '8388608'
161165
} : {};
@@ -170,6 +174,7 @@ function newCompatibleConsumer(parameters, eachBatch) {
170174
'auto.offset.reset': 'earliest',
171175
'fetch.queue.backoff.ms': '100',
172176
...autoCommitOpts,
177+
...jsOpts,
173178
...higherLatencyClusterOpts,
174179
});
175180
return new CompatibleConsumer(consumer);

0 commit comments

Comments
 (0)