Skip to content

Commit f781cab

Browse files
authored
Reduce poll timeout and add wakeups for new queue messages (#135)
* Add performance tests for produce to consume latency * Reduce poll timeout and add wakeups for new queue messages
1 parent e1bd8e8 commit f781cab

11 files changed

+319
-26
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ v0.4.0 is a limited availability feature release. It is supported for all usage.
66

77
1. Fixes an issue where headers were not passed correctly to the `eachBatch` callback (#130).
88
2. Add support for an Admin API to list a consumer group's offsets (#49).
9+
3. Reduce consumer poll timeout to nil and add wakeups for new messages. This improves
10+
the consumer efficiency, and resolves issues while running multiple consumers within
11+
the same node process (#135).
912

1013

1114
# confluent-kafka-javascript v0.3.0

examples/performance/README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
The library can be benchmarked by running the following command:
44

55
```bash
6-
node performance-consolidated.js [--producer] [--consumer] [--ctp] [--all]
6+
node performance-consolidated.js [--producer] [--consumer] [--ctp] [--latency] [--all]
77
```
88

99
The `--producer` flag will run the producer benchmark, the `--consumer` flag
1010
will run the consumer benchmark, and the `--ctp` flag will run the
11-
consume-transform-produce benchmark.
11+
consume-transform-produce benchmark. The `--latency` flag will run the
12+
produce-to-consume latency test.
1213

1314
The `--create-topics` flag will create the topics before running the benchmarks
1415
(and delete any existing topics of the same name). It's recommended to use this
@@ -36,4 +37,6 @@ default values given in parentheses.
3637
| WARMUP_MESSAGES | Number of messages to produce before starting the produce benchmark | BATCH_SIZE * 10 |
3738
| MESSAGE_PROCESS_TIME_MS | Time to sleep after consuming each message in the consume-transform-produce benchmark. Simulates "transform". May be 0. | 5 |
3839
| CONSUME_TRANSFORM_PRODUCE_CONCURRENCY | partitionsConsumedConcurrently for the consume-transform-produce benchmark | 1 |
40+
| CONSUMER_PROCESSING_TIME | Time to sleep (ms) after consuming each message in the latency benchmark. | 100 |
41+
| PRODUCER_PROCESSING_TIME | Time to sleep (ms) after producing each message in the latency benchmark. | 100 |
3942
| MODE | Mode to run the benchmarks in (confluent, kafkajs). Can be used for comparison with KafkaJS | confluent |

examples/performance/performance-consolidated.js

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
const mode = process.env.MODE ? process.env.MODE : 'confluent';
22

3-
let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics;
3+
let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether;
44
if (mode === 'confluent') {
5-
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics } = require('./performance-primitives'));
5+
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives'));
66
} else {
7-
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics } = require('./performance-primitives-kafkajs'));
7+
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
88
}
99

1010
const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
@@ -17,11 +17,14 @@ const compression = process.env.COMPRESSION || 'None';
1717
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
1818
const messageProcessTimeMs = process.env.MESSAGE_PROCESS_TIME_MS ? +process.env.MESSAGE_PROCESS_TIME_MS : 5;
1919
const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY : 1;
20+
const consumerProcessingTime = process.env.CONSUMER_PROCESSING_TIME ? +process.env.CONSUMER_PROCESSING_TIME : 100;
21+
const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.env.PRODUCER_PROCESSING_TIME : 100;
2022

2123
(async function () {
2224
const producer = process.argv.includes('--producer');
2325
const consumer = process.argv.includes('--consumer');
2426
const ctp = process.argv.includes('--ctp');
27+
const produceConsumeLatency = process.argv.includes('--latency');
2528
const all = process.argv.includes('--all');
2629
const createTopics = process.argv.includes('--create-topics');
2730

@@ -68,4 +71,20 @@ const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +proc
6871
console.log("=== Consume-Transform-Produce Rate: ", ctpRate);
6972
}
7073

74+
if (produceConsumeLatency || all) {
75+
console.log("=== Running Produce-To-Consume Latency Performance Test:")
76+
console.log(` Brokers: ${brokers}`);
77+
console.log(` Topic: ${topic}`);
78+
console.log(` Message Count: ${messageCount}`);
79+
console.log(` Consumer Processing Time: ${consumerProcessingTime}`);
80+
console.log(` Producer Processing Time: ${producerProcessingTime}`);
81+
const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(brokers, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime);
82+
console.log(`=== Produce-To-Consume Latency (ms): Mean: ${mean}, P50: ${p50}, P90: ${p90}, P95: ${p95}`);
83+
84+
// The presence of outliers invalidates the mean measurement and raises concerns as to why there are any.
85+
// Ideally, the test should not have outliers if consumer processing time is less or equal to producer processing time.
86+
if (outliers.length > 0) {
87+
console.log("=== Outliers (ms): ", outliers);
88+
}
89+
}
7190
})();

examples/performance/performance-primitives-kafkajs.js

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ module.exports = {
77
runConsumer,
88
runConsumeTransformProduce,
99
runCreateTopics,
10+
runProducerConsumerTogether,
1011
};
1112

1213
async function runCreateTopics(brokers, topic, topic2) {
@@ -130,7 +131,7 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
130131
autoCommit: false,
131132
eachMessage: async ({ topic, partition, message }) => {
132133
messagesReceived++;
133-
134+
134135
if (messagesReceived >= skippedMessages) {
135136
messagesMeasured++;
136137
totalMessageSize += message.value.length;
@@ -239,3 +240,93 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, w
239240
await producer.disconnect();
240241
return rate;
241242
}
243+
244+
245+
async function runProducerConsumerTogether(brokers, topic, totalMessageCnt, msgSize, produceMessageProcessTimeMs, consumeMessageProcessTimeMs) {
246+
const kafka = new Kafka({
247+
clientId: 'kafka-test-performance',
248+
brokers: brokers.split(','),
249+
});
250+
251+
const producer = kafka.producer({});
252+
await producer.connect();
253+
254+
let consumerReady = false;
255+
let consumerFinished = false;
256+
const consumer = kafka.consumer({
257+
groupId: 'test-group' + Math.random(),
258+
});
259+
await consumer.connect();
260+
await consumer.subscribe({ topic: topic, fromBeginning: true });
261+
262+
let startTime = null;
263+
let diffs = [];
264+
console.log("Starting consumer.");
265+
266+
consumer.run({
267+
autoCommit: false,
268+
eachMessage: async ({ topic, partition, message }) => {
269+
if (!consumerReady) {
270+
consumerReady = true;
271+
return;
272+
}
273+
if (startTime === null)
274+
return;
275+
let endTime = hrtime.bigint();
276+
diffs.push(endTime - startTime);
277+
await new Promise(resolve => setTimeout(resolve, consumeMessageProcessTimeMs));
278+
if (diffs.length >= totalMessageCnt) {
279+
consumerFinished = true;
280+
}
281+
}
282+
});
283+
284+
const message = {
285+
value: randomBytes(msgSize),
286+
}
287+
288+
// Don't initialize startTime here, the first message includes the metadata
289+
// request and isn't representative of latency measurements.
290+
await producer.send({
291+
topic,
292+
messages: [message],
293+
});
294+
// We don't want this to show up at all for our measurements, so make sure the
295+
// consumer processes this and ignores it before proceeding.
296+
await new Promise(resolve => setTimeout(resolve, 1000));
297+
298+
while(!consumerReady) {
299+
await new Promise(resolve => setTimeout(resolve, 1000));
300+
}
301+
302+
303+
console.log("Starting producer.");
304+
305+
for (let i = 0; i < totalMessageCnt; i++) {
306+
startTime = hrtime.bigint();
307+
await producer.send({
308+
topic,
309+
messages: [message],
310+
});
311+
await new Promise(resolve => setTimeout(resolve, produceMessageProcessTimeMs));
312+
}
313+
314+
while (!consumerFinished) {
315+
await new Promise(resolve => setTimeout(resolve, 1000));
316+
}
317+
318+
console.log("Consumer finished.");
319+
320+
await consumer.disconnect();
321+
await producer.disconnect();
322+
323+
const nanoDiffs = diffs.map(d => parseInt(d));
324+
const sortedDiffs = nanoDiffs.sort((a, b) => a - b);
325+
const p50 = sortedDiffs[Math.floor(sortedDiffs.length / 2)] / 1e6;
326+
const p90 = sortedDiffs[Math.floor(sortedDiffs.length * 0.9)] / 1e6;
327+
const p95 = sortedDiffs[Math.floor(sortedDiffs.length * 0.95)] / 1e6;
328+
const mean = nanoDiffs.reduce((acc, d) => acc + d, 0) / nanoDiffs.length / 1e6;
329+
// Count outliers: elements more than 10x the p50.
330+
const outliers = sortedDiffs.map(d => d/1e6).filter(d => (d) > (10 * p50));
331+
return { mean, p50, p90, p95, outliers };
332+
}

examples/performance/performance-primitives.js

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ module.exports = {
77
runConsumer,
88
runConsumeTransformProduce,
99
runCreateTopics,
10+
runProducerConsumerTogether,
1011
};
1112

1213
async function runCreateTopics(brokers, topic, topic2) {
@@ -137,7 +138,7 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
137138
consumer.run({
138139
eachMessage: async ({ topic, partition, message }) => {
139140
messagesReceived++;
140-
141+
141142
if (messagesReceived >= skippedMessages) {
142143
messagesMeasured++;
143144
totalMessageSize += message.value.length;
@@ -170,7 +171,6 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
170171
}
171172

172173
async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {
173-
console.log("here");
174174
const kafka = new Kafka({
175175
'client.id': 'kafka-test-performance',
176176
'metadata.broker.list': brokers,
@@ -219,7 +219,7 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, w
219219

220220
if (messagesReceived === skippedMessages)
221221
startTime = hrtime();
222-
222+
223223
/* Simulate message processing for messageProcessTimeMs */
224224
if (messageProcessTimeMs > 0) {
225225
await new Promise((resolve) => setTimeout(resolve, messageProcessTimeMs));
@@ -259,3 +259,99 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, w
259259
await producer.disconnect();
260260
return rate;
261261
}
262+
263+
async function runProducerConsumerTogether(brokers, topic, totalMessageCnt, msgSize, produceMessageProcessTimeMs, consumeMessageProcessTimeMs) {
264+
const kafka = new Kafka({
265+
'client.id': 'kafka-test-performance',
266+
'metadata.broker.list': brokers,
267+
});
268+
269+
const producer = kafka.producer({
270+
/* We want things to be flushed immediately as we'll be awaiting this. */
271+
'linger.ms': 0,
272+
});
273+
await producer.connect();
274+
275+
let consumerReady = false;
276+
let consumerFinished = false;
277+
const consumer = kafka.consumer({
278+
'group.id': 'test-group' + Math.random(),
279+
'enable.auto.commit': false,
280+
'auto.offset.reset': 'earliest',
281+
rebalance_cb: function(err) {
282+
if (err.code !== ErrorCodes.ERR__ASSIGN_PARTITIONS) return;
283+
if (!consumerReady) {
284+
consumerReady = true;
285+
console.log("Consumer ready.");
286+
}
287+
}
288+
});
289+
await consumer.connect();
290+
await consumer.subscribe({ topic: topic });
291+
292+
let startTime = null;
293+
let diffs = [];
294+
console.log("Starting consumer.");
295+
296+
consumer.run({
297+
eachMessage: async ({ topic, partition, message }) => {
298+
if (startTime === null)
299+
return;
300+
let endTime = hrtime.bigint();
301+
diffs.push(endTime - startTime);
302+
await new Promise(resolve => setTimeout(resolve, consumeMessageProcessTimeMs));
303+
if (diffs.length >= totalMessageCnt) {
304+
consumerFinished = true;
305+
}
306+
}
307+
});
308+
309+
while(!consumerReady) {
310+
await new Promise(resolve => setTimeout(resolve, 1000));
311+
}
312+
313+
const message = {
314+
value: randomBytes(msgSize),
315+
}
316+
317+
// Don't initialize startTime here, the first message includes the metadata
318+
// request and isn't representative of latency measurements.
319+
await producer.send({
320+
topic,
321+
messages: [message],
322+
});
323+
// We don't want this to show up at all for our measurements, so make sure the
324+
// consumer processes this and ignores it before proceeding.
325+
await new Promise(resolve => setTimeout(resolve, 1000));
326+
327+
console.log("Starting producer.");
328+
329+
for (let i = 0; i < totalMessageCnt; i++) {
330+
startTime = hrtime.bigint();
331+
await producer.send({
332+
topic,
333+
messages: [message],
334+
});
335+
await new Promise(resolve => setTimeout(resolve, produceMessageProcessTimeMs));
336+
}
337+
338+
while (!consumerFinished) {
339+
await new Promise(resolve => setTimeout(resolve, 1000));
340+
}
341+
342+
console.log("Consumer finished.");
343+
344+
await consumer.disconnect();
345+
await producer.disconnect();
346+
347+
const nanoDiffs = diffs.map(d => parseInt(d));
348+
const sortedDiffs = nanoDiffs.sort((a, b) => a - b);
349+
const p50 = sortedDiffs[Math.floor(sortedDiffs.length / 2)] / 1e6;
350+
const p90 = sortedDiffs[Math.floor(sortedDiffs.length * 0.9)] / 1e6;
351+
const p95 = sortedDiffs[Math.floor(sortedDiffs.length * 0.95)] / 1e6;
352+
const mean = nanoDiffs.reduce((acc, d) => acc + d, 0) / nanoDiffs.length / 1e6;
353+
// Count outliers: elements more than 10x the p50. This definition of an
354+
// outlier is arbitrary.
355+
const outliers = sortedDiffs.map(d => d/1e6).filter(d => (d) > (10 * p50));
356+
return { mean, p50, p90, p95, outliers };
357+
}

lib/kafka-consumer.js

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -120,21 +120,10 @@ function KafkaConsumer(conf, topicConf) {
120120
};
121121
}
122122

123-
/**
124-
* KafkaConsumer message.
125-
*
126-
* This is the representation of a message read from Kafka.
127-
*
128-
* @typedef {object} KafkaConsumer~Message
129-
* @property {buffer} value - the message buffer from Kafka.
130-
* @property {string} topic - the topic name
131-
* @property {number} partition - the partition on the topic the
132-
* message was on
133-
* @property {number} offset - the offset of the message
134-
* @property {string} key - the message key
135-
* @property {number} size - message size, in bytes.
136-
* @property {number} timestamp - message timestamp
137-
*/
123+
// Note: This configuration is for internal use for now, and hence is not documented, or
124+
// exposed via types.
125+
const queue_non_empty_cb = conf.queue_non_empty_cb || null;
126+
delete conf.queue_non_empty_cb;
138127

139128
Client.call(this, conf, Kafka.KafkaConsumer, topicConf);
140129

@@ -144,6 +133,10 @@ function KafkaConsumer(conf, topicConf) {
144133
this._consumeTimeout = DEFAULT_CONSUME_TIME_OUT;
145134
this._consumeLoopTimeoutDelay = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY;
146135
this._consumeIsTimeoutOnlyForFirstMessage = DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE;
136+
137+
if (queue_non_empty_cb) {
138+
this._cb_configs.event.queue_non_empty_cb = queue_non_empty_cb;
139+
}
147140
}
148141

149142
/**

0 commit comments

Comments
 (0)