Skip to content

Commit a22d1f3

Browse files
authored
Handle consumer topic errors in librdkafka v1.5.0+ (#864)
1 parent 6810bdb commit a22d1f3

File tree

5 files changed

+76
-31
lines changed

5 files changed

+76
-31
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ The following table lists events for this API.
504504
|-------|----------|
505505
|`data` | When using the Standard API, consumed messages are emitted in this event. |
506506
|`partition.eof` | When using the Standard API with the configuration option `enable.partition.eof` set, `partition.eof` events are emitted. Each event contains `topic`, `partition` and `offset` properties. |
507+
|`warning` | Emitted when an `UNKNOWN_TOPIC_OR_PART` or `TOPIC_AUTHORIZATION_FAILED` error occurs while consuming in *Flowing mode*. The consumer keeps working, so if the error persists the `warning` event should be emitted again after the next metadata refresh. To control the metadata refresh rate, set the `topic.metadata.refresh.interval.ms` property. Once you resolve the error, you can manually call `getMetadata` to speed up consumer recovery. |
507508
|`disconnected` | The `disconnected` event is emitted when the broker disconnects. <br><br>This event is only emitted when `.disconnect` is called. The wrapper will always try to reconnect otherwise. |
508509
|`ready` | The `ready` event is emitted when the `Consumer` is ready to read messages. |
509510
|`event` | The `event` event is emitted when `librdkafka` reports an event (if you opted in via the `event_cb` option).|

e2e/both.spec.js

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ describe('Consumer/Producer', function() {
228228

229229
consumer.once('partition.eof', function(eof) {
230230
events.push("partition.eof");
231-
})
231+
});
232232

233233
setTimeout(function() {
234234
producer.produce(topic, null, buffer, null);
@@ -362,8 +362,23 @@ describe('Consumer/Producer', function() {
362362
startOffset + 2 ]);
363363
done();
364364
}, 6000);
365-
})
366-
})
365+
});
366+
});
367+
368+
it('should emit [warning] event on UNKNOWN_TOPIC_OR_PART error: consumeLoop', function(done) {
369+
consumer.on('warning', function (err) {
370+
if (err.code === Kafka.CODES.ERRORS.ERR_UNKNOWN_TOPIC_OR_PART) {
371+
consumer.disconnect(function() {
372+
done();
373+
});
374+
} else {
375+
t.ifError(err);
376+
}
377+
});
378+
379+
consumer.subscribe(['non_existing_topic']);
380+
consumer.consume();
381+
});
367382

368383
it('should be able to produce and consume messages with one header value as string: consumeLoop', function(done) {
369384
var headers = [

lib/kafka-consumer.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ KafkaConsumer.prototype.consume = function(number, cb) {
426426
KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
427427
var self = this;
428428
var retryReadInterval = this._consumeLoopTimeoutDelay;
429-
self._client.consumeLoop(timeoutMs, retryReadInterval, function readCallback(err, message, eofEvent) {
429+
self._client.consumeLoop(timeoutMs, retryReadInterval, function readCallback(err, message, eofEvent, warning) {
430430

431431
if (err) {
432432
// A few different types of errors here
@@ -436,6 +436,8 @@ KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
436436
cb(LibrdKafkaError.create(err));
437437
} else if (eofEvent) {
438438
self.emit('partition.eof', eofEvent);
439+
} else if (warning) {
440+
self.emit('warning', LibrdKafkaError.create(warning));
439441
} else {
440442
/**
441443
* Data event. called whenever a message is received.

src/workers.cc

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,8 @@ void KafkaConsumerConsumeLoop::Execute(const ExecutionMessageBus& bus) {
435435
bool looping = true;
436436
while (consumer->IsConnected() && looping) {
437437
Baton b = consumer->Consume(m_timeout_ms);
438-
if (b.err() == RdKafka::ERR_NO_ERROR) {
438+
RdKafka::ErrorCode ec = b.err();
439+
if (ec == RdKafka::ERR_NO_ERROR) {
439440
RdKafka::Message *message = b.data<RdKafka::Message*>();
440441
switch (message->err()) {
441442
case RdKafka::ERR__PARTITION_EOF:
@@ -472,6 +473,8 @@ void KafkaConsumerConsumeLoop::Execute(const ExecutionMessageBus& bus) {
472473
looping = false;
473474
break;
474475
}
476+
} else if (ec == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART || ec == RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED) {
477+
bus.SendWarning(ec);
475478
} else {
476479
// Unknown error. We need to break out of this
477480
SetErrorBaton(b);
@@ -480,36 +483,43 @@ void KafkaConsumerConsumeLoop::Execute(const ExecutionMessageBus& bus) {
480483
}
481484
}
482485

483-
void KafkaConsumerConsumeLoop::HandleMessageCallback(RdKafka::Message* msg) {
486+
void KafkaConsumerConsumeLoop::HandleMessageCallback(RdKafka::Message* msg, RdKafka::ErrorCode ec) {
484487
Nan::HandleScope scope;
485488

486-
const unsigned int argc = 3;
489+
const unsigned int argc = 4;
487490
v8::Local<v8::Value> argv[argc];
488491

489492
argv[0] = Nan::Null();
490-
switch (msg->err()) {
491-
case RdKafka::ERR__PARTITION_EOF: {
492-
argv[1] = Nan::Null();
493-
v8::Local<v8::Object> eofEvent = Nan::New<v8::Object>();
494-
495-
Nan::Set(eofEvent, Nan::New<v8::String>("topic").ToLocalChecked(),
496-
Nan::New<v8::String>(msg->topic_name()).ToLocalChecked());
497-
Nan::Set(eofEvent, Nan::New<v8::String>("offset").ToLocalChecked(),
498-
Nan::New<v8::Number>(msg->offset()));
499-
Nan::Set(eofEvent, Nan::New<v8::String>("partition").ToLocalChecked(),
500-
Nan::New<v8::Number>(msg->partition()));
501-
502-
argv[2] = eofEvent;
503-
break;
493+
if (msg == NULL) {
494+
argv[1] = Nan::Null();
495+
argv[2] = Nan::Null();
496+
argv[3] = Nan::New<v8::Number>(ec);
497+
} else {
498+
argv[3] = Nan::Null();
499+
switch (msg->err()) {
500+
case RdKafka::ERR__PARTITION_EOF: {
501+
argv[1] = Nan::Null();
502+
v8::Local<v8::Object> eofEvent = Nan::New<v8::Object>();
503+
504+
Nan::Set(eofEvent, Nan::New<v8::String>("topic").ToLocalChecked(),
505+
Nan::New<v8::String>(msg->topic_name()).ToLocalChecked());
506+
Nan::Set(eofEvent, Nan::New<v8::String>("offset").ToLocalChecked(),
507+
Nan::New<v8::Number>(msg->offset()));
508+
Nan::Set(eofEvent, Nan::New<v8::String>("partition").ToLocalChecked(),
509+
Nan::New<v8::Number>(msg->partition()));
510+
511+
argv[2] = eofEvent;
512+
break;
513+
}
514+
default:
515+
argv[1] = Conversion::Message::ToV8Object(msg);
516+
argv[2] = Nan::Null();
517+
break;
504518
}
505-
default:
506-
argv[1] = Conversion::Message::ToV8Object(msg);
507-
argv[2] = Nan::Null();
508-
break;
509-
}
510519

511-
// We can delete msg now
512-
delete msg;
520+
// We can delete msg now
521+
delete msg;
522+
}
513523

514524
callback->Call(argc, argv);
515525
}

src/workers.h

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,26 @@ class MessageWorker : public ErrorAwareWorker {
9090
}
9191

9292
std::vector<RdKafka::Message*> message_queue;
93+
std::vector<RdKafka::ErrorCode> warning_queue;
9394

9495
{
9596
scoped_mutex_lock lock(m_async_lock);
9697
// Copy the vector and empty it
9798
m_asyncdata.swap(message_queue);
99+
m_asyncwarning.swap(warning_queue);
98100
}
99101

100102
for (unsigned int i = 0; i < message_queue.size(); i++) {
101-
HandleMessageCallback(message_queue[i]);
103+
HandleMessageCallback(message_queue[i], RdKafka::ERR_NO_ERROR);
102104

103105
// we are done with it. it is about to go out of scope
104106
// for the last time so let's just free it up here. can't rely
105107
// on the destructor
106108
}
109+
110+
for (unsigned int i = 0; i < warning_queue.size(); i++) {
111+
HandleMessageCallback(NULL, warning_queue[i]);
112+
}
107113
}
108114

109115
class ExecutionMessageBus {
@@ -112,13 +118,16 @@ class MessageWorker : public ErrorAwareWorker {
112118
void Send(RdKafka::Message* m) const {
113119
that_->Produce_(m);
114120
}
121+
void SendWarning(RdKafka::ErrorCode c) const {
122+
that_->ProduceWarning_(c);
123+
}
115124
private:
116125
explicit ExecutionMessageBus(MessageWorker* that) : that_(that) {}
117126
MessageWorker* const that_;
118127
};
119128

120129
virtual void Execute(const ExecutionMessageBus&) = 0;
121-
virtual void HandleMessageCallback(RdKafka::Message*) = 0;
130+
virtual void HandleMessageCallback(RdKafka::Message*, RdKafka::ErrorCode) = 0;
122131

123132
virtual void Destroy() {
124133
uv_close(reinterpret_cast<uv_handle_t*>(m_async), AsyncClose_);
@@ -135,6 +144,13 @@ class MessageWorker : public ErrorAwareWorker {
135144
m_asyncdata.push_back(m);
136145
uv_async_send(m_async);
137146
}
147+
148+
void ProduceWarning_(RdKafka::ErrorCode c) {
149+
scoped_mutex_lock lock(m_async_lock);
150+
m_asyncwarning.push_back(c);
151+
uv_async_send(m_async);
152+
}
153+
138154
NAN_INLINE static NAUV_WORK_CB(m_async_message) {
139155
MessageWorker *worker = static_cast<MessageWorker*>(async->data);
140156
worker->WorkMessage();
@@ -149,6 +165,7 @@ class MessageWorker : public ErrorAwareWorker {
149165
uv_async_t *m_async;
150166
uv_mutex_t m_async_lock;
151167
std::vector<RdKafka::Message*> m_asyncdata;
168+
std::vector<RdKafka::ErrorCode> m_asyncwarning;
152169
};
153170

154171
namespace Handle {
@@ -283,7 +300,7 @@ class KafkaConsumerConsumeLoop : public MessageWorker {
283300
void Execute(const ExecutionMessageBus&);
284301
void HandleOKCallback();
285302
void HandleErrorCallback();
286-
void HandleMessageCallback(RdKafka::Message*);
303+
void HandleMessageCallback(RdKafka::Message*, RdKafka::ErrorCode);
287304
private:
288305
NodeKafka::KafkaConsumer * consumer;
289306
const int m_timeout_ms;

0 commit comments

Comments
 (0)