diff --git a/e2e/both.spec.js b/e2e/both.spec.js index a8289ec3..452227d5 100644 --- a/e2e/both.spec.js +++ b/e2e/both.spec.js @@ -229,7 +229,7 @@ describe('Consumer/Producer', function() { setTimeout(function() { producer.produce(topic, null, buffer, null); }, 500) - consumer.setDefaultConsumeTimeout(2000); + consumer.setDefaultConsumeTimeout(3000); consumer.consume(1000, function(err, messages) { t.ifError(err); t.equal(messages.length, 1); @@ -262,7 +262,7 @@ describe('Consumer/Producer', function() { setTimeout(function() { producer.produce(topic, null, buffer, null); }, 2000) - consumer.setDefaultConsumeTimeout(3000); + consumer.setDefaultConsumeTimeout(5000); consumer.consume(1000, function(err, messages) { t.ifError(err); t.equal(messages.length, 1); @@ -272,6 +272,49 @@ describe('Consumer/Producer', function() { }); }); + it('should stop consuming batch after consume timeout', function(done) { + crypto.randomBytes(4096, function(ex, buffer) { + producer.setPollInterval(10); + + producer.once('delivery-report', function(err, report) { + t.ifError(err); + }); + + consumer.subscribe([topic]); + + var events = []; + + consumer.once('data', function(msg) { + events.push("data"); + }); + + consumer.on('partition.eof', function(eof) { + events.push("partition.eof"); + }); + + let timeoutId; + let toProduce = 10; + produceLoop = () => { + producer.produce(topic, null, buffer, null); + if (--toProduce > 0) { + timeoutId = setTimeout(produceLoop, 500); + } + }; + produceLoop(); + + consumer.setDefaultConsumeTimeout(2000); + const startedAt = Date.now(); + consumer.consume(100, function(err, messages) { + t.ifError(err); + t(Date.now() - startedAt < 3000, 'Consume took longer than consume timeout'); + t(messages.length > 2, 'Too few messages consumed within batch'); + t(messages.length < 8, 'Too many messages consumed within batch'); + clearTimeout(timeoutId); + done(); + }); + }); + }); + it('should be able to produce and consume messages: consumeLoop', function(done) { var key = 'key'; diff --git a/src/workers.cc b/src/workers.cc index 55d3dd50..b4ac77a1 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -7,6 +7,8 @@ * of the MIT license. See the LICENSE.txt file for details. */ +#include +#include #include #include @@ -810,6 +812,7 @@ void KafkaConsumerConsumeNum::Execute() { bool looping = true; int timeout_ms = m_timeout_ms; std::size_t eof_event_count = 0; + const auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); while (m_messages.size() - eof_event_count < max && looping) { // Get a message @@ -838,6 +841,8 @@ void KafkaConsumerConsumeNum::Execute() { break; case RdKafka::ERR_NO_ERROR: m_messages.push_back(b.data()); + timeout_ms = std::max(0, static_cast(std::chrono::duration_cast( + end - std::chrono::steady_clock::now()).count())); break; default: // Set the error for any other errors and break