Skip to content

Commit a1b4505

Browse files
committed
change where consume_async is opened / closed
1 parent a9099d4 commit a1b4505

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

src/kafka-consumer.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ KafkaConsumer::KafkaConsumer(Conf* gconfig, Conf* tconfig):
3737

3838
consume_callback = nullptr;
3939

40+
uv_async_init(Nan::GetCurrentEventLoop(), &consume_async, ConsumeMessage);
41+
consume_async.data = this;
42+
4043
uv_mutex_init(&consume_messages_lock);
4144
}
4245

@@ -50,6 +53,8 @@ KafkaConsumer::~KafkaConsumer() {
5053
}
5154

5255
uv_mutex_destroy(&consume_messages_lock);
56+
57+
uv_close((uv_handle_t*) &consume_async, NULL);
5358
}
5459

5560
Baton KafkaConsumer::Connect() {
@@ -497,7 +502,7 @@ void KafkaConsumer::ConsumeLoop(void *arg) {
497502
RdKafka::ErrorCode ec = b.err();
498503
if (ec == RdKafka::ERR_NO_ERROR) {
499504
RdKafka::Message *message = b.data<RdKafka::Message*>();
500-
switch (message->err()) {
505+
switch (message->err()) {
501506

502507
case RdKafka::ERR_NO_ERROR: {
503508
// message is deleted after it's passed to the main event loop
@@ -538,8 +543,6 @@ void KafkaConsumer::ConsumeLoop(void *arg) {
538543
looping = false;
539544
}
540545
}
541-
542-
uv_close((uv_handle_t*) &consumer->consume_async, NULL);
543546
}
544547

545548
void KafkaConsumer::ConsumeMessage(uv_async_t* handle) {
@@ -676,9 +679,6 @@ void KafkaConsumer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
676679
v8::Local<v8::Object> context = v8::Local<v8::Object>::Cast(info[0]);
677680
consumer->consume_context.Reset(context);
678681

679-
uv_async_init(Nan::GetCurrentEventLoop(), &consumer->consume_async, ConsumeMessage);
680-
consumer->consume_async.data = consumer;
681-
682682
// Wrap it
683683
consumer->Wrap(info.This());
684684

0 commit comments

Comments
 (0)