I believe this piece is to blame (lib/kafkat/interface/zookeeper.rb):
partitions = []
topic_string = pool.with_connection { |cnx| cnx.get(path1).first }
partition_ids = pool.with_connection { |cnx| cnx.children(path2) }
topic_json = JSON.parse(topic_string)
threads = partition_ids.map do |id|
  id = id.to_i
  Thread.new do
    path3 = topic_partition_state_path(name, id)
    partition_string = pool.with_connection { |cnx| cnx.get(path3).first }
    partition_json = JSON.parse(partition_string)
    replicas = topic_json['partitions'][id.to_s]
    leader = partition_json['leader']
    isr = partition_json['isr']
    partition_queue << Partition.new(name, id, replicas, leader, isr)
  end
end
threads.map(&:join)
It basically creates a thread for every partition it has to work on. That feels like overkill, and it causes problems on large topics: a topic with 500 partitions means spawning 500 threads.
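One possible way to cap the thread count, as a minimal sketch rather than a tested patch: push the partition ids onto a queue and let a fixed number of workers drain it. The helper name `map_with_bounded_threads` and the `worker_count` parameter are hypothetical and not part of kafkat. The default of 8 is an arbitrary assumption, and it would probably make sense to tie it to the connection pool size.

```ruby
# Hypothetical helper, not part of kafkat: runs the block once per item
# using at most `worker_count` threads, and returns the results in
# completion order (not input order).
def map_with_bounded_threads(items, worker_count: 8, &block)
  work = Queue.new
  items.each { |item| work << item }
  results = Queue.new

  # Never start more workers than there are items.
  size = [worker_count, items.size].min
  workers = Array.new(size) do
    Thread.new do
      loop do
        begin
          # Non-blocking pop: raises ThreadError once the queue is empty.
          item = work.pop(true)
        rescue ThreadError
          break
        end
        results << block.call(item)
      end
    end
  end
  workers.each(&:join)

  # All workers have finished, so draining the queue here cannot block.
  Array.new(results.size) { results.pop }
end
```

With something like this, the body of the `Thread.new` block above could become the block passed to the helper, returning the `Partition` instead of pushing it onto `partition_queue`, so a 500-partition topic would still use only `worker_count` threads.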