Skip to content

Commit dabb2d3

Browse files
committed
Fix for failover issue.
When the consumer enters the group and gets no assignment (for ex. there is not enough partitions in the topic), librdkafka waits for the rebalancing sequence to be finished by calling assign with the empty list of partitions (just as was passed by librdkafka to rebalance callback). But cppkafka instead pass nullptr instead of an empty list (which means unassign). And consumer stuck forever in that state, not being able to pick the partition during the next rebalance (failover), because the previous rebalance sequence was not finished. Fixes #273 , ClickHouse/ClickHouse#21118 , etc.
1 parent 57a599d commit dabb2d3

File tree

1 file changed

+3
-9
lines changed

1 file changed

+3
-9
lines changed

src/consumer.cpp

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,9 @@ void Consumer::unsubscribe() {
124124

125125
void Consumer::assign(const TopicPartitionList& topic_partitions) {
126126
rd_kafka_resp_err_t error;
127-
if (topic_partitions.empty()) {
128-
error = rd_kafka_assign(get_handle(), nullptr);
129-
check_error(error);
130-
}
131-
else {
132-
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
133-
error = rd_kafka_assign(get_handle(), topic_list_handle.get());
134-
check_error(error, topic_list_handle.get());
135-
}
127+
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
128+
error = rd_kafka_assign(get_handle(), topic_list_handle.get());
129+
check_error(error, topic_list_handle.get());
136130
}
137131

138132
void Consumer::unassign() {

0 commit comments

Comments
 (0)