7
7
* of the MIT license. See the LICENSE.txt file for details.
8
8
*/
9
9
10
+ #define THREADED_CONSUME
11
+
10
12
#include < string>
11
13
#include < vector>
12
14
@@ -32,11 +34,22 @@ KafkaConsumer::KafkaConsumer(Conf* gconfig, Conf* tconfig):
32
34
std::string errstr;
33
35
34
36
m_gconfig->set (" default_topic_conf" , m_tconfig, errstr);
37
+
38
+ consume_callback = nullptr ;
39
+
40
+ uv_mutex_init (&consume_messages_lock);
35
41
}
36
42
37
43
KafkaConsumer::~KafkaConsumer () {
38
44
// We only want to run this if it hasn't been run already
39
45
Disconnect ();
46
+
47
+ if (consume_callback != nullptr ) {
48
+ consume_callback->Reset ();
49
+ consume_callback = nullptr ;
50
+ }
51
+
52
+ uv_mutex_destroy (&consume_messages_lock);
40
53
}
41
54
42
55
Baton KafkaConsumer::Connect () {
@@ -474,6 +487,90 @@ std::string KafkaConsumer::Name() {
474
487
return std::string (m_client->name ());
475
488
}
476
489
490
+ void KafkaConsumer::ConsumeLoop (void *arg) {
491
+ KafkaConsumer* consumer = (KafkaConsumer*)arg;
492
+
493
+ bool looping = true ;
494
+
495
+ while (consumer->IsConnected () && looping) {
496
+ Baton b = consumer->Consume (consumer->consume_timeout_ms );
497
+ RdKafka::ErrorCode ec = b.err ();
498
+ if (ec == RdKafka::ERR_NO_ERROR) {
499
+ RdKafka::Message *message = b.data <RdKafka::Message*>();
500
+ switch (message->err ()) {
501
+
502
+ case RdKafka::ERR_NO_ERROR: {
503
+ // message is deleted after it's passed to the main event loop
504
+
505
+ scoped_mutex_lock lock (consumer->consume_messages_lock );
506
+ consumer->consume_messages .push_back (message);
507
+
508
+ uv_async_send (&consumer->consume_async );
509
+
510
+ break ;
511
+ }
512
+
513
+ case RdKafka::ERR__TIMED_OUT:
514
+ case RdKafka::ERR__TIMED_OUT_QUEUE:
515
+ case RdKafka::ERR__PARTITION_EOF: {
516
+ delete message;
517
+
518
+ // no need to wait here because the next Consume() call will handle the timeout?
519
+
520
+ break ;
521
+ }
522
+
523
+ default :
524
+ // Unknown error. We need to break out of this
525
+ // SetErrorBaton(b);
526
+ // TODO: pass error along?
527
+
528
+ looping = false ;
529
+ break ;
530
+ }
531
+
532
+ } else if (ec == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART || ec == RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED) {
533
+ // bus.SendWarning(ec);
534
+
535
+ } else {
536
+ // Unknown error. We need to break out of this
537
+ // SetErrorBaton(b);
538
+ looping = false ;
539
+ }
540
+ }
541
+
542
+ uv_close ((uv_handle_t *) &consumer->consume_async , NULL );
543
+ }
544
+
545
+ void KafkaConsumer::ConsumeMessage (uv_async_t * handle) {
546
+ Nan::HandleScope scope;
547
+
548
+ KafkaConsumer* consumer = (KafkaConsumer*)handle->data ;
549
+
550
+ std::vector<RdKafka::Message*> message_queue;
551
+ // std::vector<RdKafka::ErrorCode> warning_queue;
552
+
553
+ {
554
+ scoped_mutex_lock lock (consumer->consume_messages_lock );
555
+ // Copy the vector and empty it
556
+ consumer->consume_messages .swap (message_queue);
557
+ // m_asyncwarning.swap(warning_queue);
558
+ }
559
+
560
+ for (unsigned int i = 0 ; i < message_queue.size (); i++) {
561
+ RdKafka::Message* message = message_queue[i];
562
+ v8::Local<v8::Value> argv[] = {
563
+ Nan::Null (),
564
+ Conversion::Message::ToV8Object (message),
565
+ Nan::Null (),
566
+ };
567
+
568
+ delete message;
569
+
570
+ consumer->consume_callback ->Call (3 , argv);
571
+ }
572
+ }
573
+
477
574
Nan::Persistent<v8::Function> KafkaConsumer::constructor;
478
575
479
576
void KafkaConsumer::Init (v8::Local<v8::Object> exports) {
@@ -576,6 +673,12 @@ void KafkaConsumer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
576
673
577
674
KafkaConsumer* consumer = new KafkaConsumer (gconfig, tconfig);
578
675
676
+ v8::Local<v8::Object> context = v8::Local<v8::Object>::Cast (info[0 ]);
677
+ consumer->consume_context .Reset (context);
678
+
679
+ uv_async_init (Nan::GetCurrentEventLoop (), &consumer->consume_async , ConsumeMessage);
680
+ consumer->consume_async .data = consumer;
681
+
579
682
// Wrap it
580
683
consumer->Wrap (info.This ());
581
684
@@ -1086,8 +1189,22 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {
1086
1189
v8::Local<v8::Function> cb = info[2 ].As <v8::Function>();
1087
1190
1088
1191
Nan::Callback *callback = new Nan::Callback (cb);
1192
+
1193
+ #ifdef THREADED_CONSUME
1194
+ if (consumer->consume_callback != nullptr ) {
1195
+ return Nan::ThrowError (" Consume was already called once" );
1196
+ }
1197
+
1198
+ consumer->consume_timeout_ms = timeout_ms;
1199
+ consumer->consume_retry_read_ms = retry_read_ms;
1200
+ consumer->consume_callback = callback;
1201
+
1202
+ uv_thread_create (&consumer->consume_event_loop , KafkaConsumer::ConsumeLoop, (void *)consumer);
1203
+
1204
+ #else
1089
1205
Nan::AsyncQueueWorker (
1090
1206
new Workers::KafkaConsumerConsumeLoop (callback, consumer, timeout_ms, retry_read_ms));
1207
+ #endif
1091
1208
1092
1209
info.GetReturnValue ().Set (Nan::Null ());
1093
1210
}
@@ -1141,8 +1258,9 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
1141
1258
1142
1259
v8::Local<v8::Function> cb = info[1 ].As <v8::Function>();
1143
1260
Nan::Callback *callback = new Nan::Callback (cb);
1144
- Nan::AsyncQueueWorker (
1145
- new Workers::KafkaConsumerConsume (callback, consumer, timeout_ms));
1261
+
1262
+ Nan::AsyncQueueWorker (
1263
+ new Workers::KafkaConsumerConsume (callback, consumer, timeout_ms));
1146
1264
}
1147
1265
1148
1266
info.GetReturnValue ().Set (Nan::Null ());
0 commit comments