Skip to content

Commit 357cdb3

Browse files
committed
Add Producer poll from librdkafka background thread
1 parent bda98c6 commit 357cdb3

File tree

6 files changed

+88
-32
lines changed

6 files changed

+88
-32
lines changed

MIGRATION.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,22 @@ producerRun().then(consumerRun).catch(console.error);
181181
messages: [ /* ... */ ],
182182
});
183183
```
184+
- It's recommended to send a number of messages without awaiting them, and then calling `flush` to ensure all messages are sent, rather than awaiting each message. This is more efficient.
185+
Example:
186+
```javascript
187+
const kafka = new Kafka({ kafkaJS: { /* ... */ }});
188+
const producer = kafka.producer();
189+
await producer.connect();
190+
for (/*...*/) producer.send({ /* ... */});
191+
await producer.flush({timeout: 5000});
192+
```
193+
194+
However, in case it is desired to await every message, `linger.ms` should be set to 0, to ensure that the default batching behaviour does not cause a delay in awaiting messages.
195+
Example:
196+
```javascript
197+
const kafka = new Kafka({ kafkaJS: { /* ... */ }});
198+
const producer = kafka.producer({ 'linger.ms': 0 });
199+
```
184200
185201
* A transactional producer (with a `transactionId`) set, **cannot** send messages without initiating a transaction using `producer.transaction()`.
186202

lib/kafkajs/_producer.js

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ const { kafkaJSToRdKafkaConfig,
1313
const { Consumer } = require('./_consumer');
1414
const error = require('./_error');
1515
const { Buffer } = require('buffer');
16-
const { hrtime } = require('process');
1716

1817
const ProducerState = Object.freeze({
1918
INIT: 0,
@@ -33,8 +32,6 @@ const CompressionTypes = Object.freeze({
3332
ZSTD: 'zstd',
3433
});
3534

36-
const producerPollIntervalMs = 500;
37-
3835
class Producer {
3936
/**
4037
* The config supplied by the user.
@@ -78,14 +75,6 @@ class Producer {
7875
*/
7976
#logger = new DefaultLogger();
8077

81-
/**
82-
* Stores the time of the last poll.
83-
* In case we are producing in a tight loop, the interval timer will not
84-
* fire, and we won't poll. By maintaining the last poll time, we can
85-
* poll at the end of send() and sendBatch().
86-
*/
87-
#lastPollTime = hrtime();
88-
8978
/**
9079
* @constructor
9180
* @param {import("../../types/kafkajs").ProducerConfig} kJSConfig
@@ -277,16 +266,7 @@ class Producer {
277266

278267
this.#state = ProducerState.CONNECTED;
279268

280-
/* Start a loop to poll. the queues. */
281-
const pollInterval = setInterval(() => {
282-
if (this.#state >= ProducerState.DISCONNECTING) {
283-
clearInterval(pollInterval);
284-
return;
285-
}
286-
this.#internalClient.poll();
287-
this.#lastPollTime = hrtime();
288-
}, producerPollIntervalMs);
289-
269+
this.#internalClient.setPollInBackground(true);
290270
this.#internalClient.on('delivery-report', this.#deliveryCallback.bind(this));
291271

292272
// Resolve the promise.
@@ -564,16 +544,6 @@ class Producer {
564544
reject(err);
565545
}
566546
}));
567-
568-
/* Poll if we haven't polled in a while. This can be the case if we're producing
569-
* in a tight loop without awaiting the produce. */
570-
const elapsed = hrtime(this.#lastPollTime);
571-
const elapsedInNanos = elapsed[0] * 1e9 + elapsed[1];
572-
if (elapsedInNanos > producerPollIntervalMs * 1000) {
573-
this.#lastPollTime = hrtime();
574-
this.#internalClient.poll();
575-
}
576-
577547
}
578548

579549
/* The delivery report will be handled by the delivery-report event handler, and we can simply wait for it here. */

lib/producer.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,23 @@ Producer.prototype.setPollInterval = function(interval) {
230230
return this;
231231
};
232232

233+
/**
234+
* Set automatic polling for events on the librdkafka background thread.
235+
*
236+
* This provides several advantages over `setPollInterval`, as the polling
237+
* does not happen on the event loop, but on the C thread spawned by librdkafka,
238+
* and can be more efficient for high-throughput producers.
239+
*
240+
* @param {boolean} set Whether to poll in the background or not.
241+
* @note If set = true, this will disable any polling interval set by `setPollInterval`.
242+
*/
243+
Producer.prototype.setPollInBackground = function(set) {
244+
if (set) {
245+
this.setPollInterval(0); // Clear poll interval from JS.
246+
}
247+
this._client.setPollInBackground(set);
248+
}
249+
233250
/**
234251
* Flush the producer
235252
*

src/producer.cc

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ namespace NodeKafka {
3333
Producer::Producer(Conf* gconfig, Conf* tconfig):
3434
Connection(gconfig, tconfig),
3535
m_dr_cb(),
36-
m_partitioner_cb() {
36+
m_partitioner_cb(),
37+
m_is_background_polling(false) {
3738
std::string errstr;
3839

3940
if (m_tconfig)
@@ -72,6 +73,7 @@ void Producer::Init(v8::Local<v8::Object> exports) {
7273
Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata);
7374
Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT
7475
Nan::SetPrototypeMethod(tpl, "poll", NodePoll);
76+
Nan::SetPrototypeMethod(tpl, "setPollInBackground", NodeSetPollInBackground);
7577
Nan::SetPrototypeMethod(tpl, "setSaslCredentials", NodeSetSaslCredentials);
7678
Nan::SetPrototypeMethod(tpl, "setOAuthBearerToken", NodeSetOAuthBearerToken);
7779
Nan::SetPrototypeMethod(tpl, "setOAuthBearerTokenFailure",
@@ -330,9 +332,39 @@ Baton Producer::Produce(void* message, size_t size, std::string topic,
330332
}
331333

332334
void Producer::Poll() {
335+
// We're not allowed to call poll when we have forwarded the main
336+
// queue to the background queue, as that would indirectly poll
337+
// the background queue. However, that's not allowed by librdkafka.
338+
if (m_is_background_polling) {
339+
return;
340+
}
333341
m_client->poll(0);
334342
}
335343

344+
Baton Producer::SetPollInBackground(bool set) {
345+
scoped_shared_read_lock lock(m_connection_lock);
346+
rd_kafka_t* rk = this->m_client->c_ptr();
347+
if (!IsConnected()) {
348+
return Baton(RdKafka::ERR__STATE, "Producer is disconnected");
349+
}
350+
351+
if (set && !m_is_background_polling) {
352+
m_is_background_polling = true;
353+
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
354+
rd_kafka_queue_t* background_q = rd_kafka_queue_get_background(rk);
355+
rd_kafka_queue_forward(main_q, background_q);
356+
rd_kafka_queue_destroy(main_q);
357+
rd_kafka_queue_destroy(background_q);
358+
} else if (!set && m_is_background_polling) {
359+
m_is_background_polling = false;
360+
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
361+
rd_kafka_queue_forward(main_q, NULL);
362+
rd_kafka_queue_destroy(main_q);
363+
}
364+
365+
return Baton(RdKafka::ERR_NO_ERROR);
366+
}
367+
336368
void Producer::ConfigureCallback(const std::string &string_key, const v8::Local<v8::Function> &cb, bool add) {
337369
if (string_key.compare("delivery_cb") == 0) {
338370
if (add) {
@@ -686,6 +718,23 @@ NAN_METHOD(Producer::NodePoll) {
686718
}
687719
}
688720

721+
NAN_METHOD(Producer::NodeSetPollInBackground) {
722+
Nan::HandleScope scope;
723+
if (info.Length() < 1 || !info[0]->IsBoolean()) {
724+
// Just throw an exception
725+
return Nan::ThrowError(
726+
"Need to specify a boolean for setting or unsetting");
727+
}
728+
bool set = Nan::To<bool>(info[0]).FromJust();
729+
730+
Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());
731+
Baton b = producer->SetPollInBackground(set);
732+
if (b.err() != RdKafka::ERR_NO_ERROR) {
733+
return Nan::ThrowError(b.errstr().c_str());
734+
}
735+
info.GetReturnValue().Set(b.ToObject());
736+
}
737+
689738
Baton Producer::Flush(int timeout_ms) {
690739
RdKafka::ErrorCode response_code;
691740
if (IsConnected()) {

src/producer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class Producer : public Connection {
5454
Baton Connect();
5555
void Disconnect();
5656
void Poll();
57+
Baton SetPollInBackground(bool);
5758
#if RD_KAFKA_VERSION > 0x00090200
5859
Baton Flush(int timeout_ms);
5960
#endif
@@ -103,6 +104,7 @@ class Producer : public Connection {
103104
static NAN_METHOD(NodeConnect);
104105
static NAN_METHOD(NodeDisconnect);
105106
static NAN_METHOD(NodePoll);
107+
static NAN_METHOD(NodeSetPollInBackground);
106108
#if RD_KAFKA_VERSION > 0x00090200
107109
static NAN_METHOD(NodeFlush);
108110
#endif
@@ -114,6 +116,7 @@ class Producer : public Connection {
114116

115117
Callbacks::Delivery m_dr_cb;
116118
Callbacks::Partitioner m_partitioner_cb;
119+
bool m_is_background_polling;
117120
};
118121

119122
} // namespace NodeKafka

types/rdkafka.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ export class Producer extends Client<KafkaProducerEvents> {
268268
produce(topic: string, partition: NumberNullUndefined, message: MessageValue, key?: MessageKey, timestamp?: NumberNullUndefined, opaque?: any, headers?: MessageHeader[]): any;
269269

270270
setPollInterval(interval: number): this;
271+
setPollInBackground(set: boolean): void;
271272

272273
static createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;
273274

0 commit comments

Comments
 (0)