Skip to content

Commit 0629955

Browse files
committed
Fix issue with cooperative assignor when rebalance_cb is a boolean
1 parent 98c78e7 commit 0629955

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

lib/kafka-consumer.js

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,17 @@ function KafkaConsumer(conf, topicConf) {
6262
// That's it
6363
try {
6464
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
65-
self.assign(assignment);
65+
if (self.rebalanceProtocol() === 'COOPERATIVE') {
66+
self.incrementalAssign(assignment);
67+
} else {
68+
self.assign(assignment);
69+
}
6670
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
67-
self.unassign();
71+
if (self.rebalanceProtocol() === 'COOPERATIVE') {
72+
self.incrementalUnassign(assignment);
73+
} else {
74+
self.unassign();
75+
}
6876
}
6977
} catch (e) {
7078
// Ignore exceptions if we are not connected

0 commit comments

Comments
 (0)