Skip to content

Commit 4f7b7da

Browse files
authored
Add performance benchmarking script modes and README
* Add performance benchmarking script modes and README * Fix createTopics return * Add topic creation to benchmarks * Remove needless batch size (msgs) 1 * Add performance example to semaphore * Clean up the perf runner script
1 parent 2adc75e commit 4f7b7da

File tree

7 files changed

+458
-10
lines changed

7 files changed

+458
-10
lines changed

.semaphore/semaphore.yml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,38 @@ blocks:
108108
commands:
109109
- npx eslint lib/kafkajs
110110

111+
- name: "Linux amd64: Performance"
112+
dependencies: [ ]
113+
task:
114+
agent:
115+
machine:
116+
type: s1-prod-ubuntu20-04-amd64-4
117+
env_vars:
118+
- name: TARGET_PRODUCE_PERFORMANCE
119+
value: "35"
120+
- name: TARGET_CONSUME_PERFORMANCE
121+
value: "18"
122+
- name: TARGET_CTP_PERFORMANCE
123+
value: "0.02"
124+
prologue:
125+
commands:
126+
- sudo apt-get install -y librdkafka-dev bc
127+
- export CKJS_LINKING=dynamic
128+
- export BUILD_LIBRDKAFKA=0
129+
- npm install
130+
- npx node-pre-gyp --build-from-source clean
131+
- npx node-pre-gyp --build-from-source configure
132+
- npx node-pre-gyp --build-from-source build
133+
jobs:
134+
- name: "Performance Test"
135+
commands:
136+
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
137+
- docker compose up -d && sleep 30
138+
- export NODE_OPTIONS='--max-old-space-size=1536'
139+
- cd examples/performance
140+
- npm install
141+
- ../../ci/tests/run_perf_tests.sh
142+
111143
- name: "Linux amd64: Release"
112144
dependencies: [ ]
113145
run:

ci/tests/run_perf_test.sh

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/bin/bash
2+
3+
testresultConfluentProducerConsumer=$(mktemp)
4+
testresultConfluentCtp=$(mktemp)
5+
testresultKjsProducerConsumer=$(mktemp)
6+
testresultKjsCtp=$(mktemp)
7+
8+
MODE=confluent MESSAGE_COUNT=500000 node performance-consolidated.js --create-topics --consumer --producer 2>&1 | tee "$testresultConfluentProducerConsumer"
9+
MODE=kafkajs MESSAGE_COUNT=500000 node performance-consolidated.js --create-topics --consumer --producer 2>&1 | tee "$testresultKjsProducerConsumer"
10+
MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp 2>&1 | tee "$testresultConfluentCtp"
11+
MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp 2>&1 | tee "$testresultKjsCtp"
12+
13+
producerConfluent=$(grep "=== Producer Rate:" "$testresultConfluentProducerConsumer" | cut -d':' -f2 | tr -d ' ')
14+
consumerConfluent=$(grep "=== Consumer Rate:" "$testresultConfluentProducerConsumer" | cut -d':' -f2 | tr -d ' ')
15+
ctpConfluent=$(grep "=== Consume-Transform-Produce Rate:" "$testresultConfluentCtp" | cut -d':' -f2 | tr -d ' ')
16+
producerKjs=$(grep "=== Producer Rate:" "$testresultKjsProducerConsumer" | cut -d':' -f2 | tr -d ' ')
17+
consumerKjs=$(grep "=== Consumer Rate:" "$testresultKjsProducerConsumer" | cut -d':' -f2 | tr -d ' ')
18+
ctpKjs=$(grep "=== Consume-Transform-Produce Rate:" "$testresultKjsCtp" | cut -d':' -f2 | tr -d ' ')
19+
20+
echo "Producer rates: confluent $producerConfluent, kafkajs $producerKjs"
21+
echo "Consumer rates: confluent $consumerConfluent, kafkajs $consumerKjs"
22+
echo "CTP rates: confluent $ctpConfluent, kafkajs $ctpKjs"
23+
24+
errcode=0
25+
26+
# Compare against KJS
27+
if [[ $(echo "$producerConfluent < $producerKjs * 70 / 100" | bc -l) -eq 1 ]]; then
28+
echo "Producer rates differ by more than 30%: confluent $producerConfluent, kafkajs $producerKjs"
29+
errcode=1
30+
fi
31+
32+
if [[ $(echo "$consumerConfluent < $consumerKjs * 70 / 100" | bc -l) -eq 1 ]]; then
33+
echo "Consumer rates differ by more than 30%: confluent $consumerConfluent, kafkajs $consumerKjs"
34+
errcode=1
35+
fi
36+
37+
if [[ $(echo "$ctpConfluent < $ctpKjs * 70 / 100" | bc -l) -eq 1 ]]; then
38+
echo "CTP rates differ by more than 30%: confluent $ctpConfluent, kafkajs $ctpKjs"
39+
errcode=1
40+
fi
41+
42+
# Compare against numbers set within semaphore config
43+
TARGET_PRODUCE="${TARGET_PRODUCE_PERFORMANCE:-35}"
44+
TARGET_CONSUME="${TARGET_CONSUME_PERFORMANCE:-18}"
45+
TARGET_CTP="${TARGET_CTP_PERFORMANCE:-0.02}"
46+
47+
if [[ $(echo "$producerConfluent < $TARGET_PRODUCE" | bc -l) -eq 1 ]]; then
48+
echo "Confluent producer rate is below target: $producerConfluent"
49+
errcode=1
50+
fi
51+
52+
if [[ $(echo "$consumerConfluent < $TARGET_CONSUME" | bc -l) -eq 1 ]]; then
53+
echo "Confluent consumer rate is below target: $consumerConfluent"
54+
errcode=1
55+
fi
56+
57+
if [[ $(echo "$ctpConfluent < $TARGET_CTP" | bc -l) -eq 1 ]]; then
58+
echo "Confluent CTP rate is below target: $ctpConfluent"
59+
errcode=1
60+
fi
61+
62+
exit $errcode
63+

examples/performance/README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Performance Benchmarking
2+
3+
The library can be benchmarked by running the following command:
4+
5+
```bash
6+
node performance-consolidated.js [--producer] [--consumer] [--ctp] [--all]
7+
```
8+
9+
The `--producer` flag will run the producer benchmark, the `--consumer` flag
10+
will run the consumer benchmark, and the `--ctp` flag will run the
11+
consume-transform-produce benchmark.
12+
13+
The `--create-topics` flag will create the topics before running the benchmarks
14+
(and delete any existing topics of the same name). It's recommended to use this
15+
unless the number of partitions or replication factor needs to be changed.
16+
17+
If no flags are provided, no benchmarks will be run. If the `--all` flag is
18+
provided, all benchmarks will be run ignoring any other flags.
19+
20+
The benchmarks assume topics are already created (unless usig `--create-topics`).
21+
The consumer benchmark assumes that the topic already has at least `MESSAGE_COUNT` messages within,
22+
which can generally be done by running the producer benchmark along with it.
23+
24+
The following environment variables can be set to configure the benchmark, with
25+
default values given in parentheses.
26+
27+
| Variable | Description | Default |
28+
|----------|-------------|---------|
29+
| KAFKA_BROKERS | Kafka brokers to connect to | localhost:9092 |
30+
| KAFKA_TOPIC | Kafka topic to produce to/consume from | test-topic |
31+
| KAFKA_TOPIC2 | Kafka topic to produce to after consumption in consume-transform-produce | test-topic2 |
32+
| MESSAGE_COUNT | Number of messages to produce/consume | 1000000 |
33+
| MESSAGE_SIZE | Size of each message in bytes | 256 |
34+
| BATCH_SIZE | Number of messages to produce in a single batch | 100 |
35+
| COMPRESSION | Compression codec to use (None, GZIP, Snappy, LZ4, ZSTD) | None |
36+
| WARMUP_MESSAGES | Number of messages to produce before starting the produce benchmark | BATCH_SIZE * 10 |
37+
| MESSAGE_PROCESS_TIME_MS | Time to sleep after consuming each message in the consume-transform-produce benchmark. Simulates "transform". May be 0. | 5 |
38+
| CONSUME_TRANSFORM_PRODUCE_CONCURRENCY | partitionsConsumedConcurrently for the consume-transform-produce benchmark | 1 |
39+
| MODE | Mode to run the benchmarks in (confluent, kafkajs). Can be used for comparison with KafkaJS | confluent |

examples/performance/performance-consolidated.js

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,37 @@
1-
const { runProducer, runConsumer, runConsumeTransformProduce } = require('./performance-primitives');
1+
const mode = process.env.MODE ? process.env.MODE : 'confluent';
22

3-
const { CompressionTypes } = require('../../').KafkaJS;
3+
let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics;
4+
if (mode === 'confluent') {
5+
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics } = require('./performance-primitives'));
6+
} else {
7+
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics } = require('./performance-primitives-kafkajs'));
8+
}
49

510
const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
611
const topic = process.env.KAFKA_TOPIC || 'test-topic';
712
const topic2 = process.env.KAFKA_TOPIC2 || 'test-topic2';
813
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
914
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
1015
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
11-
const compression = process.env.COMPRESSION || CompressionTypes.NONE;
16+
const compression = process.env.COMPRESSION || 'None';
1217
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
18+
const messageProcessTimeMs = process.env.MESSAGE_PROCESS_TIME_MS ? +process.env.MESSAGE_PROCESS_TIME_MS : 5;
19+
const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY : 1;
1320

1421
(async function () {
1522
const producer = process.argv.includes('--producer');
1623
const consumer = process.argv.includes('--consumer');
1724
const ctp = process.argv.includes('--ctp');
1825
const all = process.argv.includes('--all');
26+
const createTopics = process.argv.includes('--create-topics');
27+
28+
if (createTopics || all) {
29+
console.log("=== Creating Topics (deleting if they exist already):");
30+
console.log(` Brokers: ${brokers}`);
31+
console.log(` Topic: ${topic}`);
32+
console.log(` Topic2: ${topic2}`);
33+
await runCreateTopics(brokers, topic, topic2);
34+
}
1935

2036
if (producer || all) {
2137
console.log("=== Running Basic Producer Performance Test:")
@@ -48,7 +64,7 @@ const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGE
4864
console.log(` Message Count: ${messageCount}`);
4965
// Seed the topic with messages
5066
await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
51-
const ctpRate = await runConsumeTransformProduce(brokers, topic, topic2, messageCount);
67+
const ctpRate = await runConsumeTransformProduce(brokers, topic, topic2, messageCount, messageProcessTimeMs, ctpConcurrency);
5268
console.log("=== Consume-Transform-Produce Rate: ", ctpRate);
5369
}
5470

0 commit comments

Comments
 (0)