@@ -184,6 +184,31 @@ Baton KafkaConsumer::Assign(std::vector<RdKafka::TopicPartition*> partitions) {
184184 return Baton (errcode);
185185}
186186
187+ Baton KafkaConsumer::IncrementalAssign (std::vector<RdKafka::TopicPartition*> partitions) {
188+ if (!IsConnected ()) {
189+ return Baton (RdKafka::ERR__STATE, " KafkaConsumer is disconnected" );
190+ }
191+
192+ RdKafka::KafkaConsumer* consumer =
193+ dynamic_cast <RdKafka::KafkaConsumer*>(m_client);
194+
195+ RdKafka::Error *e = consumer->incremental_assign (partitions);
196+
197+ if (e) {
198+ RdKafka::ErrorCode errcode = e->code ();
199+ delete e;
200+ return Baton (errcode);
201+ }
202+
203+ m_partition_cnt += partitions.size ();
204+ for (auto i = partitions.begin (); i != partitions.end (); ++i) {
205+ m_partitions.push_back (*i);
206+ }
207+ partitions.clear ();
208+
209+ return Baton (RdKafka::ERR_NO_ERROR);
210+ }
211+
187212Baton KafkaConsumer::Unassign () {
188213 if (!IsClosing () && !IsConnected ()) {
189214 return Baton (RdKafka::ERR__STATE);
@@ -200,12 +225,41 @@ Baton KafkaConsumer::Unassign() {
200225
201226 // Destroy the old list of partitions since we are no longer using it
202227 RdKafka::TopicPartition::destroy (m_partitions);
228+ m_partitions.clear ();
203229
204230 m_partition_cnt = 0 ;
205231
206232 return Baton (RdKafka::ERR_NO_ERROR);
207233}
208234
235+ Baton KafkaConsumer::IncrementalUnassign (std::vector<RdKafka::TopicPartition*> partitions) {
236+ if (!IsClosing () && !IsConnected ()) {
237+ return Baton (RdKafka::ERR__STATE);
238+ }
239+
240+ RdKafka::KafkaConsumer* consumer =
241+ dynamic_cast <RdKafka::KafkaConsumer*>(m_client);
242+
243+ RdKafka::Error *e = consumer->incremental_unassign (partitions);
244+ if (e) {
245+ RdKafka::ErrorCode errcode = e->code ();
246+ delete e;
247+ return Baton (errcode);
248+ }
249+
250+ // Destroy the old list of partitions since we are no longer using it
251+ RdKafka::TopicPartition::destroy (partitions);
252+
253+ m_partitions.erase (
254+ std::remove_if (m_partitions.begin (), m_partitions.end (), [&partitions](RdKafka::TopicPartition *x) -> bool {
255+ return std::find (partitions.begin (), partitions.end (), x) != partitions.end ();
256+ }),
257+ m_partitions.end ()
258+ );
259+ m_partition_cnt -= partitions.size ();
260+ return Baton (RdKafka::ERR_NO_ERROR);
261+ }
262+
209263Baton KafkaConsumer::Commit (std::vector<RdKafka::TopicPartition*> toppars) {
210264 if (!IsConnected ()) {
211265 return Baton (RdKafka::ERR__STATE);
@@ -532,7 +586,9 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
532586 Nan::SetPrototypeMethod (tpl, " committed" , NodeCommitted);
533587 Nan::SetPrototypeMethod (tpl, " position" , NodePosition);
534588 Nan::SetPrototypeMethod (tpl, " assign" , NodeAssign);
589+ Nan::SetPrototypeMethod (tpl, " incrementalAssign" , NodeIncrementalAssign);
535590 Nan::SetPrototypeMethod (tpl, " unassign" , NodeUnassign);
591+ Nan::SetPrototypeMethod (tpl, " incrementalUnassign" , NodeIncrementalUnassign);
536592 Nan::SetPrototypeMethod (tpl, " assignments" , NodeAssignments);
537593
538594 Nan::SetPrototypeMethod (tpl, " commit" , NodeCommit);
@@ -764,6 +820,64 @@ NAN_METHOD(KafkaConsumer::NodeAssign) {
764820 info.GetReturnValue ().Set (Nan::True ());
765821}
766822
823+ NAN_METHOD (KafkaConsumer::NodeIncrementalAssign) {
824+ Nan::HandleScope scope;
825+
826+ if (info.Length () < 1 || !info[0 ]->IsArray ()) {
827+ // Just throw an exception
828+ return Nan::ThrowError (" Need to specify an array of partitions" );
829+ }
830+
831+ v8::Local<v8::Array> partitions = info[0 ].As <v8::Array>();
832+ std::vector<RdKafka::TopicPartition*> topic_partitions;
833+
834+ for (unsigned int i = 0 ; i < partitions->Length (); ++i) {
835+ v8::Local<v8::Value> partition_obj_value;
836+ if (!(
837+ Nan::Get (partitions, i).ToLocal (&partition_obj_value) &&
838+ partition_obj_value->IsObject ())) {
839+ Nan::ThrowError (" Must pass topic-partition objects" );
840+ }
841+
842+ v8::Local<v8::Object> partition_obj = partition_obj_value.As <v8::Object>();
843+
844+ // Got the object
845+ int64_t partition = GetParameter<int64_t >(partition_obj, " partition" , -1 );
846+ std::string topic = GetParameter<std::string>(partition_obj, " topic" , " " );
847+
848+ if (!topic.empty ()) {
849+ RdKafka::TopicPartition* part;
850+
851+ if (partition < 0 ) {
852+ part = Connection::GetPartition (topic);
853+ } else {
854+ part = Connection::GetPartition (topic, partition);
855+ }
856+
857+ // Set the default value to offset invalid. If provided, we will not set
858+ // the offset.
859+ int64_t offset = GetParameter<int64_t >(
860+ partition_obj, " offset" , RdKafka::Topic::OFFSET_INVALID);
861+ if (offset != RdKafka::Topic::OFFSET_INVALID) {
862+ part->set_offset (offset);
863+ }
864+
865+ topic_partitions.push_back (part);
866+ }
867+ }
868+
869+ KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This ());
870+
871+ // Hand over the partitions to the consumer.
872+ Baton b = consumer->IncrementalAssign (topic_partitions);
873+
874+ if (b.err () != RdKafka::ERR_NO_ERROR) {
875+ Nan::ThrowError (RdKafka::err2str (b.err ()).c_str ());
876+ }
877+
878+ info.GetReturnValue ().Set (Nan::True ());
879+ }
880+
767881NAN_METHOD (KafkaConsumer::NodeUnassign) {
768882 Nan::HandleScope scope;
769883
@@ -784,6 +898,63 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) {
784898 info.GetReturnValue ().Set (Nan::True ());
785899}
786900
901+ NAN_METHOD (KafkaConsumer::NodeIncrementalUnassign) {
902+ Nan::HandleScope scope;
903+
904+ if (info.Length () < 1 || !info[0 ]->IsArray ()) {
905+ // Just throw an exception
906+ return Nan::ThrowError (" Need to specify an array of partitions" );
907+ }
908+
909+ v8::Local<v8::Array> partitions = info[0 ].As <v8::Array>();
910+ std::vector<RdKafka::TopicPartition*> topic_partitions;
911+
912+ for (unsigned int i = 0 ; i < partitions->Length (); ++i) {
913+ v8::Local<v8::Value> partition_obj_value;
914+ if (!(
915+ Nan::Get (partitions, i).ToLocal (&partition_obj_value) &&
916+ partition_obj_value->IsObject ())) {
917+ Nan::ThrowError (" Must pass topic-partition objects" );
918+ }
919+
920+ v8::Local<v8::Object> partition_obj = partition_obj_value.As <v8::Object>();
921+
922+ // Got the object
923+ int64_t partition = GetParameter<int64_t >(partition_obj, " partition" , -1 );
924+ std::string topic = GetParameter<std::string>(partition_obj, " topic" , " " );
925+
926+ if (!topic.empty ()) {
927+ RdKafka::TopicPartition* part;
928+
929+ if (partition < 0 ) {
930+ part = Connection::GetPartition (topic);
931+ } else {
932+ part = Connection::GetPartition (topic, partition);
933+ }
934+
935+ // Set the default value to offset invalid. If provided, we will not set
936+ // the offset.
937+ int64_t offset = GetParameter<int64_t >(
938+ partition_obj, " offset" , RdKafka::Topic::OFFSET_INVALID);
939+ if (offset != RdKafka::Topic::OFFSET_INVALID) {
940+ part->set_offset (offset);
941+ }
942+
943+ topic_partitions.push_back (part);
944+ }
945+ }
946+
947+ KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This ());
948+ // Hand over the partitions to the consumer.
949+ Baton b = consumer->IncrementalUnassign (topic_partitions);
950+
951+ if (b.err () != RdKafka::ERR_NO_ERROR) {
952+ Nan::ThrowError (RdKafka::err2str (b.err ()).c_str ());
953+ }
954+
955+ info.GetReturnValue ().Set (Nan::True ());
956+ }
957+
787958NAN_METHOD (KafkaConsumer::NodeUnsubscribe) {
788959 Nan::HandleScope scope;
789960
0 commit comments