fix: Apply timeout correctly to consumeNum #330

Open
wants to merge 19 commits into base: master
56 changes: 56 additions & 0 deletions e2e/both.spec.js
@@ -295,6 +295,62 @@ describe('Consumer/Producer', function() {
});
});

describe('Timeout Shared By Batch - Behavior', function() {
it('should debounce batch return when isTimeoutSharedByBatch is false: consumeNum', function(done) {
crypto.randomBytes(4096, function(_ex, buffer) {
consumer.setDefaultIsTimeoutSharedByBatch(false);
consumer.setDefaultConsumeTimeout(5000);

producer.setPollInterval(10);
producer.once('delivery-report', function(err, _report) {
t.ifError(err);
});

consumer.subscribe([topic]);

let messageCount = 12;
for (let i = 0; i < messageCount; i++) {
setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i); // stagger messages so each arrives within the timeout
}

const start = Date.now();
consumer.consume(1000, function(err, messages) {
t.ifError(err);
t.equal(messages.length, messageCount, 'Consume should wait for all messages because of debounce');
t(Date.now() - start > 6000, 'Time passed should exceed timeout'); // wiggle room for timing jitter
done();
});
});
});

it('should return after timeout when isTimeoutSharedByBatch is true: consumeNum', function(done) {
crypto.randomBytes(4096, function(_ex, buffer) {
consumer.setDefaultIsTimeoutSharedByBatch(true);
consumer.setDefaultConsumeTimeout(5000);

producer.setPollInterval(10);
producer.once('delivery-report', function(err, _report) {
t.ifError(err);
});

consumer.subscribe([topic]);

let messageCount = 12;
for (let i = 0; i < messageCount; i++) {
setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i); // stagger messages so each arrives within the timeout
}

const start = Date.now();
consumer.consume(1000, function(err, messages) {
t.ifError(err);
t.notEqual(messages.length, messageCount, 'Consume should fail to get all messages because of timeout');
t(Date.now() - start < 6000, 'Time passed should adhere to timeout'); // wiggle room for timing jitter
done();
});
});
});
});

it('should be able to produce and consume messages: consumeLoop', function(done) {
var key = 'key';

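A rough timing check for the two tests above, written as a standalone sketch (the figures assume the 500 ms production schedule holds and ignore broker latency; they are illustrative, not part of the PR):

// Rough timing check for the tests above (assumes the 500 ms production
// schedule holds; ignores broker/delivery latency).
const messageCount = 12;
const produceIntervalMs = 500;
const consumeTimeoutMs = 5000;

// The last message is produced at t ≈ 5500 ms.
const lastProduceMs = produceIntervalMs * (messageCount - 1);

// isTimeoutSharedByBatch = false (debounce): every gap between messages is
// well under 5000 ms, so consume collects all 12 messages, then waits one
// more full timeout for a 13th that never arrives and returns at roughly
// 5500 + 5000 = 10500 ms, which is why the test asserts "> 6000 ms".
console.log('debounce case returns after ~' + (lastProduceMs + consumeTimeoutMs) + ' ms');

// isTimeoutSharedByBatch = true: the 5000 ms budget is shared by the whole
// batch, so consume returns at roughly 5000 ms with only the messages
// produced by then (fewer than 12), which is why the test asserts "< 6000 ms".
console.log('shared case returns after ~' + consumeTimeoutMs + ' ms');
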
21 changes: 19 additions & 2 deletions lib/kafka-consumer.js
@@ -21,6 +21,7 @@ var shallowCopy = require('./util').shallowCopy;
var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
var DEFAULT_CONSUME_TIME_OUT = 1000;
const DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE = false;
const DEFAULT_IS_TIMEOUT_SHARED_BY_BATCH = false;
util.inherits(KafkaConsumer, Client);

/**
@@ -136,6 +137,7 @@ function KafkaConsumer(conf, topicConf) {
this._consumeTimeout = DEFAULT_CONSUME_TIME_OUT;
this._consumeLoopTimeoutDelay = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY;
this._consumeIsTimeoutOnlyForFirstMessage = DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE;
this._consumeIsTimeoutSharedByBatch = DEFAULT_IS_TIMEOUT_SHARED_BY_BATCH;

if (queue_non_empty_cb) {
this._cb_configs.event.queue_non_empty_cb = queue_non_empty_cb;
@@ -175,6 +177,21 @@ KafkaConsumer.prototype.setDefaultIsTimeoutOnlyForFirstMessage = function(isTime
this._consumeIsTimeoutOnlyForFirstMessage = isTimeoutOnlyForFirstMessage;
};

/**
* If true:
* In consume(number, cb), consume will keep trying to fill the batch until `timeoutMs` has elapsed for the batch as a whole.
*
* If false:
* In consume(number, cb), consume will wait for up to `timeoutMs` after the most recent message arrives,
* i.e. as long as messages keep arriving within the timeout, consume will not return until the batch is full.
*
* @param {boolean} isTimeoutSharedByBatch
* @private
*/
KafkaConsumer.prototype.setDefaultIsTimeoutSharedByBatch = function(isTimeoutSharedByBatch) {
this._consumeIsTimeoutSharedByBatch = isTimeoutSharedByBatch;
};

/**
* Get a stream representation of this KafkaConsumer.
*
@@ -552,7 +569,7 @@ KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
var self = this;

this._client.consume(timeoutMs, numMessages, this._consumeIsTimeoutOnlyForFirstMessage, function(err, messages, eofEvents) {
this._client.consume(timeoutMs, numMessages, this._consumeIsTimeoutOnlyForFirstMessage, this._consumeIsTimeoutSharedByBatch, function(err, messages, eofEvents) {
if (err) {
err = LibrdKafkaError.create(err);
if (cb) {
@@ -795,4 +812,4 @@ KafkaConsumer.prototype.pause = function(topicPartitions) {
* @property {number} timestamp - The timestamp of the message (in milliseconds since the epoch in UTC).
* @property {number?} leaderEpoch - The leader epoch of the message if available.
* @property {RdKafka.KafkaConsumer~MessageHeader[]?} headers - The headers of the message.
*/
*/
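
For reference, a minimal usage sketch of the new toggle from the JavaScript side (the broker address, group id, and topic name are placeholders, and the sketch assumes the existing non-flowing consume(number, cb) flow; only setDefaultIsTimeoutSharedByBatch is introduced by this PR):

// Minimal usage sketch (broker, group id, and topic are placeholders).
const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092'
}, {});

consumer.on('ready', function() {
  consumer.subscribe(['example-topic']);

  // Share a single 5000 ms budget across the whole batch (added in this PR).
  consumer.setDefaultIsTimeoutSharedByBatch(true);
  consumer.setDefaultConsumeTimeout(5000);

  // Ask for up to 100 messages; return once the batch is full or the shared
  // 5000 ms budget is spent, whichever comes first.
  consumer.consume(100, function(err, messages) {
    if (err) {
      console.error(err);
      return;
    }
    console.log('got ' + messages.length + ' messages');
  });
});

consumer.connect();
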
21 changes: 18 additions & 3 deletions src/kafka-consumer.cc
@@ -1392,7 +1392,11 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
return Nan::ThrowError("Need to specify a boolean");
}

if (!info[3]->IsFunction()) {
if (!info[3]->IsBoolean()) {
return Nan::ThrowError("Need to specify a boolean");
}

if (!info[4]->IsFunction()) {
return Nan::ThrowError("Need to specify a callback");
}

@@ -1417,12 +1421,23 @@
isTimeoutOnlyForFirstMessage = isTimeoutOnlyForFirstMessageMaybe.FromJust(); // NOLINT
}

v8::Local<v8::Boolean> isTimeoutSharedByBatchBoolean = info[3].As<v8::Boolean>(); // NOLINT
Nan::Maybe<bool> isTimeoutSharedByBatchMaybe =
Nan::To<bool>(isTimeoutSharedByBatchBoolean);

bool isTimeoutSharedByBatch;
if (isTimeoutSharedByBatchMaybe.IsNothing()) {
return Nan::ThrowError("Parameter must be a boolean");
} else {
isTimeoutSharedByBatch = isTimeoutSharedByBatchMaybe.FromJust(); // NOLINT
}

KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

v8::Local<v8::Function> cb = info[3].As<v8::Function>();
v8::Local<v8::Function> cb = info[4].As<v8::Function>();
Nan::Callback *callback = new Nan::Callback(cb);
Nan::AsyncQueueWorker(
new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, isTimeoutOnlyForFirstMessage)); // NOLINT
new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, isTimeoutOnlyForFirstMessage, isTimeoutSharedByBatch)); // NOLINT

} else {
if (!info[1]->IsFunction()) {
34 changes: 29 additions & 5 deletions src/workers.cc
@@ -9,6 +9,8 @@
*/
#include "src/workers.h"

#include <algorithm>
#include <chrono>
#include <string>
#include <vector>

@@ -799,22 +801,44 @@ KafkaConsumerConsumeNum::KafkaConsumerConsumeNum(Nan::Callback *callback,
KafkaConsumer* consumer,
const uint32_t & num_messages,
const int & timeout_ms,
bool timeout_only_for_first_message) :
bool timeout_only_for_first_message,
bool timeout_shared_by_batch) :
ErrorAwareWorker(callback),
m_consumer(consumer),
m_num_messages(num_messages),
m_timeout_ms(timeout_ms),
m_timeout_only_for_first_message(timeout_only_for_first_message) {}
m_timeout_only_for_first_message(timeout_only_for_first_message),
m_timeout_shared_by_batch(timeout_shared_by_batch) {}

KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {}

void KafkaConsumerConsumeNum::Execute() {
std::size_t max = static_cast<std::size_t>(m_num_messages);
bool looping = true;
int timeout_ms = m_timeout_ms;
std::size_t eof_event_count = 0;

int timeout_ms = m_timeout_ms;
int early_exit_ms = 1;

std::chrono::steady_clock::time_point start_time;
if (m_timeout_shared_by_batch) {
start_time = std::chrono::steady_clock::now();
}

while (m_messages.size() - eof_event_count < max && looping) {
// Recompute the remaining batch budget, but let an early-exit timeout
// (timeout_ms == early_exit_ms) take precedence once it has been set.
if (m_timeout_shared_by_batch && timeout_ms > early_exit_ms) {
// Compute how much of the batch budget is left for the next single consume
auto now = std::chrono::steady_clock::now();
auto elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time)
.count();
// `timeout_ms` of 0 triggers non-blocking behavior https://github.com/confluentinc/librdkafka/blob/3f52de491f8aae1d71a9a0b3f1c07bfd6df4aec3/src/rdkafka_int.h#L1189-L1190
// This still returns ERR_TIMED_OUT if no message is available
timeout_ms = std::max(0, m_timeout_ms - static_cast<int>(elapsed));
}

// Get a message
Baton b = m_consumer->Consume(timeout_ms);
if (b.err() == RdKafka::ERR_NO_ERROR) {
@@ -825,7 +849,7 @@ void KafkaConsumerConsumeNum::Execute() {
// If partition EOF and have consumed messages, retry with timeout 1
// This allows getting ready messages, while not waiting for new ones
if (m_messages.size() > eof_event_count) {
timeout_ms = 1;
timeout_ms = early_exit_ms;
}

// We will only go into this code path when `enable.partition.eof`
@@ -848,7 +872,7 @@ void KafkaConsumerConsumeNum::Execute() {
// within the timeout but not wait if we already have one or more
// messages.
if (m_timeout_only_for_first_message) {
timeout_ms = 1;
timeout_ms = early_exit_ms;
}

break;
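
To make the loop above easier to follow, here is the same remaining-budget calculation paraphrased as a small JavaScript sketch (illustrative only; the shipped logic is the C++ above):

// Sketch of the shared-batch timeout math from Execute(), paraphrased in
// JavaScript for illustration only.
function remainingTimeoutMs(batchTimeoutMs, startMs, nowMs) {
  // Clamp at 0: a zero timeout makes the underlying single consume
  // non-blocking, and it still reports ERR_TIMED_OUT when nothing is ready.
  return Math.max(0, batchTimeoutMs - (nowMs - startMs));
}

// Example: a 5000 ms budget shared by the batch.
const startMs = Date.now();
// ... consume a message, then compute the budget left for the next one ...
const nextTimeoutMs = remainingTimeoutMs(5000, startMs, Date.now());
// Each subsequent single-message consume waits at most nextTimeoutMs, so the
// batch as a whole never blocks for much longer than ~5000 ms.
console.log('next single consume may wait up to ' + nextTimeoutMs + ' ms');
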
3 changes: 2 additions & 1 deletion src/workers.h
@@ -457,7 +457,7 @@ class KafkaConsumerSeek : public ErrorAwareWorker {
class KafkaConsumerConsumeNum : public ErrorAwareWorker {
public:
KafkaConsumerConsumeNum(Nan::Callback*, NodeKafka::KafkaConsumer*,
const uint32_t &, const int &, bool);
const uint32_t &, const int &, bool, bool);
~KafkaConsumerConsumeNum();

void Execute();
@@ -468,6 +468,7 @@ class KafkaConsumerConsumeNum : public ErrorAwareWorker {
const uint32_t m_num_messages;
const int m_timeout_ms;
const bool m_timeout_only_for_first_message;
const bool m_timeout_shared_by_batch;
std::vector<RdKafka::Message*> m_messages;
};
