Skip to content

Commit 39bbd4c

Browse files
committed
Run performance tests with SASL_PLAINTEXT or SASL_SSL and plain authentication
1 parent 7cd3c3f commit 39bbd4c

File tree

3 files changed

+95
-63
lines changed

3 files changed

+95
-63
lines changed

examples/performance/performance-consolidated.js

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ if (mode === 'confluent') {
99
}
1010

1111
const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
12+
const securityProtocol = process.env.SECURITY_PROTOCOL;
13+
const saslUsername = process.env.SASL_USERNAME;
14+
const saslPassword = process.env.SASL_PASSWORD;
1215
const topic = process.env.KAFKA_TOPIC || 'test-topic';
1316
const topic2 = process.env.KAFKA_TOPIC2 || 'test-topic2';
1417
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
@@ -20,6 +23,23 @@ const messageProcessTimeMs = process.env.MESSAGE_PROCESS_TIME_MS ? +process.env.
2023
const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY : 1;
2124
const consumerProcessingTime = process.env.CONSUMER_PROCESSING_TIME ? +process.env.CONSUMER_PROCESSING_TIME : 100;
2225
const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.env.PRODUCER_PROCESSING_TIME : 100;
26+
const parameters = {
27+
brokers,
28+
securityProtocol,
29+
saslUsername,
30+
saslPassword,
31+
}
32+
33+
function logParameters(parameters) {
34+
console.log(` Brokers: ${parameters.brokers}`);
35+
if (parameters.securityProtocol && parameters.saslUsername && parameters.saslPassword) {
36+
console.log(` Security Protocol: ${parameters.securityProtocol}`);
37+
console.log(` SASL Username: ${parameters.saslUsername ? parameters.saslUsername : 'not set'}`);
38+
console.log(` SASL Password: ${parameters.saslPassword ? '******' : 'not set'}`);
39+
} else {
40+
console.log(" No security protocol configured");
41+
}
42+
}
2343

2444
(async function () {
2545
const producer = process.argv.includes('--producer');
@@ -69,35 +89,35 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
6989

7090
if (createTopics || all) {
7191
console.log("=== Creating Topics (deleting if they exist already):");
72-
console.log(` Brokers: ${brokers}`);
92+
logParameters(parameters);
7393
console.log(` Topic: ${topic}`);
7494
console.log(` Topic2: ${topic2}`);
75-
await runCreateTopics(brokers, topic, topic2);
95+
await runCreateTopics(parameters, topic, topic2);
7696
}
7797

7898
if (producer || all) {
7999
console.log("=== Running Basic Producer Performance Test:")
80-
console.log(` Brokers: ${brokers}`);
100+
logParameters(parameters);
81101
console.log(` Topic: ${topic}`);
82102
console.log(` Message Count: ${messageCount}`);
83103
console.log(` Message Size: ${messageSize}`);
84104
console.log(` Batch Size: ${batchSize}`);
85105
console.log(` Compression: ${compression}`);
86106
console.log(` Warmup Messages: ${warmupMessages}`);
87107
startTrackingMemory();
88-
const producerRate = await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
108+
const producerRate = await runProducer(parameters, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
89109
endTrackingMemory(`producer-memory-${mode}.json`);
90110
console.log("=== Producer Rate: ", producerRate);
91111
}
92112

93113
if (consumer || all) {
94114
// If user runs this without --producer then they are responsible for seeding the topic.
95115
console.log("=== Running Basic Consumer Performance Test (eachMessage):")
96-
console.log(` Brokers: ${brokers}`);
116+
logParameters(parameters);
97117
console.log(` Topic: ${topic}`);
98118
console.log(` Message Count: ${messageCount}`);
99119
startTrackingMemory();
100-
const consumerRate = await runConsumer(brokers, topic, warmupMessages, messageCount, false, stats);
120+
const consumerRate = await runConsumer(parameters, topic, warmupMessages, messageCount, false, stats);
101121
endTrackingMemory(`consumer-memory-message-${mode}.json`);
102122
console.log("=== Consumer Rate (eachMessage): ", consumerRate);
103123
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
@@ -106,11 +126,11 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
106126
if (consumer || all) {
107127
// If user runs this without --producer then they are responsible for seeding the topic.
108128
console.log("=== Running Basic Consumer Performance Test (eachBatch):")
109-
console.log(` Brokers: ${brokers}`);
129+
logParameters(parameters);
110130
console.log(` Topic: ${topic}`);
111131
console.log(` Message Count: ${messageCount}`);
112132
startTrackingMemory();
113-
const consumerRate = await runConsumer(brokers, topic, warmupMessages, messageCount, true, stats);
133+
const consumerRate = await runConsumer(parameters, topic, warmupMessages, messageCount, true, stats);
114134
endTrackingMemory(`consumer-memory-batch-${mode}.json`);
115135
console.log("=== Consumer Rate (eachBatch): ", consumerRate);
116136
console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);
@@ -120,27 +140,27 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
120140

121141
if (ctp || all) {
122142
console.log("=== Running Consume-Transform-Produce Performance Test:")
123-
console.log(` Brokers: ${brokers}`);
143+
logParameters(parameters);
124144
console.log(` ConsumeTopic: ${topic}`);
125145
console.log(` ProduceTopic: ${topic2}`);
126146
console.log(` Message Count: ${messageCount}`);
127147
// Seed the topic with messages
128-
await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
148+
await runProducer(parameters, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
129149
startTrackingMemory();
130-
const ctpRate = await runConsumeTransformProduce(brokers, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency);
150+
const ctpRate = await runConsumeTransformProduce(parameters, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency);
131151
endTrackingMemory(`consume-transform-produce-${mode}.json`);
132152
console.log("=== Consume-Transform-Produce Rate: ", ctpRate);
133153
}
134154

135155
if (produceConsumeLatency || all) {
136156
console.log("=== Running Produce-To-Consume Latency Performance Test:")
137-
console.log(` Brokers: ${brokers}`);
157+
logParameters(parameters);
138158
console.log(` Topic: ${topic}`);
139159
console.log(` Message Count: ${messageCount}`);
140160
console.log(` Consumer Processing Time: ${consumerProcessingTime}`);
141161
console.log(` Producer Processing Time: ${producerProcessingTime}`);
142162
startTrackingMemory();
143-
const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(brokers, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime);
163+
const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(parameters, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime);
144164
endTrackingMemory(`producer-consumer-together-${mode}.json`);
145165
console.log(`=== Produce-To-Consume Latency (ms): Mean: ${mean}, P50: ${p50}, P90: ${p90}, P95: ${p95}`);
146166

examples/performance/performance-primitives-kafkajs.js

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,29 @@ module.exports = {
1111
runProducerConsumerTogether,
1212
};
1313

14-
async function runCreateTopics(brokers, topic, topic2) {
15-
const kafka = new Kafka({
14+
function baseConfiguration(parameters) {
15+
let ret = {
1616
clientId: 'kafka-test-performance',
17-
brokers: brokers.split(','),
18-
});
17+
brokers: parameters.brokers.split(','),
18+
};
19+
if (parameters.securityProtocol &&
20+
parameters.saslUsername &&
21+
parameters.saslPassword) {
22+
ret = {
23+
...ret,
24+
ssl: parameters.securityProtocol.toLowerCase().includes('ssl'),
25+
sasl: {
26+
mechanism: 'plain',
27+
username: parameters.saslUsername,
28+
password: parameters.saslPassword
29+
}
30+
};
31+
}
32+
return ret;
33+
}
34+
35+
async function runCreateTopics(parameters, topic, topic2) {
36+
const kafka = new Kafka(baseConfiguration(parameters));
1937

2038
const admin = kafka.admin();
2139
await admin.connect();
@@ -43,7 +61,7 @@ async function runCreateTopics(brokers, topic, topic2) {
4361
await admin.disconnect();
4462
}
4563

46-
async function runProducer(brokers, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression) {
64+
async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression) {
4765
let totalMessagesSent = 0;
4866
let totalBytesSent = 0;
4967

@@ -53,10 +71,7 @@ async function runProducer(brokers, topic, batchSize, warmupMessages, totalMessa
5371

5472
const messages = Array(batchSize).fill(message);
5573

56-
const kafka = new Kafka({
57-
clientId: 'kafka-test-performance',
58-
brokers: brokers.split(','),
59-
});
74+
const kafka = new Kafka(baseConfiguration(parameters));
6075

6176
const producer = kafka.producer();
6277
await producer.connect();
@@ -139,27 +154,21 @@ class CompatibleConsumer {
139154
}
140155
}
141156

142-
function newCompatibleConsumer(brokers) {
143-
const kafka = new Kafka({
144-
clientId: 'kafka-test-performance',
145-
brokers: brokers.split(','),
146-
});
157+
function newCompatibleConsumer(parameters) {
158+
const kafka = new Kafka(baseConfiguration(parameters));
147159

148160
const consumer = kafka.consumer({
149161
groupId: 'test-group' + Math.random(),
150162
});
151163
return new CompatibleConsumer(consumer);
152164
}
153165

154-
async function runConsumer(brokers, topic, warmupMessages, totalMessageCnt, eachBatch, stats) {
155-
return runConsumerCommon(newCompatibleConsumer(brokers), topic, warmupMessages, totalMessageCnt, eachBatch, stats);
166+
async function runConsumer(parameters, topic, warmupMessages, totalMessageCnt, eachBatch, stats) {
167+
return runConsumerCommon(newCompatibleConsumer(parameters), topic, warmupMessages, totalMessageCnt, eachBatch, stats);
156168
}
157169

158-
async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {
159-
const kafka = new Kafka({
160-
clientId: 'kafka-test-performance',
161-
brokers: brokers.split(','),
162-
});
170+
async function runConsumeTransformProduce(parameters, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {
171+
const kafka = new Kafka(baseConfiguration(parameters));
163172

164173
const producer = kafka.producer({});
165174
await producer.connect();
@@ -234,11 +243,8 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, w
234243
}
235244

236245

237-
async function runProducerConsumerTogether(brokers, topic, totalMessageCnt, msgSize, produceMessageProcessTimeMs, consumeMessageProcessTimeMs) {
238-
const kafka = new Kafka({
239-
clientId: 'kafka-test-performance',
240-
brokers: brokers.split(','),
241-
});
246+
async function runProducerConsumerTogether(parameters, topic, totalMessageCnt, msgSize, produceMessageProcessTimeMs, consumeMessageProcessTimeMs) {
247+
const kafka = new Kafka(baseConfiguration(parameters));
242248

243249
const producer = kafka.producer({});
244250
await producer.connect();

examples/performance/performance-primitives.js

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,27 @@ module.exports = {
1111
runProducerConsumerTogether,
1212
};
1313

14-
async function runCreateTopics(brokers, topic, topic2) {
15-
const kafka = new Kafka({
14+
function baseConfiguration(parameters) {
15+
let ret = {
1616
'client.id': 'kafka-test-performance',
17-
"metadata.broker.list": brokers,
18-
});
17+
'metadata.broker.list': parameters.brokers,
18+
};
19+
if (parameters.securityProtocol &&
20+
parameters.saslUsername &&
21+
parameters.saslPassword) {
22+
ret = {
23+
...ret,
24+
'security.protocol': parameters.securityProtocol,
25+
'sasl.mechanism': 'plain',
26+
'sasl.username': parameters.saslUsername,
27+
'sasl.password': parameters.saslPassword,
28+
};
29+
}
30+
return ret;
31+
}
32+
33+
async function runCreateTopics(parameters, topic, topic2) {
34+
const kafka = new Kafka(baseConfiguration(parameters));
1935

2036
const admin = kafka.admin();
2137
await admin.connect();
@@ -44,7 +60,7 @@ async function runCreateTopics(brokers, topic, topic2) {
4460
await admin.disconnect();
4561
}
4662

47-
async function runProducer(brokers, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression) {
63+
async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression) {
4864
let totalMessagesSent = 0;
4965
let totalBytesSent = 0;
5066

@@ -55,8 +71,7 @@ async function runProducer(brokers, topic, batchSize, warmupMessages, totalMessa
5571
const messages = Array(batchSize).fill(message);
5672

5773
const kafka = new Kafka({
58-
'client.id': 'kafka-test-performance',
59-
'metadata.broker.list': brokers,
74+
...baseConfiguration(parameters),
6075
'compression.codec': CompressionTypes[compression],
6176
});
6277

@@ -140,11 +155,8 @@ class CompatibleConsumer {
140155
}
141156
}
142157

143-
function newCompatibleConsumer(brokers) {
144-
const kafka = new Kafka({
145-
'client.id': 'kafka-test-performance',
146-
'metadata.broker.list': brokers,
147-
});
158+
function newCompatibleConsumer(parameters) {
159+
const kafka = new Kafka(baseConfiguration(parameters));
148160

149161
const consumer = kafka.consumer({
150162
'group.id': 'test-group' + Math.random(),
@@ -155,15 +167,12 @@ function newCompatibleConsumer(brokers) {
155167
return new CompatibleConsumer(consumer);
156168
}
157169

158-
async function runConsumer(brokers, topic, warmupMessages, totalMessageCnt, eachBatch, stats) {
159-
return runConsumerCommon(newCompatibleConsumer(brokers), topic, warmupMessages, totalMessageCnt, eachBatch, stats);
170+
async function runConsumer(parameters, topic, warmupMessages, totalMessageCnt, eachBatch, stats) {
171+
return runConsumerCommon(newCompatibleConsumer(parameters), topic, warmupMessages, totalMessageCnt, eachBatch, stats);
160172
}
161173

162-
async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {
163-
const kafka = new Kafka({
164-
'client.id': 'kafka-test-performance',
165-
'metadata.broker.list': brokers,
166-
});
174+
async function runConsumeTransformProduce(parameters, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {
175+
const kafka = new Kafka(baseConfiguration(parameters));
167176

168177
const producer = kafka.producer({
169178
/* We want things to be flushed immediately as we'll be awaiting this. */
@@ -249,11 +258,8 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, w
249258
return rate;
250259
}
251260

252-
async function runProducerConsumerTogether(brokers, topic, totalMessageCnt, msgSize, produceMessageProcessTimeMs, consumeMessageProcessTimeMs) {
253-
const kafka = new Kafka({
254-
'client.id': 'kafka-test-performance',
255-
'metadata.broker.list': brokers,
256-
});
261+
async function runProducerConsumerTogether(parameters, topic, totalMessageCnt, msgSize, produceMessageProcessTimeMs, consumeMessageProcessTimeMs) {
262+
const kafka = new Kafka(baseConfiguration(parameters));
257263

258264
const producer = kafka.producer({
259265
/* We want things to be flushed immediately as we'll be awaiting this. */

0 commit comments

Comments
 (0)