Skip to content

Commit 8429f0c

Browse files
authored
Stop consume loop thread when disconnecting (#1017)
1 parent 76ec934 commit 8429f0c

File tree

6 files changed

+27
-13
lines changed

6 files changed

+27
-13
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "node-rdkafka",
3-
"version": "v2.16.0",
3+
"version": "v2.16.1",
44
"description": "Node.js bindings for librdkafka",
55
"librdkafka": "2.1.1",
66
"main": "lib/index.js",

src/kafka-consumer.cc

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,6 @@ Baton KafkaConsumer::Disconnect() {
9292
}
9393
}
9494

95-
if (m_consume_loop != nullptr) {
96-
delete m_consume_loop;
97-
m_consume_loop = nullptr;
98-
}
99-
10095
m_is_closing = false;
10196

10297
return Baton(err);
@@ -1192,6 +1187,18 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) {
11921187
Nan::Callback *callback = new Nan::Callback(cb);
11931188
KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
11941189

1190+
Workers::KafkaConsumerConsumeLoop* consumeLoop = (Workers::KafkaConsumerConsumeLoop*)consumer->m_consume_loop;
1191+
if (consumeLoop != nullptr) {
1192+
// stop the consume loop
1193+
consumeLoop->Close();
1194+
1195+
// cleanup the async worker
1196+
consumeLoop->WorkComplete();
1197+
consumeLoop->Destroy();
1198+
1199+
consumer->m_consume_loop = nullptr;
1200+
}
1201+
11951202
Nan::AsyncQueueWorker(
11961203
new Workers::KafkaConsumerDisconnect(callback, consumer));
11971204
info.GetReturnValue().Set(Nan::Null());

src/kafka-consumer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class KafkaConsumer : public Connection {
9898
int m_partition_cnt;
9999
bool m_is_subscribed = false;
100100

101-
void* m_consume_loop;
101+
void* m_consume_loop = nullptr;
102102

103103
// Node methods
104104
static NAN_METHOD(NodeConnect);

src/workers.cc

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -663,13 +663,19 @@ KafkaConsumerConsumeLoop::KafkaConsumerConsumeLoop(Nan::Callback *callback,
663663
const int & timeout_sleep_delay_ms) :
664664
MessageWorker(callback),
665665
consumer(consumer),
666+
m_looping(true),
666667
m_timeout_ms(timeout_ms),
667668
m_timeout_sleep_delay_ms(timeout_sleep_delay_ms) {
668669
uv_thread_create(&thread_event_loop, KafkaConsumerConsumeLoop::ConsumeLoop, (void*)this);
669670
}
670671

671672
KafkaConsumerConsumeLoop::~KafkaConsumerConsumeLoop() {}
672673

674+
void KafkaConsumerConsumeLoop::Close() {
675+
m_looping = false;
676+
uv_thread_join(&thread_event_loop);
677+
}
678+
673679
void KafkaConsumerConsumeLoop::Execute(const ExecutionMessageBus& bus) {
674680
// ConsumeLoop is used instead
675681
}
@@ -680,8 +686,7 @@ void KafkaConsumerConsumeLoop::ConsumeLoop(void *arg) {
680686
KafkaConsumer* consumer = consumerLoop->consumer;
681687

682688
// Do one check here before we move forward
683-
bool looping = true;
684-
while (consumer->IsConnected() && looping) {
689+
while (consumerLoop->m_looping && consumer->IsConnected()) {
685690
Baton b = consumer->Consume(consumerLoop->m_timeout_ms);
686691
RdKafka::ErrorCode ec = b.err();
687692
if (ec == RdKafka::ERR_NO_ERROR) {
@@ -711,15 +716,15 @@ void KafkaConsumerConsumeLoop::ConsumeLoop(void *arg) {
711716
default:
712717
// Unknown error. We need to break out of this
713718
consumerLoop->SetErrorBaton(b);
714-
looping = false;
719+
consumerLoop->m_looping = false;
715720
break;
716721
}
717722
} else if (ec == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART || ec == RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED) {
718723
bus.SendWarning(ec);
719724
} else {
720725
// Unknown error. We need to break out of this
721726
consumerLoop->SetErrorBaton(b);
722-
looping = false;
727+
consumerLoop->m_looping = false;
723728
}
724729
}
725730
}

src/workers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ class KafkaConsumerConsumeLoop : public MessageWorker {
373373
~KafkaConsumerConsumeLoop();
374374

375375
static void ConsumeLoop(void *arg);
376+
void Close();
376377
void Execute(const ExecutionMessageBus&);
377378
void HandleOKCallback();
378379
void HandleErrorCallback();
@@ -383,6 +384,7 @@ class KafkaConsumerConsumeLoop : public MessageWorker {
383384
const int m_timeout_ms;
384385
unsigned int m_rand_seed;
385386
const int m_timeout_sleep_delay_ms;
387+
bool m_looping;
386388
};
387389

388390
class KafkaConsumerConsume : public ErrorAwareWorker {

0 commit comments

Comments
 (0)