Skip to content

Commit d6d86e8

Browse files
committed
consumer: fix consumeNum to respect the consume timeout
Steps to reproduce: * configure consumer with setDefaultConsumeTimeout(1000) * produce 1 message every 500ms * call consumer.consume(128, cb) Actual outcome: * consume returns 128 messages after 64 seconds Expected outcome: * consumer returns ~2 messages after 1 second KafkaConsumerConsumeNum call underlaying c++ m_consumer->Consume in cycle until * either the accumulated batch is full * or the call to c++ m_consumer->Consume times out on the total timeout KafkaConsumerConsumeNum must enforce its timeout over all m_consumer->Consume invocations altogether.
1 parent 095352a commit d6d86e8

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

e2e/both.spec.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,49 @@ describe('Consumer/Producer', function() {
272272
});
273273
});
274274

275+
it('should stop consuming batch after consume timeout', function(done) {
276+
crypto.randomBytes(4096, function(ex, buffer) {
277+
producer.setPollInterval(10);
278+
279+
producer.once('delivery-report', function(err, report) {
280+
t.ifError(err);
281+
});
282+
283+
consumer.subscribe([topic]);
284+
285+
var events = [];
286+
287+
consumer.once('data', function(msg) {
288+
events.push("data");
289+
});
290+
291+
consumer.on('partition.eof', function(eof) {
292+
events.push("partition.eof");
293+
});
294+
295+
let timeoutId;
296+
let toProduce = 10;
297+
produceLoop = () => {
298+
producer.produce(topic, null, buffer, null);
299+
if (--toProduce > 0) {
300+
timeoutId = setTimeout(produceLoop, 500);
301+
}
302+
};
303+
produceLoop();
304+
305+
consumer.setDefaultConsumeTimeout(2000);
306+
const startedAt = Date.now();
307+
consumer.consume(100, function(err, messages) {
308+
t.ifError(err);
309+
t(Date.now() - startedAt < 3000, 'Consume took longer than consume timeout');
310+
t(messages.length > 2, 'Too few messages consumed within batch');
311+
t(messages.length < 8, 'Too many messages consumed within batch');
312+
clearTimeout(timeoutId);
313+
done();
314+
});
315+
});
316+
});
317+
275318
it('should be able to produce and consume messages: consumeLoop', function(done) {
276319
var key = 'key';
277320

src/workers.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
* of the MIT license. See the LICENSE.txt file for details.
88
*/
99

10+
#include <algorithm>
11+
#include <chrono>
1012
#include <string>
1113
#include <vector>
1214

@@ -810,6 +812,7 @@ void KafkaConsumerConsumeNum::Execute() {
810812
bool looping = true;
811813
int timeout_ms = m_timeout_ms;
812814
std::size_t eof_event_count = 0;
815+
const auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
813816

814817
while (m_messages.size() - eof_event_count < max && looping) {
815818
// Get a message
@@ -838,6 +841,8 @@ void KafkaConsumerConsumeNum::Execute() {
838841
break;
839842
case RdKafka::ERR_NO_ERROR:
840843
m_messages.push_back(b.data<RdKafka::Message*>());
844+
timeout_ms = std::max(0LL, std::chrono::duration_cast<std::chrono::milliseconds>(
845+
end - std::chrono::steady_clock::now()).count());
841846
break;
842847
default:
843848
// Set the error for any other errors and break

0 commit comments

Comments
 (0)