diff --git a/e2e/both.spec.js b/e2e/both.spec.js index 1b53840e..9e758200 100644 --- a/e2e/both.spec.js +++ b/e2e/both.spec.js @@ -295,6 +295,62 @@ describe('Consumer/Producer', function() { }); }); + describe('Timeout Shared By Batch - Behavior', function() { + it('should debounce batch return when bool is false: consumeNum', function(done) { + crypto.randomBytes(4096, function(_ex, buffer) { + consumer.setDefaultIsTimeoutSharedByBatch(false); + consumer.setDefaultConsumeTimeout(5000); + + producer.setPollInterval(10); + producer.once('delivery-report', function(err, _report) { + t.ifError(err); + }); + + consumer.subscribe([topic]); + + let messageCount = 12; + for (let i = 0; i < messageCount; i++) { + setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i); // messages come in staggered under timeout + } + + const start = Date.now(); + consumer.consume(1000, function(err, messages) { + t.ifError(err); + t.equal(messages.length, messageCount, 'Consume should wait for all messages because of debounce'); + t(Date.now() - start > 6000, 'Time passed should exceed timeout') // wiggle room for time shenanigans + done(); + }); + }); + }); + + it('should return after timeout when bool is true: consumeNum', function(done) { + crypto.randomBytes(4096, function(_ex, buffer) { + consumer.setDefaultIsTimeoutSharedByBatch(true); + consumer.setDefaultConsumeTimeout(5000); + + producer.setPollInterval(10); + producer.once('delivery-report', function(err, _report) { + t.ifError(err); + }); + + consumer.subscribe([topic]); + + let messageCount = 12; + for (let i = 0; i < messageCount; i++) { + setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i); // messages come in staggered under timeout + } + + const start = Date.now(); + consumer.consume(1000, function(err, messages) { + t.ifError(err); + t.notEqual(messages.length, messageCount, 'Consume should fail to get all messages because of timeout'); + t(Date.now() - start < 6000, 'Time passed should adhere to timeout') // wiggle room for time shenanigans + done(); + }); + }); + }); + }); + it('should be able to produce and consume messages: consumeLoop', function(done) { var key = 'key';
diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index 637ce368..eb5aad49 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -21,6 +21,7 @@ var shallowCopy = require('./util').shallowCopy; var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500; var DEFAULT_CONSUME_TIME_OUT = 1000; const DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE = false; +const DEFAULT_IS_TIMEOUT_SHARED_BY_BATCH = false; util.inherits(KafkaConsumer, Client); /** @@ -136,6 +137,7 @@ function KafkaConsumer(conf, topicConf) { this._consumeTimeout = DEFAULT_CONSUME_TIME_OUT; this._consumeLoopTimeoutDelay = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY; this._consumeIsTimeoutOnlyForFirstMessage = DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE; + this._consumeIsTimeoutSharedByBatch = DEFAULT_IS_TIMEOUT_SHARED_BY_BATCH; if (queue_non_empty_cb) { this._cb_configs.event.queue_non_empty_cb = queue_non_empty_cb; @@ -175,6 +177,21 @@ KafkaConsumer.prototype.setDefaultIsTimeoutOnlyForFirstMessage = function(isTime this._consumeIsTimeoutOnlyForFirstMessage = isTimeoutOnlyForFirstMessage; }; +/** + * If true: + * In consume(number, cb), consume will attempt to fill the batch size until `timeoutMs` passes. + * + * If false: + * In consume(number, cb), we will wait for up to `timeoutMs` after the most recent message is ready. + * i.e. As long as messages are continuously arriving, consume will not return until the batch size is full. + * + * @param {boolean} isTimeoutSharedByBatch + * @private + */ +KafkaConsumer.prototype.setDefaultIsTimeoutSharedByBatch = function (isTimeoutSharedByBatch) { + this._consumeIsTimeoutSharedByBatch = isTimeoutSharedByBatch; +}; + /** * Get a stream representation of this KafkaConsumer.
* @@ -552,7 +569,7 @@ KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) { KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) { var self = this; - this._client.consume(timeoutMs, numMessages, this._consumeIsTimeoutOnlyForFirstMessage, function(err, messages, eofEvents) { + this._client.consume(timeoutMs, numMessages, this._consumeIsTimeoutOnlyForFirstMessage, this._consumeIsTimeoutSharedByBatch, function(err, messages, eofEvents) { if (err) { err = LibrdKafkaError.create(err); if (cb) { @@ -795,4 +812,4 @@ KafkaConsumer.prototype.pause = function(topicPartitions) { * @property {number} timestamp - The timestamp of the message (in milliseconds since the epoch in UTC). * @property {number?} leaderEpoch - The leader epoch of the message if available. * @property {RdKafka.KafkaConsumer~MessageHeader[]?} headers - The headers of the message. - */ \ No newline at end of file + */ diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index 4bc778d4..ea1f7a48 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -1392,7 +1392,11 @@ NAN_METHOD(KafkaConsumer::NodeConsume) { return Nan::ThrowError("Need to specify a boolean"); } - if (!info[3]->IsFunction()) { + if (!info[3]->IsBoolean()) { + return Nan::ThrowError("Need to specify a boolean"); + } + + if (!info[4]->IsFunction()) { return Nan::ThrowError("Need to specify a callback"); } @@ -1417,12 +1421,23 @@ NAN_METHOD(KafkaConsumer::NodeConsume) { isTimeoutOnlyForFirstMessage = isTimeoutOnlyForFirstMessageMaybe.FromJust(); // NOLINT } + v8::Local<v8::Boolean> isTimeoutSharedByBatchBoolean = info[3].As<v8::Boolean>(); // NOLINT + Nan::Maybe<bool> isTimeoutSharedByBatchMaybe = + Nan::To<bool>(isTimeoutSharedByBatchBoolean); + + bool isTimeoutSharedByBatch; + if (isTimeoutSharedByBatchMaybe.IsNothing()) { + return Nan::ThrowError("Parameter must be a boolean"); + } else { + isTimeoutSharedByBatch = isTimeoutSharedByBatchMaybe.FromJust(); // NOLINT + } + KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
- v8::Local<v8::Function> cb = info[3].As<v8::Function>(); + v8::Local<v8::Function> cb = info[4].As<v8::Function>(); Nan::Callback *callback = new Nan::Callback(cb); Nan::AsyncQueueWorker( - new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, isTimeoutOnlyForFirstMessage)); // NOLINT + new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, isTimeoutOnlyForFirstMessage, isTimeoutSharedByBatch)); // NOLINT } else { if (!info[1]->IsFunction()) { diff --git a/src/workers.cc b/src/workers.cc index 4655458d..cea4c38a 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -9,6 +9,8 @@ */ #include "src/workers.h" +#include <algorithm> +#include <chrono> #include <string> #include <vector> @@ -799,22 +801,44 @@ KafkaConsumerConsumeNum::KafkaConsumerConsumeNum(Nan::Callback *callback, KafkaConsumer* consumer, const uint32_t & num_messages, const int & timeout_ms, - bool timeout_only_for_first_message) : + bool timeout_only_for_first_message, + bool timeout_shared_by_batch) : ErrorAwareWorker(callback), m_consumer(consumer), m_num_messages(num_messages), m_timeout_ms(timeout_ms), - m_timeout_only_for_first_message(timeout_only_for_first_message) {} + m_timeout_only_for_first_message(timeout_only_for_first_message), + m_timeout_shared_by_batch(timeout_shared_by_batch) {} KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {} void KafkaConsumerConsumeNum::Execute() { std::size_t max = static_cast<std::size_t>(m_num_messages); bool looping = true; - int timeout_ms = m_timeout_ms; std::size_t eof_event_count = 0; + int timeout_ms = m_timeout_ms; + int early_exit_ms = 1; + + std::chrono::steady_clock::time_point start_time; + if (m_timeout_shared_by_batch) { + start_time = std::chrono::steady_clock::now(); + } + while (m_messages.size() - eof_event_count < max && looping) { + // Allow timeout_ms = early_exit_ms to take precedence + // timeout_ms > 1 + if (m_timeout_shared_by_batch && timeout_ms > early_exit_ms) { + // Calc next single consume timeout remaining for batch + auto now = std::chrono::steady_clock::now(); + auto elapsed =
+ std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time) + .count(); + // `timeout_ms` of 0 triggers non-blocking behavior https://github.com/confluentinc/librdkafka/blob/3f52de491f8aae1d71a9a0b3f1c07bfd6df4aec3/src/rdkafka_int.h#L1189-L1190 + // This still returns ERR_TIMED_OUT if no message available + timeout_ms = std::max(0, m_timeout_ms - static_cast<int>(elapsed)); + } + // Get a message Baton b = m_consumer->Consume(timeout_ms); if (b.err() == RdKafka::ERR_NO_ERROR) { @@ -825,7 +849,7 @@ void KafkaConsumerConsumeNum::Execute() { // If partition EOF and have consumed messages, retry with timeout 1 // This allows getting ready messages, while not waiting for new ones if (m_messages.size() > eof_event_count) { - timeout_ms = 1; + timeout_ms = early_exit_ms; } // We will only go into this code path when `enable.partition.eof` @@ -848,7 +872,7 @@ void KafkaConsumerConsumeNum::Execute() { // within the timeout but not wait if we already have one or more // messages. if (m_timeout_only_for_first_message) { - timeout_ms = 1; + timeout_ms = early_exit_ms; } break; diff --git a/src/workers.h b/src/workers.h index b9583823..fb645bb9 100644 --- a/src/workers.h +++ b/src/workers.h @@ -457,7 +457,7 @@ class KafkaConsumerSeek : public ErrorAwareWorker { class KafkaConsumerConsumeNum : public ErrorAwareWorker { public: KafkaConsumerConsumeNum(Nan::Callback*, NodeKafka::KafkaConsumer*, - const uint32_t &, const int &, bool); + const uint32_t &, const int &, bool, bool); ~KafkaConsumerConsumeNum(); void Execute(); @@ -468,6 +468,7 @@ class KafkaConsumerConsumeNum : public ErrorAwareWorker { const uint32_t m_num_messages; const int m_timeout_ms; const bool m_timeout_only_for_first_message; + const bool m_timeout_shared_by_batch; std::vector<RdKafka::Message*> m_messages; };