Skip to content

Commit 7d1c384

Browse files
committed
Make "consuming messages with a custom assignment strategy" stable
In the example, one consumer sometimes consumers all messages before another consumer joins. As a result, the example is unstable. This commit makes it stable by ensuring all consumers have joined.
1 parent 8e00bea commit 7d1c384

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

spec/functional/consumer_group_spec.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,12 +476,21 @@ def call(cluster:, members:, partitions:)
476476
end
477477
end
478478

479+
joinined_consumers = []
479480
consumers = 2.times.map do |i|
480481
assignment_strategy = assignment_strategy_class.new(i + 1)
481482

482483
kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
483484
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy)
484485
consumer.subscribe(topic)
486+
487+
allow(consumer).to receive(:trigger_heartbeat).and_wrap_original do |m, *args|
488+
joinined_consumers |= [consumer]
489+
# Wait until all the consumers try to join to prevent one consumer from processing all messages
490+
raise Kafka::HeartbeatError if joinined_consumers.size < consumers.size
491+
m.call(*args)
492+
end
493+
485494
consumer
486495
end
487496

0 commit comments

Comments
 (0)