Skip to content

Commit 6fc412d

Browse files
committed
reuse existing classes
1 parent a1b4505 commit 6fc412d

File tree

5 files changed

+48
-158
lines changed

5 files changed

+48
-158
lines changed

lib/kafka-consumer.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ KafkaConsumer.prototype.unsubscribe = function() {
385385
* is fetched.
386386
*/
387387
KafkaConsumer.prototype.consume = function(number, cb) {
388-
var timeoutMs = this._consumeTimeout || DEFAULT_CONSUME_TIME_OUT;
388+
var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT;
389389
var self = this;
390390

391391
if ((number && typeof number === 'number') || (number && cb)) {

src/kafka-consumer.cc

Lines changed: 17 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
* of the MIT license. See the LICENSE.txt file for details.
88
*/
99

10-
#define THREADED_CONSUME
11-
1210
#include <string>
1311
#include <vector>
1412

@@ -35,26 +33,12 @@ KafkaConsumer::KafkaConsumer(Conf* gconfig, Conf* tconfig):
3533

3634
m_gconfig->set("default_topic_conf", m_tconfig, errstr);
3735

38-
consume_callback = nullptr;
39-
40-
uv_async_init(Nan::GetCurrentEventLoop(), &consume_async, ConsumeMessage);
41-
consume_async.data = this;
42-
43-
uv_mutex_init(&consume_messages_lock);
36+
m_consume_loop = nullptr;
4437
}
4538

4639
KafkaConsumer::~KafkaConsumer() {
4740
// We only want to run this if it hasn't been run already
4841
Disconnect();
49-
50-
if (consume_callback != nullptr) {
51-
consume_callback->Reset();
52-
consume_callback = nullptr;
53-
}
54-
55-
uv_mutex_destroy(&consume_messages_lock);
56-
57-
uv_close((uv_handle_t*) &consume_async, NULL);
5842
}
5943

6044
Baton KafkaConsumer::Connect() {
@@ -108,6 +92,11 @@ Baton KafkaConsumer::Disconnect() {
10892
}
10993
}
11094

95+
if (m_consume_loop != nullptr) {
96+
delete m_consume_loop;
97+
m_consume_loop = nullptr;
98+
}
99+
111100
m_is_closing = false;
112101

113102
return Baton(err);
@@ -492,88 +481,6 @@ std::string KafkaConsumer::Name() {
492481
return std::string(m_client->name());
493482
}
494483

495-
void KafkaConsumer::ConsumeLoop(void *arg) {
496-
KafkaConsumer* consumer = (KafkaConsumer*)arg;
497-
498-
bool looping = true;
499-
500-
while (consumer->IsConnected() && looping) {
501-
Baton b = consumer->Consume(consumer->consume_timeout_ms);
502-
RdKafka::ErrorCode ec = b.err();
503-
if (ec == RdKafka::ERR_NO_ERROR) {
504-
RdKafka::Message *message = b.data<RdKafka::Message*>();
505-
switch (message->err()) {
506-
507-
case RdKafka::ERR_NO_ERROR: {
508-
// message is deleted after it's passed to the main event loop
509-
510-
scoped_mutex_lock lock(consumer->consume_messages_lock);
511-
consumer->consume_messages.push_back(message);
512-
513-
uv_async_send(&consumer->consume_async);
514-
515-
break;
516-
}
517-
518-
case RdKafka::ERR__TIMED_OUT:
519-
case RdKafka::ERR__TIMED_OUT_QUEUE:
520-
case RdKafka::ERR__PARTITION_EOF: {
521-
delete message;
522-
523-
// no need to wait here because the next Consume() call will handle the timeout?
524-
525-
break;
526-
}
527-
528-
default:
529-
// Unknown error. We need to break out of this
530-
// SetErrorBaton(b);
531-
// TODO: pass error along?
532-
533-
looping = false;
534-
break;
535-
}
536-
537-
} else if (ec == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART || ec == RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED) {
538-
// bus.SendWarning(ec);
539-
540-
} else {
541-
// Unknown error. We need to break out of this
542-
// SetErrorBaton(b);
543-
looping = false;
544-
}
545-
}
546-
}
547-
548-
void KafkaConsumer::ConsumeMessage(uv_async_t* handle) {
549-
Nan::HandleScope scope;
550-
551-
KafkaConsumer* consumer = (KafkaConsumer*)handle->data;
552-
553-
std::vector<RdKafka::Message*> message_queue;
554-
// std::vector<RdKafka::ErrorCode> warning_queue;
555-
556-
{
557-
scoped_mutex_lock lock(consumer->consume_messages_lock);
558-
// Copy the vector and empty it
559-
consumer->consume_messages.swap(message_queue);
560-
// m_asyncwarning.swap(warning_queue);
561-
}
562-
563-
for (unsigned int i = 0; i < message_queue.size(); i++) {
564-
RdKafka::Message* message = message_queue[i];
565-
v8::Local<v8::Value> argv[] = {
566-
Nan::Null(),
567-
Conversion::Message::ToV8Object(message),
568-
Nan::Null(),
569-
};
570-
571-
delete message;
572-
573-
consumer->consume_callback->Call(3, argv);
574-
}
575-
}
576-
577484
Nan::Persistent<v8::Function> KafkaConsumer::constructor;
578485

579486
void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
@@ -676,9 +583,6 @@ void KafkaConsumer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
676583

677584
KafkaConsumer* consumer = new KafkaConsumer(gconfig, tconfig);
678585

679-
v8::Local<v8::Object> context = v8::Local<v8::Object>::Cast(info[0]);
680-
consumer->consume_context.Reset(context);
681-
682586
// Wrap it
683587
consumer->Wrap(info.This());
684588

@@ -1186,25 +1090,19 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {
11861090

11871091
KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
11881092

1189-
v8::Local<v8::Function> cb = info[2].As<v8::Function>();
1190-
1191-
Nan::Callback *callback = new Nan::Callback(cb);
1093+
if (consumer->m_consume_loop != nullptr) {
1094+
return Nan::ThrowError("Consume was already called");
1095+
}
11921096

1193-
#ifdef THREADED_CONSUME
1194-
if (consumer->consume_callback != nullptr) {
1195-
return Nan::ThrowError("Consume was already called once");
1097+
if (!consumer->IsConnected()) {
1098+
return Nan::ThrowError("Connect before calling consume");
11961099
}
11971100

1198-
consumer->consume_timeout_ms = timeout_ms;
1199-
consumer->consume_retry_read_ms = retry_read_ms;
1200-
consumer->consume_callback = callback;
1101+
v8::Local<v8::Function> cb = info[2].As<v8::Function>();
12011102

1202-
uv_thread_create(&consumer->consume_event_loop, KafkaConsumer::ConsumeLoop, (void*)consumer);
1203-
1204-
#else
1205-
Nan::AsyncQueueWorker(
1206-
new Workers::KafkaConsumerConsumeLoop(callback, consumer, timeout_ms, retry_read_ms));
1207-
#endif
1103+
Nan::Callback *callback = new Nan::Callback(cb);
1104+
1105+
consumer->m_consume_loop = new Workers::KafkaConsumerConsumeLoop(callback, consumer, timeout_ms, retry_read_ms);
12081106

12091107
info.GetReturnValue().Set(Nan::Null());
12101108
}
@@ -1259,8 +1157,8 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
12591157
v8::Local<v8::Function> cb = info[1].As<v8::Function>();
12601158
Nan::Callback *callback = new Nan::Callback(cb);
12611159

1262-
Nan::AsyncQueueWorker(
1263-
new Workers::KafkaConsumerConsume(callback, consumer, timeout_ms));
1160+
Nan::AsyncQueueWorker(
1161+
new Workers::KafkaConsumerConsume(callback, consumer, timeout_ms));
12641162
}
12651163

12661164
info.GetReturnValue().Set(Nan::Null());

src/kafka-consumer.h

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,6 @@ class KafkaConsumer : public Connection {
8484
void ActivateDispatchers();
8585
void DeactivateDispatchers();
8686

87-
static void ConsumeLoop(void *arg);
88-
static void ConsumeMessage(uv_async_t* handle);
89-
9087
protected:
9188
static Nan::Persistent<v8::Function> constructor;
9289
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
@@ -101,15 +98,7 @@ class KafkaConsumer : public Connection {
10198
int m_partition_cnt;
10299
bool m_is_subscribed = false;
103100

104-
uv_thread_t consume_event_loop;
105-
int consume_timeout_ms;
106-
int consume_retry_read_ms;
107-
Nan::Callback *consume_callback;
108-
Nan::Persistent<v8::Object> consume_context;
109-
110-
uv_async_t consume_async;
111-
uv_mutex_t consume_messages_lock;
112-
std::vector<RdKafka::Message*> consume_messages;
101+
void* m_consume_loop;
113102

114103
// Node methods
115104
static NAN_METHOD(NodeConnect);

src/workers.cc

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -664,59 +664,61 @@ KafkaConsumerConsumeLoop::KafkaConsumerConsumeLoop(Nan::Callback *callback,
664664
MessageWorker(callback),
665665
consumer(consumer),
666666
m_timeout_ms(timeout_ms),
667-
m_timeout_sleep_delay_ms(timeout_sleep_delay_ms),
668-
m_rand_seed(time(NULL)) {}
667+
m_timeout_sleep_delay_ms(timeout_sleep_delay_ms) {
668+
uv_thread_create(&thread_event_loop, KafkaConsumerConsumeLoop::ConsumeLoop, (void*)this);
669+
}
669670

670671
KafkaConsumerConsumeLoop::~KafkaConsumerConsumeLoop() {}
671672

672673
void KafkaConsumerConsumeLoop::Execute(const ExecutionMessageBus& bus) {
674+
// ConsumeLoop is used instead
675+
}
676+
677+
void KafkaConsumerConsumeLoop::ConsumeLoop(void *arg) {
678+
KafkaConsumerConsumeLoop* consumerLoop = (KafkaConsumerConsumeLoop*)arg;
679+
ExecutionMessageBus bus(consumerLoop);
680+
KafkaConsumer* consumer = consumerLoop->consumer;
681+
673682
// Do one check here before we move forward
674683
bool looping = true;
675684
while (consumer->IsConnected() && looping) {
676-
Baton b = consumer->Consume(m_timeout_ms);
685+
Baton b = consumer->Consume(consumerLoop->m_timeout_ms);
677686
RdKafka::ErrorCode ec = b.err();
678687
if (ec == RdKafka::ERR_NO_ERROR) {
679688
RdKafka::Message *message = b.data<RdKafka::Message*>();
680689
switch (message->err()) {
681690
case RdKafka::ERR__PARTITION_EOF:
682691
bus.Send(message);
683-
// EOF means there are no more messages to read.
684-
// We should wait a little bit for more messages to come in
685-
// when in consume loop mode
686-
// Randomise the wait time to avoid contention on different
687-
// slow topics
688-
#ifndef _WIN32
689-
usleep(static_cast<int>(rand_r(&m_rand_seed) * 1000 * 1000 / RAND_MAX));
690-
#else
691-
_sleep(1000);
692-
#endif
693692
break;
693+
694694
case RdKafka::ERR__TIMED_OUT:
695695
case RdKafka::ERR__TIMED_OUT_QUEUE:
696696
delete message;
697-
// If it is timed out this could just mean there were no
698-
// new messages fetched quickly enough. This isn't really
699-
// an error that should kill us.
700-
#ifndef _WIN32
701-
usleep(m_timeout_sleep_delay_ms*1000);
702-
#else
703-
_sleep(m_timeout_sleep_delay_ms);
704-
#endif
697+
if (consumerLoop->m_timeout_sleep_delay_ms > 0) {
698+
// If it is timed out this could just mean there were no
699+
// new messages fetched quickly enough. This isn't really
700+
// an error that should kill us.
701+
#ifndef _WIN32
702+
usleep(consumerLoop->m_timeout_sleep_delay_ms*1000);
703+
#else
704+
_sleep(consumerLoop->m_timeout_sleep_delay_ms);
705+
#endif
706+
}
705707
break;
706708
case RdKafka::ERR_NO_ERROR:
707709
bus.Send(message);
708710
break;
709711
default:
710712
// Unknown error. We need to break out of this
711-
SetErrorBaton(b);
713+
consumerLoop->SetErrorBaton(b);
712714
looping = false;
713715
break;
714716
}
715717
} else if (ec == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART || ec == RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED) {
716718
bus.SendWarning(ec);
717719
} else {
718720
// Unknown error. We need to break out of this
719-
SetErrorBaton(b);
721+
consumerLoop->SetErrorBaton(b);
720722
looping = false;
721723
}
722724
}
@@ -770,7 +772,6 @@ void KafkaConsumerConsumeLoop::HandleOKCallback() {
770772
void KafkaConsumerConsumeLoop::HandleErrorCallback() {
771773
Nan::HandleScope scope;
772774

773-
774775
const unsigned int argc = 1;
775776
v8::Local<v8::Value> argv[argc] = { Nan::Error(ErrorMessage()) };
776777

src/workers.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ class MessageWorker : public ErrorAwareWorker {
121121
void SendWarning(RdKafka::ErrorCode c) const {
122122
that_->ProduceWarning_(c);
123123
}
124-
private:
125124
explicit ExecutionMessageBus(MessageWorker* that) : that_(that) {}
125+
private:
126126
MessageWorker* const that_;
127127
};
128128

@@ -372,12 +372,14 @@ class KafkaConsumerConsumeLoop : public MessageWorker {
372372
NodeKafka::KafkaConsumer*, const int &, const int &);
373373
~KafkaConsumerConsumeLoop();
374374

375+
static void ConsumeLoop(void *arg);
375376
void Execute(const ExecutionMessageBus&);
376377
void HandleOKCallback();
377378
void HandleErrorCallback();
378379
void HandleMessageCallback(RdKafka::Message*, RdKafka::ErrorCode);
379380
private:
380-
NodeKafka::KafkaConsumer * consumer;
381+
uv_thread_t thread_event_loop;
382+
NodeKafka::KafkaConsumer* consumer;
381383
const int m_timeout_ms;
382384
unsigned int m_rand_seed;
383385
const int m_timeout_sleep_delay_ms;

0 commit comments

Comments
 (0)