File tree Expand file tree Collapse file tree 1 file changed +4
-3
lines changed Expand file tree Collapse file tree 1 file changed +4
-3
lines changed Original file line number Diff line number Diff line change 33 let ( :num_messages ) { 10_000 }
44 let ( :num_partitions ) { 30 }
55 let ( :num_consumers ) { 10 }
6+ let ( :group_id ) { "fuzz-#{ rand ( 1000 ) } " }
67 let ( :topic ) { create_random_topic ( num_partitions : num_partitions ) }
78 let ( :messages ) { Set . new ( ( 1 ..num_messages ) . to_a ) }
89
2223
2324 example "consuming messages in a group with unreliable members" do
2425 result_queue = Queue . new
25- consumer_threads = num_consumers . times . map { start_consumer ( result_queue ) }
26+ consumer_threads = num_consumers . times . map { start_consumer ( group_id , result_queue ) }
2627
2728 nemesis = Thread . new do
2829 loop do
6364 puts "#{ duplicate_messages . size } duplicate messages!"
6465 end
6566
66- def start_consumer ( result_queue )
67+ def start_consumer ( group_id , result_queue )
6768 thread = Thread . new do
6869 kafka = Kafka . new (
6970 seed_brokers : kafka_brokers ,
@@ -72,7 +73,7 @@ def start_consumer(result_queue)
7273 connect_timeout : 20 ,
7374 )
7475
75- consumer = kafka . consumer ( group_id : "fuzz" , session_timeout : 30 , offset_retention_time : 300 )
76+ consumer = kafka . consumer ( group_id : group_id , session_timeout : 30 , offset_retention_time : 300 )
7677 consumer . subscribe ( topic )
7778
7879 consumer . each_message do |message |
You can’t perform that action at this time.
0 commit comments