Skip to content

Commit da02c84

Browse files
authored
Merge pull request #822 from lihightower/replica-count
Add client function for fetching topic replica count
2 parents 1b2ed7c + fd59385 commit da02c84

File tree

3 files changed

+26
-1
lines changed

3 files changed

+26
-1
lines changed

lib/kafka/client.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,14 @@ def partitions_for(topic)
694694
@cluster.partitions_for(topic).count
695695
end
696696

697+
# Counts the number of replicas for a topic's partition
698+
#
699+
# @param topic [String]
700+
# @return [Integer] the number of replica nodes for the topic's partition
701+
def replica_count_for(topic)
702+
@cluster.partitions_for(topic).first.replicas.count
703+
end
704+
697705
# Retrieve the offset of the last message in a partition. If there are no
698706
# messages in the partition -1 is returned.
699707
#

lib/kafka/protocol/metadata_response.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ module Protocol
3434
#
3535
class MetadataResponse
3636
class PartitionMetadata
37-
attr_reader :partition_id, :leader
37+
attr_reader :partition_id, :leader, :replicas
3838

3939
attr_reader :partition_error_code
4040

spec/functional/client_spec.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,23 @@
130130
expect(kafka.partitions_for(topic)).to be > 0
131131
end
132132

133+
example "fetching the replica count for a topic" do
134+
expect(kafka.replica_count_for(topic)).to eq 1
135+
end
136+
137+
example "fetching the replica count for a topic that doesn't yet exist" do
138+
topic = "unknown-topic-#{SecureRandom.uuid}"
139+
140+
expect { kafka.replica_count_for(topic) }.to raise_exception(Kafka::LeaderNotAvailable)
141+
142+
# Eventually the call should succeed.
143+
expect {
144+
10.times { kafka.replica_count_for(topic) rescue nil }
145+
}.not_to raise_exception
146+
147+
expect(kafka.replica_count_for(topic)).to be > 0
148+
end
149+
133150
example "delivering a message to a topic" do
134151
kafka.deliver_message("yolo", topic: topic, key: "xoxo", partition: 0)
135152

0 commit comments

Comments
 (0)