Skip to content

Commit 5334cd5

Browse files
authored
Merge pull request #543 from nguyenquangminh0711/add-consumer-property-tests
Add more tests to ensure consumer properties
2 parents e6a7780 + c0770e6 commit 5334cd5

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

spec/functional/consumer_group_spec.rb

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,60 @@
148148

149149
expect(processed_messages).to eq(num_messages * 2)
150150
end
151+
152+
example "consumers process all messages in-order and non-duplicated" do
153+
topic = create_random_topic(num_partitions: 2)
154+
message_count = 500
155+
messages_set_1 = (1..500).to_a
156+
messages_set_2 = (501..1000).to_a
157+
158+
begin
159+
kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test")
160+
producer = kafka.producer
161+
messages_set_1.each do |i|
162+
producer.produce(i.to_s, topic: topic, partition: 0)
163+
end
164+
messages_set_2.each do |i|
165+
producer.produce(i.to_s, topic: topic, partition: 1)
166+
end
167+
producer.deliver_messages
168+
end
169+
170+
group_id = "test#{rand(1000)}"
171+
received_messages = {}
172+
173+
consumers = 2.times.map do
174+
kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger)
175+
consumer = kafka.consumer(group_id: group_id)
176+
consumer.subscribe(topic)
177+
consumer
178+
end
179+
180+
threads = consumers.map do |consumer|
181+
t = Thread.new do
182+
received_messages[Thread.current] = []
183+
puts received_messages
184+
consumer.each_message do |message|
185+
received_messages[Thread.current] << message
186+
187+
if received_messages[Thread.current].count == message_count
188+
consumer.stop
189+
end
190+
end
191+
end
192+
t.abort_on_exception = true
193+
t
194+
end
195+
196+
threads.each(&:join)
197+
198+
received_messages.each do |_thread, messages|
199+
values = messages.map(&:value).map(&:to_i)
200+
if messages.first.partition == 0
201+
expect(values).to eql(messages_set_1)
202+
else
203+
expect(values).to eql(messages_set_2)
204+
end
205+
end
206+
end
151207
end

0 commit comments

Comments
 (0)