diff --git a/deps/librdkafka b/deps/librdkafka
index c282ba24..1a722553 160000
--- a/deps/librdkafka
+++ b/deps/librdkafka
@@ -1 +1 @@
-Subproject commit c282ba2423b2694052393c8edb0399a5ef471b3f
+Subproject commit 1a722553638bba85dbda5050455f7b9a5ef302de
diff --git a/index.d.ts b/index.d.ts
index d7ce7e61..9b6ad333 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -223,6 +223,10 @@ export class KafkaConsumer extends Client {
   consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
   consume(): void;
 
+  incrementalAssign(assignments: Assignment[]): this;
+
+  incrementalUnassign(assignments: Assignment[]): this;
+
   getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
 
   offsetsStore(topicPartitions: TopicPartitionOffset[]): any;
diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js
index c479240f..89072340 100644
--- a/lib/kafka-consumer.js
+++ b/lib/kafka-consumer.js
@@ -21,6 +21,49 @@ var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
 var DEFAULT_CONSUME_TIME_OUT = 1000;
 util.inherits(KafkaConsumer, Client);
 
+var eagerRebalanceCallback = function(err, assignment) {
+  // Create the librdkafka error
+  err = LibrdKafkaError.create(err);
+  // Emit the event
+  this.emit('rebalance', err, assignment);
+
+  // That's it
+  try {
+    if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
+      this.assign(assignment);
+    } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
+      this.unassign();
+    }
+  } catch (e) {
+    // Ignore exceptions if we are not connected
+    if (this.isConnected()) {
+      this.emit('rebalance.error', e);
+    }
+  }
+};
+
+var cooperativeRebalanceCallback = function(err, assignment) {
+  // Create the librdkafka error
+  err = LibrdKafkaError.create(err);
+  // Emit the event
+  this.emit('rebalance', err, assignment);
+
+  // That's it
+  try {
+    if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
+      this.incrementalAssign(assignment);
+    } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
+      this.incrementalUnassign(assignment);
+    }
+  } catch (e) {
+    // Ignore exceptions if we are not connected
+    if (this.isConnected()) {
+      this.emit('rebalance.error', e);
+    }
+  }
+};
+
 /**
  * KafkaConsumer class for reading messages from Kafka
  *
@@ -52,26 +95,9 @@ function KafkaConsumer(conf, topicConf) {
 
   // If rebalance is undefined we don't want any part of this
   if (onRebalance && typeof onRebalance === 'boolean') {
-    conf.rebalance_cb = function(err, assignment) {
-      // Create the librdkafka error
-      err = LibrdKafkaError.create(err);
-      // Emit the event
-      self.emit('rebalance', err, assignment);
-
-      // That's it
-      try {
-        if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
-          self.assign(assignment);
-        } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
-          self.unassign();
-        }
-      } catch (e) {
-        // Ignore exceptions if we are not connected
-        if (self.isConnected()) {
-          self.emit('rebalance.error', e);
-        }
-      }
-    };
+    conf.rebalance_cb = conf['partition.assignment.strategy'] === 'cooperative-sticky'
+      ? cooperativeRebalanceCallback.bind(this)
+      : eagerRebalanceCallback.bind(this);
   } else if (onRebalance && typeof onRebalance === 'function') {
     /*
      * Once this is opted in to, that's it. It's going to manually rebalance
@@ -264,6 +290,19 @@ KafkaConsumer.prototype.assign = function(assignments) {
   return this;
 };
 
+/**
+ * Incrementally assign the consumer to specific partitions and topics.
+ *
+ * @param {array} assignments - Assignments array. Should contain
+ * objects with topic and partition set.
+ * @return {Client} - Returns itself
+ */
+KafkaConsumer.prototype.incrementalAssign = function(assignments) {
+  this._client.incrementalAssign(TopicPartition.map(assignments));
+  return this;
+};
+
 /**
  * Unassign the consumer from its assigned partitions and topics.
  *
@@ -275,6 +314,18 @@ KafkaConsumer.prototype.unassign = function() {
   return this;
 };
 
+/**
+ * Incrementally unassign the consumer from specific partitions and topics.
+ *
+ * @param {array} assignments - Assignments array. Should contain
+ * objects with topic and partition set.
+ * @return {Client} - Returns itself
+ */
+KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
+  this._client.incrementalUnassign(TopicPartition.map(assignments));
+  return this;
+};
+
 /**
  * Get the assignments for the consumer
  *
@@ -341,6 +392,17 @@ KafkaConsumer.prototype.position = function(toppars) {
   return this._errorWrap(this._client.position(toppars), true);
 };
 
+/**
+ * Check whether the consumer considers the current assignment to have been
+ * lost involuntarily.
+ *
+ * @throws Throws from native land if the consumer is not connected
+ * @returns {boolean} Whether the assignment has been lost or not
+ */
+KafkaConsumer.prototype.assignmentLost = function() {
+  return this._client.assignmentLost();
+};
+
 /**
  * Unsubscribe from all currently subscribed topics
  *
diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
index 0c4c07c2..5a5af906 100644
--- a/src/kafka-consumer.cc
+++ b/src/kafka-consumer.cc
@@ -184,6 +184,31 @@ Baton KafkaConsumer::Assign(std::vector<RdKafka::TopicPartition*> partitions) {
   return Baton(errcode);
 }
 
+Baton KafkaConsumer::IncrementalAssign(
+  std::vector<RdKafka::TopicPartition*> partitions) {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected");
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+
+  RdKafka::Error* e = consumer->incremental_assign(partitions);
+
+  if (e) {
+    RdKafka::ErrorCode errcode = e->code();
+    delete e;
+    return Baton(errcode);
+  }
+
+  m_partition_cnt += partitions.size();
+  for (auto i = partitions.begin(); i != partitions.end(); ++i) {
+    m_partitions.push_back(*i);
+  }
+  partitions.clear();
+
+  return Baton(RdKafka::ERR_NO_ERROR);
+}
+
 Baton KafkaConsumer::Unassign() {
   if (!IsClosing() && !IsConnected()) {
     return Baton(RdKafka::ERR__STATE);
   }
@@ -200,12 +225,41 @@ Baton KafkaConsumer::Unassign() {
 
   // Destroy the old list of partitions since we are no longer using it
   RdKafka::TopicPartition::destroy(m_partitions);
+  m_partitions.clear();
 
   m_partition_cnt = 0;
 
   return Baton(RdKafka::ERR_NO_ERROR);
 }
 
+Baton KafkaConsumer::IncrementalUnassign(
+  std::vector<RdKafka::TopicPartition*> partitions) {
+  if (!IsClosing() && !IsConnected()) {
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+
+  RdKafka::Error* e = consumer->incremental_unassign(partitions);
+  if (e) {
+    RdKafka::ErrorCode errcode = e->code();
+    delete e;
+    return Baton(errcode);
+  }
+
+  // Drop the revoked partitions from the stored assignment. Match by topic
+  // and partition rather than pointer identity, since the incoming list
+  // holds separately allocated objects.
+  m_partitions.erase(
+    std::remove_if(m_partitions.begin(), m_partitions.end(),
+      [&partitions](RdKafka::TopicPartition* x) -> bool {
+        for (RdKafka::TopicPartition* p : partitions) {
+          if (p->topic() == x->topic() && p->partition() == x->partition()) {
+            delete x;
+            return true;
+          }
+        }
+        return false;
+      }),
+    m_partitions.end());
+  m_partition_cnt -= partitions.size();
+
+  // Destroy the list of revoked partitions since we are no longer using it
+  RdKafka::TopicPartition::destroy(partitions);
+
+  return Baton(RdKafka::ERR_NO_ERROR);
+}
+
 Baton KafkaConsumer::Commit(std::vector<RdKafka::TopicPartition*> toppars) {
   if (!IsConnected()) {
     return Baton(RdKafka::ERR__STATE);
   }
@@ -332,6 +386,20 @@ Baton KafkaConsumer::Position(std::vector<RdKafka::TopicPartition*> &toppars) {
   return Baton(err);
 }
 
+Baton KafkaConsumer::AssignmentLost() {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected");
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+
+  // XXX: Returning a bool by casting it to a pointer.
+  return Baton(reinterpret_cast<void*>(
+    static_cast<uintptr_t>(consumer->assignment_lost())));
+}
+
 Baton KafkaConsumer::Subscription() {
   if (!IsConnected()) {
     return Baton(RdKafka::ERR__STATE, "Consumer is not connected");
@@ -531,8 +599,11 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
 
   Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted);
   Nan::SetPrototypeMethod(tpl, "position", NodePosition);
+  Nan::SetPrototypeMethod(tpl, "assignmentLost", NodeAssignmentLost);
   Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
+  Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
   Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
+  Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign);
   Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);
 
   Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
@@ -689,6 +760,20 @@ NAN_METHOD(KafkaConsumer::NodePosition) {
   RdKafka::TopicPartition::destroy(toppars);
 }
 
+NAN_METHOD(KafkaConsumer::NodeAssignmentLost) {
+  Nan::HandleScope scope;
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+  Baton b = consumer->AssignmentLost();
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    return Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
+  }
+
+  bool result = static_cast<bool>(reinterpret_cast<uintptr_t>(b.data()));
+  info.GetReturnValue().Set(Nan::New(result));
+}
+
 NAN_METHOD(KafkaConsumer::NodeAssignments) {
   Nan::HandleScope scope;
 
@@ -764,6 +849,64 @@ NAN_METHOD(KafkaConsumer::NodeAssign) {
   info.GetReturnValue().Set(Nan::True());
 }
 
+NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) {
+  Nan::HandleScope scope;
+
+  if (info.Length() < 1 || !info[0]->IsArray()) {
+    // Just throw an exception
+    return Nan::ThrowError("Need to specify an array of partitions");
+  }
+
+  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
+  std::vector<RdKafka::TopicPartition*> topic_partitions;
+
+  for (unsigned int i = 0; i < partitions->Length(); ++i) {
+    v8::Local<v8::Value> partition_obj_value;
+    if (!(
+      Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
+      partition_obj_value->IsObject())) {
+      return Nan::ThrowError("Must pass topic-partition objects");
+    }
+
+    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
+
+    // Got the object
+    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
+    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
+
+    if (!topic.empty()) {
+      RdKafka::TopicPartition* part;
+
+      if (partition < 0) {
+        part = Connection::GetPartition(topic);
+      } else {
+        part = Connection::GetPartition(topic, partition);
+      }
+
+      // Offset defaults to OFFSET_INVALID; only set it when one is provided.
+      int64_t offset = GetParameter<int64_t>(
+        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
+      if (offset != RdKafka::Topic::OFFSET_INVALID) {
+        part->set_offset(offset);
+      }
+
+      topic_partitions.push_back(part);
+    }
+  }
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+  // Hand over the partitions to the consumer.
+  Baton b = consumer->IncrementalAssign(topic_partitions);
+
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
+  }
+
+  info.GetReturnValue().Set(Nan::True());
+}
+
 NAN_METHOD(KafkaConsumer::NodeUnassign) {
   Nan::HandleScope scope;
 
@@ -784,6 +927,63 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) {
   info.GetReturnValue().Set(Nan::True());
 }
 
+NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) {
+  Nan::HandleScope scope;
+
+  if (info.Length() < 1 || !info[0]->IsArray()) {
+    // Just throw an exception
+    return Nan::ThrowError("Need to specify an array of partitions");
+  }
+
+  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
+  std::vector<RdKafka::TopicPartition*> topic_partitions;
+
+  for (unsigned int i = 0; i < partitions->Length(); ++i) {
+    v8::Local<v8::Value> partition_obj_value;
+    if (!(
+      Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
+      partition_obj_value->IsObject())) {
+      return Nan::ThrowError("Must pass topic-partition objects");
+    }
+
+    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
+
+    // Got the object
+    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
+    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
+
+    if (!topic.empty()) {
+      RdKafka::TopicPartition* part;
+
+      if (partition < 0) {
+        part = Connection::GetPartition(topic);
+      } else {
+        part = Connection::GetPartition(topic, partition);
+      }
+
+      // Offset defaults to OFFSET_INVALID; only set it when one is provided.
+      int64_t offset = GetParameter<int64_t>(
+        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
+      if (offset != RdKafka::Topic::OFFSET_INVALID) {
+        part->set_offset(offset);
+      }
+
+      topic_partitions.push_back(part);
+    }
+  }
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+  // Hand over the partitions to the consumer.
+  Baton b = consumer->IncrementalUnassign(topic_partitions);
+
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
+  }
+
+  info.GetReturnValue().Set(Nan::True());
+}
+
 NAN_METHOD(KafkaConsumer::NodeUnsubscribe) {
   Nan::HandleScope scope;
 
diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h
index de8f2181..81097084 100644
--- a/src/kafka-consumer.h
+++ b/src/kafka-consumer.h
@@ -65,6 +65,7 @@ class KafkaConsumer : public Connection {
 
   Baton Committed(std::vector<RdKafka::TopicPartition*> &, int timeout_ms);
   Baton Position(std::vector<RdKafka::TopicPartition*> &);
+  Baton AssignmentLost();
 
   Baton RefreshAssignments();
 
@@ -72,7 +73,9 @@ class KafkaConsumer : public Connection {
   int AssignedPartitionCount();
 
   Baton Assign(std::vector<RdKafka::TopicPartition*>);
+  Baton IncrementalAssign(std::vector<RdKafka::TopicPartition*>);
   Baton Unassign();
+  Baton IncrementalUnassign(std::vector<RdKafka::TopicPartition*>);
 
   Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);
 
@@ -99,13 +102,15 @@ class KafkaConsumer : public Connection {
   bool m_is_subscribed = false;
 
   void* m_consume_loop;
-
+
   // Node methods
   static NAN_METHOD(NodeConnect);
   static NAN_METHOD(NodeSubscribe);
   static NAN_METHOD(NodeDisconnect);
   static NAN_METHOD(NodeAssign);
+  static NAN_METHOD(NodeIncrementalAssign);
   static NAN_METHOD(NodeUnassign);
+  static NAN_METHOD(NodeIncrementalUnassign);
   static NAN_METHOD(NodeAssignments);
   static NAN_METHOD(NodeUnsubscribe);
   static NAN_METHOD(NodeCommit);
@@ -113,6 +118,7 @@ class KafkaConsumer : public Connection {
   static NAN_METHOD(NodeOffsetsStore);
   static NAN_METHOD(NodeCommitted);
   static NAN_METHOD(NodePosition);
+  static NAN_METHOD(NodeAssignmentLost);
   static NAN_METHOD(NodeSubscription);
   static NAN_METHOD(NodeSeek);
   static NAN_METHOD(NodeGetWatermarkOffsets);
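
Usage sketch for the API added above. This is not part of the diff: the broker address, group id, and topic name are placeholders. With 'partition.assignment.strategy' set to 'cooperative-sticky' and 'rebalance_cb: true', the consumer selects cooperativeRebalanceCallback internally; passing a function instead lets you drive incrementalAssign/incrementalUnassign yourself, with assignmentLost() distinguishing an involuntary revocation from a normal one.

// Sketch only; connection details and topic are illustrative.
var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'example-group',
  // Selects the cooperative protocol; with `rebalance_cb: true` instead of
  // a function, cooperativeRebalanceCallback would be used automatically.
  'partition.assignment.strategy': 'cooperative-sticky',
  'rebalance_cb': function(err, assignment) {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      // Only the newly granted partitions arrive here; existing ones stay.
      this.incrementalAssign(assignment);
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      if (this.assignmentLost()) {
        // Revocation was involuntary; offsets for these partitions
        // may no longer be committable.
      }
      this.incrementalUnassign(assignment);
    }
  }
}, {});

consumer.on('ready', function() {
  consumer.subscribe(['example-topic']);
  consumer.consume();
});

consumer.connect();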