File tree Expand file tree Collapse file tree 2 files changed +48
-45
lines changed Expand file tree Collapse file tree 2 files changed +48
-45
lines changed Original file line number Diff line number Diff line change @@ -179,11 +179,55 @@ def producer
179179 # Set callback to receive delivery reports on config
180180 Rdkafka ::Bindings . rd_kafka_conf_set_dr_msg_cb ( config , Rdkafka ::Callbacks ::DeliveryCallbackFunction )
181181 # Return producer with Kafka client
182- Rdkafka ::Producer . new ( native_kafka ( config , :rd_kafka_producer ) ) . tap do |producer |
182+ Rdkafka ::Producer . new ( ProducerClient . new ( native_kafka ( config , :rd_kafka_producer ) ) ) . tap do |producer |
183183 opaque . producer = producer
184184 end
185185 end
186186
187+ class ProducerClient
188+ def initialize ( native )
189+ @native = native
190+
191+ # Start thread to poll client for delivery callbacks
192+ @polling_thread = Thread . new do
193+ loop do
194+ Rdkafka ::Bindings . rd_kafka_poll ( native , 250 )
195+ # Exit thread if closing and the poll queue is empty
196+ if Thread . current [ :closing ] && Rdkafka ::Bindings . rd_kafka_outq_len ( native ) == 0
197+ break
198+ end
199+ end
200+ end
201+ @polling_thread . abort_on_exception = true
202+ @polling_thread [ :closing ] = false
203+ end
204+
205+ def native
206+ @native
207+ end
208+
209+ def finalizer
210+ -> ( _ ) { close }
211+ end
212+
213+ def closed?
214+ @native . nil?
215+ end
216+
217+ def close ( object_id = nil )
218+ return unless @native
219+
220+ # Indicate to polling thread that we're closing
221+ @polling_thread [ :closing ] = true
222+ # Wait for the polling thread to finish up
223+ @polling_thread . join
224+
225+ Rdkafka ::Bindings . rd_kafka_destroy ( @native )
226+
227+ @native = nil
228+ end
229+ end
230+
187231 # Create an admin instance with this configuration.
188232 #
189233 # @raise [ConfigError] When the configuration contains invalid options
Original file line number Diff line number Diff line change @@ -10,52 +10,11 @@ class Producer
1010 attr_reader :delivery_callback
1111
1212 # @private
13- def initialize ( native_kafka )
14- @client = ProducerClient . new ( native_kafka )
13+ def initialize ( client )
14+ @client = client
1515
1616 # Makes sure, that the producer gets closed before it gets GCed by Ruby
17- ObjectSpace . define_finalizer ( self , @client )
18- end
19-
20- class ProducerClient
21- def initialize ( native )
22- @native = native
23-
24- # Start thread to poll client for delivery callbacks
25- @polling_thread = Thread . new do
26- loop do
27- Rdkafka ::Bindings . rd_kafka_poll ( native , 250 )
28- # Exit thread if closing and the poll queue is empty
29- if Thread . current [ :closing ] && Rdkafka ::Bindings . rd_kafka_outq_len ( native ) == 0
30- break
31- end
32- end
33- end
34- @polling_thread . abort_on_exception = true
35- @polling_thread [ :closing ] = false
36- end
37-
38- def native
39- @native
40- end
41-
42- def closed?
43- @native . nil?
44- end
45-
46- def close ( object_id = nil )
47- return unless @native
48-
49- # Indicate to polling thread that we're closing
50- @polling_thread [ :closing ] = true
51- # Wait for the polling thread to finish up
52- @polling_thread . join
53-
54- Rdkafka ::Bindings . rd_kafka_destroy ( @native )
55-
56- @native = nil
57- end
58- alias call close
17+ ObjectSpace . define_finalizer ( self , client . finalizer )
5918 end
6019
6120 # Set a callback that will be called every time a message is successfully produced.
You can’t perform that action at this time.
0 commit comments