Skip to content

Commit 08ecfc3

Browse files
committed
Add optimization where the size of messages in our subscription is less than the consumer cache size
1 parent 6118b66 commit 08ecfc3

File tree

5 files changed

+51
-7
lines changed

5 files changed

+51
-7
lines changed

lib/kafka-consumer.js

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ var TopicPartition = require('./topic-partition');
2020
var shallowCopy = require('./util').shallowCopy;
2121
var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
2222
var DEFAULT_CONSUME_TIME_OUT = 1000;
23+
const DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE = false;
2324
util.inherits(KafkaConsumer, Client);
2425

2526
/**
@@ -142,6 +143,7 @@ function KafkaConsumer(conf, topicConf) {
142143

143144
this._consumeTimeout = DEFAULT_CONSUME_TIME_OUT;
144145
this._consumeLoopTimeoutDelay = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY;
146+
this._consumeIsTimeoutOnlyForFirstMessage = DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE;
145147
}
146148

147149
/**
@@ -160,6 +162,20 @@ KafkaConsumer.prototype.setDefaultConsumeLoopTimeoutDelay = function(intervalMs)
160162
this._consumeLoopTimeoutDelay = intervalMs;
161163
};
162164

165+
/**
166+
* If true:
167+
* In consume(number, cb), we will wait for `timeoutMs` for the first message to be fetched.
168+
* Subsequent messages will not be waited for and will be fetched (upto `number`) if already ready.
169+
*
170+
* If false:
171+
* In consume(number, cb), we will wait for upto `timeoutMs` for each message to be fetched.
172+
*
173+
* @param {boolean} isTimeoutOnlyForFirstMessage
174+
*/
175+
KafkaConsumer.prototype.setDefaultIsTimeoutOnlyForFirstMessage = function(isTimeoutOnlyForFirstMessage) {
176+
this._consumeIsTimeoutOnlyForFirstMessage = isTimeoutOnlyForFirstMessage;
177+
};
178+
163179
/**
164180
* Get a stream representation of this KafkaConsumer
165181
*
@@ -512,7 +528,7 @@ KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
512528
KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
513529
var self = this;
514530

515-
this._client.consume(timeoutMs, numMessages, function(err, messages, eofEvents) {
531+
this._client.consume(timeoutMs, numMessages, this._consumeIsTimeoutOnlyForFirstMessage, function(err, messages, eofEvents) {
516532
if (err) {
517533
err = LibrdKafkaError.create(err);
518534
if (cb) {

lib/kafkajs/_consumer.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,9 @@ class Consumer {
573573
}
574574
this.#state = ConsumerState.CONNECTED;
575575

576+
/* Slight optimization for cases where the size of messages in our subscription is less than the cache size. */
577+
this.#internalClient.setDefaultIsTimeoutOnlyForFirstMessage(false);
578+
576579
// Resolve the promise.
577580
this.#connectPromiseFunc['resolve']();
578581
}

src/kafka-consumer.cc

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,7 +1330,11 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
13301330
}
13311331

13321332
if (info[1]->IsNumber()) {
1333-
if (!info[2]->IsFunction()) {
1333+
if (!info[2]->IsBoolean()) {
1334+
return Nan::ThrowError("Need to specify a boolean");
1335+
}
1336+
1337+
if (!info[3]->IsFunction()) {
13341338
return Nan::ThrowError("Need to specify a callback");
13351339
}
13361340

@@ -1344,12 +1348,22 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
13441348
numMessages = numMessagesMaybe.FromJust();
13451349
}
13461350

1351+
v8::Local<v8::Boolean> isTimeoutOnlyForFirstMessageBoolean = info[2].As<v8::Boolean>();
1352+
Nan::Maybe<bool> isTimeoutOnlyForFirstMessageMaybe = Nan::To<bool>(isTimeoutOnlyForFirstMessageBoolean); // NOLINT
1353+
1354+
bool isTimeoutOnlyForFirstMessage;
1355+
if (isTimeoutOnlyForFirstMessageMaybe.IsNothing()) {
1356+
return Nan::ThrowError("Parameter must be a boolean");
1357+
} else {
1358+
isTimeoutOnlyForFirstMessage = isTimeoutOnlyForFirstMessageMaybe.FromJust();
1359+
}
1360+
13471361
KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
13481362

1349-
v8::Local<v8::Function> cb = info[2].As<v8::Function>();
1363+
v8::Local<v8::Function> cb = info[3].As<v8::Function>();
13501364
Nan::Callback *callback = new Nan::Callback(cb);
13511365
Nan::AsyncQueueWorker(
1352-
new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms)); // NOLINT
1366+
new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, isTimeoutOnlyForFirstMessage)); // NOLINT
13531367

13541368
} else {
13551369
if (!info[1]->IsFunction()) {

src/workers.cc

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -808,11 +808,13 @@ void KafkaConsumerConsumeLoop::HandleErrorCallback() {
808808
KafkaConsumerConsumeNum::KafkaConsumerConsumeNum(Nan::Callback *callback,
809809
KafkaConsumer* consumer,
810810
const uint32_t & num_messages,
811-
const int & timeout_ms) :
811+
const int & timeout_ms,
812+
bool timeout_only_for_first_message) :
812813
ErrorAwareWorker(callback),
813814
m_consumer(consumer),
814815
m_num_messages(num_messages),
815-
m_timeout_ms(timeout_ms) {}
816+
m_timeout_ms(timeout_ms),
817+
m_timeout_only_for_first_message(timeout_only_for_first_message) {}
816818

817819
KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {}
818820

@@ -849,6 +851,14 @@ void KafkaConsumerConsumeNum::Execute() {
849851
break;
850852
case RdKafka::ERR_NO_ERROR:
851853
m_messages.push_back(b.data<RdKafka::Message*>());
854+
855+
// This allows getting ready messages, while not waiting for new ones.
856+
// This is useful when we want to get the as many messages as possible
857+
// within the timeout but not wait if we already have one or more messages.
858+
if (m_timeout_only_for_first_message) {
859+
timeout_ms = 1;
860+
}
861+
852862
break;
853863
default:
854864
// Set the error for any other errors and break

src/workers.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ class KafkaConsumerSeek : public ErrorAwareWorker {
441441
class KafkaConsumerConsumeNum : public ErrorAwareWorker {
442442
public:
443443
KafkaConsumerConsumeNum(Nan::Callback*, NodeKafka::KafkaConsumer*,
444-
const uint32_t &, const int &);
444+
const uint32_t &, const int &, bool);
445445
~KafkaConsumerConsumeNum();
446446

447447
void Execute();
@@ -451,6 +451,7 @@ class KafkaConsumerConsumeNum : public ErrorAwareWorker {
451451
NodeKafka::KafkaConsumer * m_consumer;
452452
const uint32_t m_num_messages;
453453
const int m_timeout_ms;
454+
const bool m_timeout_only_for_first_message;
454455
std::vector<RdKafka::Message*> m_messages;
455456
};
456457

0 commit comments

Comments
 (0)