From 9a13db3b3f4483cce947077b684cd0467812ccb4 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 8 Jun 2025 09:51:47 -0500
Subject: [PATCH 01/17] apply timeout to batch

---
 src/workers.cc | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/src/workers.cc b/src/workers.cc
index 4655458d..0eab51b5 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -9,6 +9,7 @@
  */
 
 #include "src/workers.h"
+#include <chrono>
 #include
 #include
 
@@ -811,10 +812,23 @@ KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {}
 void KafkaConsumerConsumeNum::Execute() {
   std::size_t max = static_cast<std::size_t>(m_num_messages);
   bool looping = true;
-  int timeout_ms = m_timeout_ms;
   std::size_t eof_event_count = 0;
 
+  auto start_time = std::chrono::steady_clock::now();
+  int timeout_ms = m_timeout_ms;
+
   while (m_messages.size() - eof_event_count < max && looping) {
+    // Allow timeout_ms = 1 early exits to work
+    if (timeout_ms > 1) {
+      // Calc next single consume timeout remaining for batch
+      auto now = std::chrono::steady_clock::now();
+      auto elapsed =
+          std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time)
+              .count();
+      // timeout_ms of 0 triggers non-blocking behavior https://github.com/confluentinc/librdkafka/blob/3f52de491f8aae1d71a9a0b3f1c07bfd6df4aec3/src/rdkafka_int.h#L1189-L1190
+      timeout_ms = std::max(1, m_timeout_ms - static_cast<int>(elapsed));
+    }
+
     // Get a message
     Baton b = m_consumer->Consume(timeout_ms);
     if (b.err() == RdKafka::ERR_NO_ERROR) {

From 7d6d811a22aa019bfc75d41884ba98f5e494efb2 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 8 Jun 2025 10:11:26 -0500
Subject: [PATCH 02/17] allow 0 timeout in calc

---
 src/workers.cc | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/workers.cc b/src/workers.cc
index 0eab51b5..38b4caf7 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -825,8 +825,9 @@ void KafkaConsumerConsumeNum::Execute() {
       auto elapsed =
          std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time)
              .count();
-      // timeout_ms of 0 triggers non-blocking behavior https://github.com/confluentinc/librdkafka/blob/3f52de491f8aae1d71a9a0b3f1c07bfd6df4aec3/src/rdkafka_int.h#L1189-L1190
-      timeout_ms = std::max(1, m_timeout_ms - static_cast<int>(elapsed));
+      // `timeout_ms` of 0 triggers non-blocking behavior https://github.com/confluentinc/librdkafka/blob/3f52de491f8aae1d71a9a0b3f1c07bfd6df4aec3/src/rdkafka_int.h#L1189-L1190
+      // This still returns ERR_TIMED_OUT if no message available
+      timeout_ms = std::max(0, m_timeout_ms - static_cast<int>(elapsed));
     }
 
     // Get a message
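The arithmetic these first two patches converge on is a running deadline: each pass through the batch loop polls with whatever part of the original timeout is left, clamped at zero so the final poll returns immediately instead of waiting a full extra interval. A conceptual JavaScript restatement of that calculation (illustrative only — the real logic is the C++ above, and the function and parameter names here are made up):

    // Remaining budget for the next single-message poll, given when the batch started.
    // Clamping at 0 mirrors std::max(0, ...): a 0ms poll returns immediately
    // (ERR_TIMED_OUT if nothing is ready) rather than blocking again.
    function remainingTimeoutMs(batchStartMs, totalTimeoutMs) {
      const elapsedMs = Date.now() - batchStartMs;
      return Math.max(0, totalTimeoutMs - elapsedMs);
    }
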
From adc61614a3e0a4ce8dc807017765e8e561e12ae6 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 8 Jun 2025 12:51:56 -0500
Subject: [PATCH 03/17] extract 1 into descriptive var

---
 src/workers.cc | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/workers.cc b/src/workers.cc
index 38b4caf7..8bffcf65 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -816,10 +816,12 @@ void KafkaConsumerConsumeNum::Execute() {
 
   auto start_time = std::chrono::steady_clock::now();
   int timeout_ms = m_timeout_ms;
+  int early_exit_ms = 1;
 
   while (m_messages.size() - eof_event_count < max && looping) {
-    // Allow timeout_ms = 1 early exits to work
-    if (timeout_ms > 1) {
+    // Allow timeout_ms = early_exit_ms to take precedence 
+    // timeout_ms > 1
+    if (timeout_ms > early_exit_ms) {
       // Calc next single consume timeout remaining for batch
       auto now = std::chrono::steady_clock::now();
       auto elapsed =
@@ -840,7 +842,7 @@ void KafkaConsumerConsumeNum::Execute() {
       // If partition EOF and have consumed messages, retry with timeout 1
       // This allows getting ready messages, while not waiting for new ones
       if (m_messages.size() > eof_event_count) {
-        timeout_ms = 1;
+        timeout_ms = early_exit_ms;
       }
 
       // We will only go into this code path when `enable.partition.eof`
@@ -863,7 +865,7 @@ void KafkaConsumerConsumeNum::Execute() {
       // within the timeout but not wait if we already have one or more
       // messages.
       if (m_timeout_only_for_first_message) {
-        timeout_ms = 1;
+        timeout_ms = early_exit_ms;
       }
 
       break;

From 452f9c5d7682d3a3332c9972d459ff301fcdbcf3 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 09:30:01 -0500
Subject: [PATCH 04/17] add bool to worker consume num

---
 src/workers.cc | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/src/workers.cc b/src/workers.cc
index 8bffcf65..f5158fbb 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -800,12 +800,14 @@ KafkaConsumerConsumeNum::KafkaConsumerConsumeNum(Nan::Callback *callback,
                                                  KafkaConsumer* consumer,
                                                  const uint32_t & num_messages,
                                                  const int & timeout_ms,
-                                                 bool timeout_only_for_first_message) :
+                                                 bool timeout_only_for_first_message,
+                                                 bool timeout_shared_by_batch) :
   ErrorAwareWorker(callback),
   m_consumer(consumer),
   m_num_messages(num_messages),
   m_timeout_ms(timeout_ms),
-  m_timeout_only_for_first_message(timeout_only_for_first_message) {}
+  m_timeout_only_for_first_message(timeout_only_for_first_message,
+    m_timeout_shared_by_batch) {}
 
 KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {}
 
@@ -814,14 +816,18 @@ void KafkaConsumerConsumeNum::Execute() {
   bool looping = true;
   std::size_t eof_event_count = 0;
 
-  auto start_time = std::chrono::steady_clock::now();
   int timeout_ms = m_timeout_ms;
   int early_exit_ms = 1;
 
+  std::chrono::steady_clock::time_point start_time;
+  if (m_timeout_shared_by_batch) {
+    start_time = std::chrono::steady_clock::now();
+  }
+
   while (m_messages.size() - eof_event_count < max && looping) {
     // Allow timeout_ms = early_exit_ms to take precedence 
     // timeout_ms > 1
-    if (timeout_ms > early_exit_ms) {
+    if (m_timeout_shared_by_batch && timeout_ms > early_exit_ms) {
       // Calc next single consume timeout remaining for batch
       auto now = std::chrono::steady_clock::now();
       auto elapsed =
From 370b147a1a5e1d89344155261bd9d96a6d18af04 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 10:11:53 -0500
Subject: [PATCH 05/17] update kafka-consumer.js with bool

---
 lib/kafka-consumer.js | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js
index 637ce368..eb5aad49 100644
--- a/lib/kafka-consumer.js
+++ b/lib/kafka-consumer.js
@@ -21,6 +21,7 @@ var shallowCopy = require('./util').shallowCopy;
 var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
 var DEFAULT_CONSUME_TIME_OUT = 1000;
 const DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE = false;
+const DEFAULT_IS_TIMEOUT_SHARED_BY_BATCH = false;
 util.inherits(KafkaConsumer, Client);
 
 /**
@@ -136,6 +137,7 @@ function KafkaConsumer(conf, topicConf) {
   this._consumeTimeout = DEFAULT_CONSUME_TIME_OUT;
   this._consumeLoopTimeoutDelay = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY;
   this._consumeIsTimeoutOnlyForFirstMessage = DEFAULT_IS_TIMEOUT_ONLY_FOR_FIRST_MESSAGE;
+  this._consumeIsTimeoutSharedByBatch = DEFAULT_IS_TIMEOUT_SHARED_BY_BATCH;
 
   if (queue_non_empty_cb) {
     this._cb_configs.event.queue_non_empty_cb = queue_non_empty_cb;
@@ -175,6 +177,21 @@ KafkaConsumer.prototype.setDefaultIsTimeoutOnlyForFirstMessage = function(isTime
   this._consumeIsTimeoutOnlyForFirstMessage = isTimeoutOnlyForFirstMessage;
 };
 
+/**
+ * If true:
+ * In consume(number, cb), consume will attempt to fill the batch size until `timeoutMs` passes.
+ *
+ * If false:
+ * In consume(number, cb), we will wait for up to `timeoutMs` after the most recent message is ready.
+ * i.e. As long as messages are continuously arriving, consume will not return until the batch size is full.
+ *
+ * @param {boolean} isTimeoutSharedByBatch
+ * @private
+ */
+KafkaConsumer.prototype.setDefaultIsTimeoutSharedByBatch = function (isTimeoutSharedByBatch) {
+  this._consumeIsTimeoutSharedByBatch = isTimeoutSharedByBatch;
+};
+
 /**
  * Get a stream representation of this KafkaConsumer.
  *
@@ -552,7 +569,7 @@ KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
 KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
   var self = this;
 
-  this._client.consume(timeoutMs, numMessages, this._consumeIsTimeoutOnlyForFirstMessage, function(err, messages, eofEvents) {
+  this._client.consume(timeoutMs, numMessages, this._consumeIsTimeoutOnlyForFirstMessage, this._consumeIsTimeoutSharedByBatch, function(err, messages, eofEvents) {
     if (err) {
       err = LibrdKafkaError.create(err);
       if (cb) {
@@ -795,4 +812,4 @@ KafkaConsumer.prototype.pause = function(topicPartitions) {
  * @property {number} timestamp - The timestamp of the message (in milliseconds since the epoch in UTC).
  * @property {number?} leaderEpoch - The leader epoch of the message if available.
  * @property {RdKafka.KafkaConsumer~MessageHeader[]?} headers - The headers of the message.
- */
\ No newline at end of file
+ */
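For orientation, once this patch and the native wiring below are in place, the new setting would be exercised from application code roughly as follows. This is an illustrative sketch, not part of the patch series: the require path, broker address, group id, and topic name are placeholders.

    var Kafka = require('node-rdkafka'); // placeholder: require whatever package this repo publishes as

    var consumer = new Kafka.KafkaConsumer({
      'group.id': 'example-group',              // placeholder
      'metadata.broker.list': 'localhost:9092'  // placeholder
    }, {});

    consumer.on('ready', function() {
      consumer.subscribe(['example-topic']);    // placeholder
      consumer.setDefaultConsumeTimeout(1000);  // total budget for the batch, in ms
      consumer.setDefaultIsTimeoutSharedByBatch(true);
      // With the flag on, this callback fires once roughly 1000ms have elapsed,
      // even if fewer than 10 messages arrived; with it off (the default), each
      // received message effectively restarts the wait.
      consumer.consume(10, function(err, messages) {
        console.log(err || messages.length);
      });
    });

    consumer.connect();
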
From 5f4fa5f05b4757369e9015414266dc39b7c5f276 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 10:18:10 -0500
Subject: [PATCH 06/17] update arg parsing in consume num path

---
 src/kafka-consumer.cc | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
index 4bc778d4..ea1f7a48 100644
--- a/src/kafka-consumer.cc
+++ b/src/kafka-consumer.cc
@@ -1392,7 +1392,11 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
       return Nan::ThrowError("Need to specify a boolean");
     }
 
-    if (!info[3]->IsFunction()) {
+    if (!info[3]->IsBoolean()) {
+      return Nan::ThrowError("Need to specify a boolean");
+    }
+
+    if (!info[4]->IsFunction()) {
       return Nan::ThrowError("Need to specify a callback");
     }
 
@@ -1417,12 +1421,23 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
       isTimeoutOnlyForFirstMessage = isTimeoutOnlyForFirstMessageMaybe.FromJust();  // NOLINT
     }
 
+    v8::Local<v8::Boolean> isTimeoutSharedByBatchBoolean = info[3].As<v8::Boolean>();  // NOLINT
+    Nan::Maybe<bool> isTimeoutSharedByBatchMaybe =
+      Nan::To<bool>(isTimeoutSharedByBatchBoolean);
+
+    bool isTimeoutSharedByBatch;
+    if (isTimeoutSharedByBatchMaybe.IsNothing()) {
+      return Nan::ThrowError("Parameter must be a boolean");
+    } else {
+      isTimeoutSharedByBatch = isTimeoutSharedByBatchMaybe.FromJust();  // NOLINT
+    }
+
    KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
 
-    v8::Local<v8::Function> cb = info[3].As<v8::Function>();
+    v8::Local<v8::Function> cb = info[4].As<v8::Function>();
     Nan::Callback *callback = new Nan::Callback(cb);
 
     Nan::AsyncQueueWorker(
-      new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, isTimeoutOnlyForFirstMessage));  // NOLINT
+      new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, isTimeoutOnlyForFirstMessage, isTimeoutSharedByBatch));  // NOLINT
 
   } else {
     if (!info[1]->IsFunction()) {

From 9b067c710797cebaf7830a734b7fe37cd86fc7d0 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 11:02:09 -0500
Subject: [PATCH 07/17] update headers

---
 src/workers.h | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/workers.h b/src/workers.h
index b9583823..fb645bb9 100644
--- a/src/workers.h
+++ b/src/workers.h
@@ -457,7 +457,7 @@ class KafkaConsumerSeek : public ErrorAwareWorker {
 class KafkaConsumerConsumeNum : public ErrorAwareWorker {
  public:
   KafkaConsumerConsumeNum(Nan::Callback*, NodeKafka::KafkaConsumer*,
-                          const uint32_t &, const int &, bool);
+                          const uint32_t &, const int &, bool, bool);
   ~KafkaConsumerConsumeNum();
 
   void Execute();
@@ -468,6 +468,7 @@ class KafkaConsumerConsumeNum : public ErrorAwareWorker {
   const uint32_t m_num_messages;
   const int m_timeout_ms;
   const bool m_timeout_only_for_first_message;
+  const bool m_timeout_shared_by_batch;
   std::vector<RdKafka::Message*> m_messages;
 };
 

From c43e99ee4f1abad2c80d8af5337575540916a0e3 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 11:02:22 -0500
Subject: [PATCH 08/17] lint

---
 src/workers.cc | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/workers.cc b/src/workers.cc
index f5158fbb..5b98ad87 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -9,6 +9,7 @@
  */
 
 #include "src/workers.h"
+#include <algorithm>
 #include <chrono>
 #include
 #include
@@ -825,7 +826,7 @@ void KafkaConsumerConsumeNum::Execute() {
   }
 
   while (m_messages.size() - eof_event_count < max && looping) {
-    // Allow timeout_ms = early_exit_ms to take precedence 
+    // Allow timeout_ms = early_exit_ms to take precedence
     // timeout_ms > 1
     if (m_timeout_shared_by_batch && timeout_ms > early_exit_ms) {
       // Calc next single consume timeout remaining for batch

From 592ba1f09dd286ad1989eb363096aa6329cd9792 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 11:37:40 -0500
Subject: [PATCH 09/17] add tests

---
 e2e/both.spec.js | 58 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 58 insertions(+)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index 1b53840e..8f09405b 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -546,6 +546,64 @@ describe('Consumer/Producer', function() {
     }, 2000);
   });
 
+  describe('Timeout Shared By Batch - Behavior', function() {
+    it('should debounce batch return when is false', function(done) {
+      crypto.randomBytes(4096, function(_ex, buffer) {
+        consumer.setDefaultIsTimeoutSharedByBatch(false);
+        consumer.setDefaultConsumeTimeout(1000);
+
+        producer.setPollInterval(10);
+        producer.once('delivery-report', function(err, _report) {
+          t.ifError(err);
+        });
+
+        consumer.subscribe([topic]);
+
+        let messageCount = 5;
+        for (let i = 0; i < messageCount; i++) {
+          setTimeout(producer.produce(topic, null, buffer, null), 500 * i);
+        }
+
+        // Batch size large enough to not interfere with timeout test
+        const start = Date.now();
+        consumer.consume(100, function(err, messages) {
+          t.ifError(err);
+          t.equal(messages.length, messageCount, 'Consume should wait for all messages because of debounce');
+          t(Date.now() - start > 2000, 'Time passed should exceed timeout') // wiggle room for time shenanigans
+          done();
+        })
+      })
+    })
+
+    it('should return after timeout when bool is true', function(done) {
+      crypto.randomBytes(4096, function (_ex, buffer) {
+        consumer.setDefaultIsTimeoutSharedByBatch(true);
+        consumer.setDefaultConsumeTimeout(1000);
+
+        producer.setPollInterval(10);
+        producer.once('delivery-report', function(err, _report) {
+          t.ifError(err);
+        });
+
+        consumer.subscribe([topic]);
+
+        let messageCount = 5;
+        for (let i = 0; i < messageCount; i++) {
+          setTimeout(producer.produce(topic, null, buffer, null), 500 * i);
+        }
+
+        // Batch size large enough to
not interfere with timeout test
+        consumer.consume(100, function(err, messages) {
+          t.ifError(err);
+          t.notEqual(messages.length, messageCount, 'Consume should gather less than total messageCount because of timeout');
+          t(Date.now() - start < 1500, 'Time passed should adhere to timeout') // wiggle room for time shenanigans
+          done();
+        })
+      })
+
+    })
+  })
+
   describe('Exceptional case - offset_commit_cb true', function() {
     var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
     var consumerOpts = {

From 2aa98ea848ebb86a77a554facedcbf55256978e4 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 11:38:17 -0500
Subject: [PATCH 10/17] missed word

---
 e2e/both.spec.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index 8f09405b..cd3853be 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -547,7 +547,7 @@ describe('Consumer/Producer', function() {
   });
 
   describe('Timeout Shared By Batch - Behavior', function() {
-    it('should debounce batch return when is false', function(done) {
+    it('should debounce batch return when bool is false', function(done) {
       crypto.randomBytes(4096, function(_ex, buffer) {
         consumer.setDefaultIsTimeoutSharedByBatch(false);
         consumer.setDefaultConsumeTimeout(1000);

From 5914c1fa29f81dfca7052cbb925106779d4c9d88 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 11:39:38 -0500
Subject: [PATCH 11/17] clean up

---
 e2e/both.spec.js | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index cd3853be..d4a8d818 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -564,8 +564,8 @@ describe('Consumer/Producer', function() {
           setTimeout(producer.produce(topic, null, buffer, null), 500 * i);
         }
 
-        // Batch size large enough to not interfere with timeout test
         const start = Date.now();
+        // Batch size large enough to not interfere with timeout test
         consumer.consume(100, function(err, messages) {
           t.ifError(err);
           t.equal(messages.length, messageCount, 'Consume should wait for all messages because of debounce');
@@ -592,6 +592,7 @@ describe('Consumer/Producer', function() {
           setTimeout(producer.produce(topic, null, buffer, null), 500 * i);
         }
 
+        const start = Date.now();
         // Batch size large enough to not interfere with timeout test
         consumer.consume(100, function(err, messages) {
           t.ifError(err);

From 24c8cf3a20e906a5ab42cc009deed6e43b8149f0 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 11:40:15 -0500
Subject: [PATCH 12/17] last clean

---
 e2e/both.spec.js | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index d4a8d818..81cb95f0 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -576,7 +576,7 @@ describe('Consumer/Producer', function() {
     })
 
     it('should return after timeout when bool is true', function(done) {
-      crypto.randomBytes(4096, function (_ex, buffer) {
+      crypto.randomBytes(4096, function(_ex, buffer) {
        consumer.setDefaultIsTimeoutSharedByBatch(true);
        consumer.setDefaultConsumeTimeout(1000);
 
@@ -601,7 +601,6 @@ describe('Consumer/Producer', function() {
          done();
        })
      })
-
    })
  })
 
From 6664c165574a65757856e19eebedef61d49996f0 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 11:59:57 -0500
Subject: [PATCH 13/17] fix args error

---
 src/workers.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/workers.cc b/src/workers.cc
index 5b98ad87..cea4c38a 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -807,8 +807,8 @@ KafkaConsumerConsumeNum::KafkaConsumerConsumeNum(Nan::Callback *callback,
   m_consumer(consumer),
   m_num_messages(num_messages),
   m_timeout_ms(timeout_ms),
-  m_timeout_only_for_first_message(timeout_only_for_first_message,
-    m_timeout_shared_by_batch) {}
+  m_timeout_only_for_first_message(timeout_only_for_first_message),
+  m_timeout_shared_by_batch(timeout_shared_by_batch) {}
 
 KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {}
 

From 8c88361057a9321b6be94b83c8d078433910e6f0 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 13:00:47 -0500
Subject: [PATCH 14/17] fix timeout

---
 e2e/both.spec.js | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index 81cb95f0..993ce3ab 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -561,7 +561,7 @@ describe('Consumer/Producer', function() {
 
         let messageCount = 5;
         for (let i = 0; i < messageCount; i++) {
-          setTimeout(producer.produce(topic, null, buffer, null), 500 * i);
+          setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i);
         }
 
         const start = Date.now();
@@ -589,7 +589,7 @@ describe('Consumer/Producer', function() {
 
         let messageCount = 5;
         for (let i = 0; i < messageCount; i++) {
-          setTimeout(producer.produce(topic, null, buffer, null), 500 * i);
+          setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i);
         }
 
         const start = Date.now();
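The bug PATCH 14 fixes is easy to miss: `setTimeout(producer.produce(...), 500 * i)` calls `produce` immediately and hands its return value to `setTimeout`, so all five messages were sent up front and nothing was actually staggered. Wrapping the call in an arrow function defers it. A minimal illustration (not taken from the patches; `produceOnce` is a made-up stand-in):

    function produceOnce() {
      console.log('produced at', Date.now());
    }

    // Bug pattern: produceOnce() runs right away and setTimeout receives its
    // return value (undefined), so nothing is deferred — recent Node versions
    // reject the undefined callback outright.
    // setTimeout(produceOnce(), 500);

    // Fix used in the patch: pass a function for the timer to invoke ~500ms later.
    setTimeout(() => produceOnce(), 500);
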
From a64dabb6dd9fc535e14be9e6e5edddc94a755444 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 13:24:28 -0500
Subject: [PATCH 15/17] final tests

---
 e2e/both.spec.js | 117 ++++++++++++++++++++++++-----------------------
 1 file changed, 59 insertions(+), 58 deletions(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index 81cb95f0..d9d69a6b 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -295,6 +295,65 @@ describe('Consumer/Producer', function() {
     });
   });
 
+  describe('Timeout Shared By Batch - Behavior', function() {
+    it('should debounce batch return when bool is false: consumeNum', function(done) {
+      crypto.randomBytes(4096, function(_ex, buffer) {
+        consumer.setDefaultIsTimeoutSharedByBatch(false);
+        consumer.setDefaultConsumeTimeout(5000);
+
+        producer.setPollInterval(10);
+        producer.once('delivery-report', function(err, _report) {
+          t.ifError(err);
+        });
+
+        consumer.subscribe([topic]);
+
+        let messageCount = 12;
+        for (let i = 0; i < messageCount; i++) {
+          setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i);
+        }
+
+        const start = Date.now();
+        // Batch size large enough to not interfere with timeout test
+        consumer.consume(1000, function(err, messages) {
+          t.ifError(err);
+          t.equal(messages.length, messageCount, 'Consume should wait for all messages because of debounce');
+          t(Date.now() - start > 6000, 'Time passed should exceed timeout') // wiggle room for time shenanigans
+          done();
+        });
+      });
+    });
+
+    it('should return after timeout when bool is true: consumeNum', function(done) {
+      crypto.randomBytes(4096, function(_ex, buffer) {
+        consumer.setDefaultIsTimeoutSharedByBatch(true);
+        consumer.setDefaultConsumeTimeout(5000);
+
+        producer.setPollInterval(10);
+        producer.once('delivery-report', function(err, _report) {
+          t.ifError(err);
+        });
+
+        consumer.subscribe([topic]);
+
+        let messageCount = 12;
+        for (let i = 0; i < messageCount; i++) {
+          setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i);
+        }
+
+        const start = Date.now();
+        // Batch size large enough to not interfere with timeout test
+        consumer.consume(1000, function(err, messages) {
+          t.ifError(err);
+          t.notEqual(messages.length, messageCount, 'Consume should fail to get all messages because of timeout');
+          t(Date.now() - start < 6000, 'Time passed should adhere to timeout') // wiggle room for time shenanigans
+          done();
+        });
+      });
+    });
+  });
+
+
   it('should be able to produce and consume messages: consumeLoop', function(done) {
     var key = 'key';
@@ -546,64 +605,6 @@ describe('Consumer/Producer', function() {
     }, 2000);
   });
 
-  describe('Timeout Shared By Batch - Behavior', function() {
-    it('should debounce batch return when bool is false', function(done) {
-      crypto.randomBytes(4096, function(_ex, buffer) {
-        consumer.setDefaultIsTimeoutSharedByBatch(false);
-        consumer.setDefaultConsumeTimeout(1000);
-
-        producer.setPollInterval(10);
-        producer.once('delivery-report', function(err, _report) {
-          t.ifError(err);
-        });
-
-        consumer.subscribe([topic]);
-
-        let messageCount = 5;
-        for (let i = 0; i < messageCount; i++) {
-          setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i);
-        }
-
-        const start = Date.now();
-        // Batch size large enough to not interfere with timeout test
-        consumer.consume(100, function(err, messages) {
-          t.ifError(err);
-          t.equal(messages.length, messageCount, 'Consume should wait for all messages because of debounce');
-          t(Date.now() - start > 2000, 'Time passed should exceed timeout') // wiggle room for time shenanigans
-          done();
-        })
-      })
-    })
-
-    it('should return after timeout when bool is true', function(done) {
-      crypto.randomBytes(4096, function(_ex, buffer) {
-        consumer.setDefaultIsTimeoutSharedByBatch(true);
-        consumer.setDefaultConsumeTimeout(1000);
-
-        producer.setPollInterval(10);
-        producer.once('delivery-report', function(err, _report) {
-          t.ifError(err);
-        });
-
-        consumer.subscribe([topic]);
-
-        let messageCount = 5;
-        for (let i = 0; i < messageCount; i++) {
-          setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i);
-        }
-
-        const start = Date.now();
-        // Batch size large enough to not interfere with timeout test
-        consumer.consume(100, function(err, messages) {
-          t.ifError(err);
-          t.notEqual(messages.length, messageCount, 'Consume should gather less than total messageCount because of timeout');
-          t(Date.now() - start < 1500, 'Time passed should adhere to timeout') // wiggle room for time shenanigans
-          done();
-        })
-      })
-    })
-  })
-
   describe('Exceptional case - offset_commit_cb true', function() {
     var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
     var consumerOpts = {

From 060c26ec7ffde22dcb911ce94c1f09c2483f65b8 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 13:27:02 -0500
Subject: [PATCH 16/17] clean up newline

---
 e2e/both.spec.js | 1 -
 1 file changed, 1 deletion(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index d9d69a6b..0dc4c8a6 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -353,7 +353,6 @@ describe('Consumer/Producer', function() {
     });
   });
 
-
   it('should be able to produce and consume messages: consumeLoop', function(done) {
     var key = 'key';

From 2f0755b043da12c1f3c05033a69511ac4b67e7d3 Mon Sep 17 00:00:00 2001
From: Andrew Hessler
Date: Sun, 3 Aug 2025 13:34:48 -0500
Subject: [PATCH 17/17] comments

---
 e2e/both.spec.js | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index 0dc4c8a6..9e758200 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -310,11 +310,10 @@ describe('Consumer/Producer', function() {
 
         let messageCount = 12;
         for (let i = 0; i < messageCount; i++) {
-          setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i);
+          setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i); // messages come in staggered under timeout
         }
 
         const start = Date.now();
-        // Batch size large enough to not interfere with timeout test
         consumer.consume(1000, function(err, messages) {
           t.ifError(err);
           t.equal(messages.length, messageCount, 'Consume should wait for all messages because of debounce');
@@ -338,11 +337,10 @@ describe('Consumer/Producer', function() {
 
         let messageCount = 12;
         for (let i = 0; i < messageCount; i++) {
-          setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i);
+          setTimeout(() => producer.produce(topic, null, buffer, null), 500 * i); // messages come in staggered under timeout
         }
 
         const start = Date.now();
-        // Batch size large enough to not interfere with timeout test
         consumer.consume(1000, function(err, messages) {
           t.ifError(err);
           t.notEqual(messages.length, messageCount, 'Consume should fail to get all messages because of timeout');