Skip to content

Commit 316d414

Browse files
authored
Merge pull request #172 from jvortmann/main
Extract lower level kafka client from producer to decouple finalizer logic and close logic
2 parents 655d14e + 0a40727 commit 316d414

File tree

5 files changed

+202
-31
lines changed

5 files changed

+202
-31
lines changed

lib/rdkafka.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,6 @@
1717
require "rdkafka/error"
1818
require "rdkafka/metadata"
1919
require "rdkafka/producer"
20+
require "rdkafka/producer/client"
2021
require "rdkafka/producer/delivery_handle"
2122
require "rdkafka/producer/delivery_report"

lib/rdkafka/config.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def producer
179179
# Set callback to receive delivery reports on config
180180
Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
181181
# Return producer with Kafka client
182-
Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer)).tap do |producer|
182+
Rdkafka::Producer.new(Rdkafka::Producer::Client.new(native_kafka(config, :rd_kafka_producer))).tap do |producer|
183183
opaque.producer = producer
184184
end
185185
end

lib/rdkafka/producer.rb

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
require "securerandom"
1+
require "objspace"
22

33
module Rdkafka
44
# A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
@@ -10,25 +10,11 @@ class Producer
1010
attr_reader :delivery_callback
1111

1212
# @private
13-
def initialize(native_kafka)
14-
@id = SecureRandom.uuid
15-
@closing = false
16-
@native_kafka = native_kafka
13+
def initialize(client)
14+
@client = client
1715

1816
# Makes sure, that the producer gets closed before it gets GCed by Ruby
19-
ObjectSpace.define_finalizer(@id, proc { close })
20-
21-
# Start thread to poll client for delivery callbacks
22-
@polling_thread = Thread.new do
23-
loop do
24-
Rdkafka::Bindings.rd_kafka_poll(@native_kafka, 250)
25-
# Exit thread if closing and the poll queue is empty
26-
if @closing && Rdkafka::Bindings.rd_kafka_outq_len(@native_kafka) == 0
27-
break
28-
end
29-
end
30-
end
31-
@polling_thread.abort_on_exception = true
17+
ObjectSpace.define_finalizer(self, client.finalizer)
3218
end
3319

3420
# Set a callback that will be called every time a message is successfully produced.
@@ -44,16 +30,9 @@ def delivery_callback=(callback)
4430

4531
# Close this producer and wait for the internal poll queue to empty.
4632
def close
47-
ObjectSpace.undefine_finalizer(@id)
48-
49-
return unless @native_kafka
33+
ObjectSpace.undefine_finalizer(self)
5034

51-
# Indicate to polling thread that we're closing
52-
@closing = true
53-
# Wait for the polling thread to finish up
54-
@polling_thread.join
55-
Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
56-
@native_kafka = nil
35+
@client.close
5736
end
5837

5938
# Partition count for a given topic.
@@ -65,7 +44,7 @@ def close
6544
#
6645
def partition_count(topic)
6746
closed_producer_check(__method__)
68-
Rdkafka::Metadata.new(@native_kafka, topic).topics&.first[:partition_count]
47+
Rdkafka::Metadata.new(@client.native, topic).topics&.first[:partition_count]
6948
end
7049

7150
# Produces a message to a Kafka topic. The message is added to rdkafka's queue, call {DeliveryHandle#wait wait} on the returned delivery handle to make sure it is delivered.
@@ -157,7 +136,7 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
157136

158137
# Produce the message
159138
response = Rdkafka::Bindings.rd_kafka_producev(
160-
@native_kafka,
139+
@client.native,
161140
*args
162141
)
163142

@@ -176,7 +155,7 @@ def call_delivery_callback(delivery_handle)
176155
end
177156

178157
def closed_producer_check(method)
179-
raise Rdkafka::ClosedProducerError.new(method) if @native_kafka.nil?
158+
raise Rdkafka::ClosedProducerError.new(method) if @client.closed?
180159
end
181160
end
182161
end

lib/rdkafka/producer/client.rb

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
module Rdkafka
2+
class Producer
3+
class Client
4+
def initialize(native)
5+
@native = native
6+
7+
# Start thread to poll client for delivery callbacks
8+
@polling_thread = Thread.new do
9+
loop do
10+
Rdkafka::Bindings.rd_kafka_poll(native, 250)
11+
# Exit thread if closing and the poll queue is empty
12+
if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(native) == 0
13+
break
14+
end
15+
end
16+
end
17+
@polling_thread.abort_on_exception = true
18+
@polling_thread[:closing] = false
19+
end
20+
21+
def native
22+
@native
23+
end
24+
25+
def finalizer
26+
->(_) { close }
27+
end
28+
29+
def closed?
30+
@native.nil?
31+
end
32+
33+
def close(object_id=nil)
34+
return unless @native
35+
36+
# Indicate to polling thread that we're closing
37+
@polling_thread[:closing] = true
38+
# Wait for the polling thread to finish up
39+
@polling_thread.join
40+
41+
Rdkafka::Bindings.rd_kafka_destroy(@native)
42+
43+
@native = nil
44+
end
45+
end
46+
end
47+
end
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
require "spec_helper"
2+
3+
describe Rdkafka::Producer::Client do
4+
let(:native) { double }
5+
let(:closing) { false }
6+
let(:thread) { double(Thread) }
7+
8+
subject(:client) { described_class.new(native) }
9+
10+
before do
11+
allow(Rdkafka::Bindings).to receive(:rd_kafka_poll).with(native, 250)
12+
allow(Rdkafka::Bindings).to receive(:rd_kafka_outq_len).with(native).and_return(0)
13+
allow(Rdkafka::Bindings).to receive(:rd_kafka_destroy)
14+
allow(Thread).to receive(:new).and_return(thread)
15+
16+
allow(thread).to receive(:[]=).with(:closing, anything)
17+
allow(thread).to receive(:join)
18+
allow(thread).to receive(:abort_on_exception=).with(anything)
19+
end
20+
21+
context "defaults" do
22+
it "sets the thread to abort on exception" do
23+
expect(thread).to receive(:abort_on_exception=).with(true)
24+
25+
client
26+
end
27+
28+
it "sets the thread `closing` flag to false" do
29+
expect(thread).to receive(:[]=).with(:closing, false)
30+
31+
client
32+
end
33+
end
34+
35+
context "the polling thread" do
36+
it "is created" do
37+
expect(Thread).to receive(:new)
38+
39+
client
40+
end
41+
42+
it "polls the native with default 250ms timeout" do
43+
polling_loop_expects do
44+
expect(Rdkafka::Bindings).to receive(:rd_kafka_poll).with(native, 250)
45+
end
46+
end
47+
48+
it "check the out queue of native client" do
49+
polling_loop_expects do
50+
expect(Rdkafka::Bindings).to receive(:rd_kafka_outq_len).with(native)
51+
end
52+
end
53+
end
54+
55+
def polling_loop_expects(&block)
56+
Thread.current[:closing] = true # this forces the loop break with line #12
57+
58+
allow(Thread).to receive(:new).and_yield do |_|
59+
block.call
60+
end.and_return(thread)
61+
62+
client
63+
end
64+
65+
it "exposes `native` client" do
66+
expect(client.native).to eq(native)
67+
end
68+
69+
context "when client was not yet closed (`nil`)" do
70+
it "is not closed" do
71+
expect(client.closed?).to eq(false)
72+
end
73+
74+
context "and attempt to close" do
75+
it "calls the `destroy` binding" do
76+
expect(Rdkafka::Bindings).to receive(:rd_kafka_destroy).with(native)
77+
78+
client.close
79+
end
80+
81+
it "indicates to the polling thread that it is closing" do
82+
expect(thread).to receive(:[]=).with(:closing, true)
83+
84+
client.close
85+
end
86+
87+
it "joins the polling thread" do
88+
expect(thread).to receive(:join)
89+
90+
client.close
91+
end
92+
93+
it "closes and unassign the native client" do
94+
client.close
95+
96+
expect(client.native).to eq(nil)
97+
expect(client.closed?).to eq(true)
98+
end
99+
end
100+
end
101+
102+
context "when client was already closed" do
103+
before { client.close }
104+
105+
it "is closed" do
106+
expect(client.closed?).to eq(true)
107+
end
108+
109+
context "and attempt to close again" do
110+
it "does not call the `destroy` binding" do
111+
expect(Rdkafka::Bindings).not_to receive(:rd_kafka_destroy)
112+
113+
client.close
114+
end
115+
116+
it "does not indicate to the polling thread that it is closing" do
117+
expect(thread).not_to receive(:[]=).with(:closing, true)
118+
119+
client.close
120+
end
121+
122+
it "does not join the polling thread" do
123+
expect(thread).not_to receive(:join)
124+
125+
client.close
126+
end
127+
128+
it "does not close and unassign the native client again" do
129+
client.close
130+
131+
expect(client.native).to eq(nil)
132+
expect(client.closed?).to eq(true)
133+
end
134+
end
135+
end
136+
137+
it "provide a finalizer Proc that closes the `native` client" do
138+
expect(client.closed?).to eq(false)
139+
140+
client.finalizer.call("some-ignored-object-id")
141+
142+
expect(client.closed?).to eq(true)
143+
end
144+
end

0 commit comments

Comments
 (0)