File tree Expand file tree Collapse file tree 2 files changed +48
-45
lines changed Expand file tree Collapse file tree 2 files changed +48
-45
lines changed Original file line number Diff line number Diff line change @@ -179,55 +179,11 @@ def producer
179179 # Set callback to receive delivery reports on config
180180 Rdkafka ::Bindings . rd_kafka_conf_set_dr_msg_cb ( config , Rdkafka ::Callbacks ::DeliveryCallbackFunction )
181181 # Return producer with Kafka client
182- Rdkafka ::Producer . new ( ProducerClient . new ( native_kafka ( config , :rd_kafka_producer ) ) ) . tap do |producer |
182+ Rdkafka ::Producer . new ( Rdkafka :: Producer :: Client . new ( native_kafka ( config , :rd_kafka_producer ) ) ) . tap do |producer |
183183 opaque . producer = producer
184184 end
185185 end
186186
187- class ProducerClient
188- def initialize ( native )
189- @native = native
190-
191- # Start thread to poll client for delivery callbacks
192- @polling_thread = Thread . new do
193- loop do
194- Rdkafka ::Bindings . rd_kafka_poll ( native , 250 )
195- # Exit thread if closing and the poll queue is empty
196- if Thread . current [ :closing ] && Rdkafka ::Bindings . rd_kafka_outq_len ( native ) == 0
197- break
198- end
199- end
200- end
201- @polling_thread . abort_on_exception = true
202- @polling_thread [ :closing ] = false
203- end
204-
205- def native
206- @native
207- end
208-
209- def finalizer
210- -> ( _ ) { close }
211- end
212-
213- def closed?
214- @native . nil?
215- end
216-
217- def close ( object_id = nil )
218- return unless @native
219-
220- # Indicate to polling thread that we're closing
221- @polling_thread [ :closing ] = true
222- # Wait for the polling thread to finish up
223- @polling_thread . join
224-
225- Rdkafka ::Bindings . rd_kafka_destroy ( @native )
226-
227- @native = nil
228- end
229- end
230-
231187 # Create an admin instance with this configuration.
232188 #
233189 # @raise [ConfigError] When the configuration contains invalid options
Original file line number Diff line number Diff line change 1+ module Rdkafka
2+ class Producer
3+ class Client
4+ def initialize ( native )
5+ @native = native
6+
7+ # Start thread to poll client for delivery callbacks
8+ @polling_thread = Thread . new do
9+ loop do
10+ Rdkafka ::Bindings . rd_kafka_poll ( native , 250 )
11+ # Exit thread if closing and the poll queue is empty
12+ if Thread . current [ :closing ] && Rdkafka ::Bindings . rd_kafka_outq_len ( native ) == 0
13+ break
14+ end
15+ end
16+ end
17+ @polling_thread . abort_on_exception = true
18+ @polling_thread [ :closing ] = false
19+ end
20+
21+ def native
22+ @native
23+ end
24+
25+ def finalizer
26+ -> ( _ ) { close }
27+ end
28+
29+ def closed?
30+ @native . nil?
31+ end
32+
33+ def close ( object_id = nil )
34+ return unless @native
35+
36+ # Indicate to polling thread that we're closing
37+ @polling_thread [ :closing ] = true
38+ # Wait for the polling thread to finish up
39+ @polling_thread . join
40+
41+ Rdkafka ::Bindings . rd_kafka_destroy ( @native )
42+
43+ @native = nil
44+ end
45+ end
46+ end
47+ end
You can’t perform that action at this time.
0 commit comments