Skip to content

Commit dd19954

Browse files
committed
Gracefully close Redpanda producer and consumer in example
1 parent c70dd80 commit dd19954

File tree

1 file changed

+31
-7
lines changed

1 file changed

+31
-7
lines changed

examples/redpanda_rspec.rb

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
require "rdkafka"
1313
require "rspec"
1414
require "rspec/autorun"
15+
require "timeout"
1516

1617
RSpec.configure do |config|
1718
config.add_setting :redpanda_container, default: nil
@@ -28,21 +29,44 @@
2829

2930
RSpec.describe "Redpanda" do
3031
it "works" do
31-
config = {
32+
topic = "ruby-test-topic"
33+
producer_config = {
34+
"bootstrap.servers": RSpec.configuration.redpanda_container.connection_url,
35+
"message.timeout.ms": 10_000
36+
}
37+
consumer_config = {
3238
"bootstrap.servers": RSpec.configuration.redpanda_container.connection_url,
3339
"group.id": "ruby-test",
3440
"auto.offset.reset": "earliest"
3541
}
3642

37-
producer = Rdkafka::Config.new(config).producer
38-
producer.produce(payload: "test", topic: "ruby-test-topic").wait
43+
producer = nil
44+
consumer = nil
45+
46+
begin
47+
producer = Rdkafka::Config.new(producer_config).producer
48+
consumer = Rdkafka::Config.new(consumer_config).consumer
49+
consumer.subscribe(topic)
50+
51+
producer.produce(payload: "test", topic: topic).wait
3952

40-
consumer = Rdkafka::Config.new(config).consumer
41-
consumer.subscribe("ruby-test-topic")
53+
message = nil
54+
begin
55+
Timeout.timeout(15) do
56+
loop do
57+
message = consumer.poll(1000)
58+
break if message
59+
end
60+
end
61+
rescue Timeout::Error
62+
fail "Timed out waiting for message on #{topic}"
63+
end
4264

43-
consumer.each do |message|
4465
expect(message.payload).to eq("test")
45-
break
66+
ensure
67+
producer&.flush(5_000)
68+
producer&.close
69+
consumer&.close
4670
end
4771
end
4872
end

0 commit comments

Comments
 (0)