Skip to content

Commit 65840b8

Browse files
authored
Merge pull request #529 from zendesk/dasch/improve-consumer-specs
Improve the consumer specs
2 parents f70cfc3 + ad1b665 commit 65840b8

File tree

1 file changed

+60
-59
lines changed

1 file changed

+60
-59
lines changed

spec/functional/consumer_group_spec.rb

Lines changed: 60 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -1,102 +1,77 @@
11
describe "Consumer API", functional: true do
2-
let(:num_partitions) { 15 }
3-
let!(:topic) { create_random_topic(num_partitions: 3) }
42
let(:offset_retention_time) { 30 }
53

64
example "consuming messages from the beginning of a topic" do
5+
topic = create_random_topic(num_partitions: 1)
76
messages = (1..1000).to_a
87

9-
Thread.new do
8+
begin
109
kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test")
1110
producer = kafka.producer
1211

1312
messages.each do |i|
14-
producer.produce(i.to_s, topic: topic, partition_key: i.to_s)
15-
16-
if i % 100 == 0
17-
producer.deliver_messages
18-
sleep 1
19-
end
20-
end
21-
22-
(0...num_partitions).each do |i|
23-
# Send a tombstone to each partition.
24-
producer.produce(nil, topic: topic, partition: i)
13+
producer.produce(i.to_s, topic: topic, partition: 0)
2514
end
2615

2716
producer.deliver_messages
2817
end
2918

3019
group_id = "test#{rand(1000)}"
3120

32-
threads = 2.times.map do |thread_id|
33-
t = Thread.new do
34-
received_messages = []
21+
mutex = Mutex.new
22+
received_messages = []
3523

36-
kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger)
37-
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time)
38-
consumer.subscribe(topic)
24+
consumers = 2.times.map do
25+
kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger)
26+
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time)
27+
consumer.subscribe(topic)
28+
consumer
29+
end
3930

31+
threads = consumers.map do |consumer|
32+
t = Thread.new do
4033
consumer.each_message do |message|
41-
if message.value.nil?
42-
consumer.stop
43-
else
44-
received_messages << Integer(message.value)
34+
mutex.synchronize do
35+
received_messages << message
36+
37+
if received_messages.count == messages.count
38+
consumers.each(&:stop)
39+
end
4540
end
4641
end
47-
48-
received_messages
4942
end
5043

5144
t.abort_on_exception = true
5245

5346
t
5447
end
5548

56-
received_messages = threads.map(&:value).flatten
49+
threads.each(&:join)
5750

58-
expect(received_messages.sort).to match_array messages
51+
expect(received_messages.map(&:value).map(&:to_i)).to match_array messages
5952
end
6053

61-
example "consuming messages from the end of a topic" do
62-
sent_messages = 1_000
63-
64-
num_partitions = 1
54+
example "consuming messages from a topic that's being written to" do
55+
num_partitions = 3
6556
topic = create_random_topic(num_partitions: num_partitions)
66-
group_id = "test#{rand(1000)}"
57+
messages = (1..100).to_a
6758

68-
consumer_thread = Thread.new do
69-
received_messages = 0
70-
71-
kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger)
72-
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time)
73-
consumer.subscribe(topic, start_from_beginning: false)
74-
75-
consumer.each_message do |message|
76-
if message.value.nil?
77-
consumer.stop
78-
else
79-
received_messages += 1
80-
end
81-
end
82-
83-
received_messages
84-
end
85-
86-
consumer_thread.abort_on_exception = true
87-
88-
sleep 1
59+
mutex = Mutex.new
60+
var = ConditionVariable.new
8961

9062
Thread.new do
9163
kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test")
9264
producer = kafka.producer
9365

94-
1.upto(sent_messages) do |i|
95-
producer.produce("hello", topic: topic, partition_key: i.to_s)
66+
messages.each do |i|
67+
producer.produce(i.to_s, topic: topic, partition: i % 3)
9668

9769
if i % 100 == 0
9870
producer.deliver_messages
99-
sleep 1
71+
72+
mutex.synchronize do
73+
var.wait(mutex)
74+
end
10075
end
10176
end
10277

@@ -108,9 +83,35 @@
10883
producer.deliver_messages
10984
end
11085

111-
received_messages = consumer_thread.value
86+
group_id = "test#{rand(1000)}"
87+
received_messages = []
88+
89+
threads = 2.times.map do |thread_id|
90+
t = Thread.new do
91+
kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger)
92+
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time)
93+
consumer.subscribe(topic)
94+
95+
consumer.each_message do |message|
96+
if message.value.nil?
97+
consumer.stop
98+
else
99+
mutex.synchronize do
100+
received_messages << Integer(message.value)
101+
var.signal
102+
end
103+
end
104+
end
105+
end
106+
107+
t.abort_on_exception = true
108+
109+
t
110+
end
111+
112+
threads.each(&:join)
112113

113-
expect(received_messages).to eq sent_messages
114+
expect(received_messages).to match_array messages
114115
end
115116

116117
example "stopping and restarting a consumer group" do

0 commit comments

Comments (0)