File tree Expand file tree Collapse file tree 1 file changed +3
-3
lines changed Expand file tree Collapse file tree 1 file changed +3
-3
lines changed Original file line number Diff line number Diff line change @@ -12,7 +12,6 @@ class Producer
1212 # @private
1313 def initialize ( native_kafka )
1414 @id = SecureRandom . uuid
15- @closing = false
1615 @native_kafka = native_kafka
1716
1817 # Makes sure, that the producer gets closed before it gets GCed by Ruby
@@ -23,12 +22,13 @@ def initialize(native_kafka)
2322 loop do
2423 Rdkafka ::Bindings . rd_kafka_poll ( @native_kafka , 250 )
2524 # Exit thread if closing and the poll queue is empty
26- if @ closing && Rdkafka ::Bindings . rd_kafka_outq_len ( @native_kafka ) == 0
25+ if Thread . current [ : closing] && Rdkafka ::Bindings . rd_kafka_outq_len ( @native_kafka ) == 0
2726 break
2827 end
2928 end
3029 end
3130 @polling_thread . abort_on_exception = true
31+ @polling_thread [ :closing ] = false
3232 end
3333
3434 # Set a callback that will be called every time a message is successfully produced.
@@ -49,7 +49,7 @@ def close
4949 return unless @native_kafka
5050
5151 # Indicate to polling thread that we're closing
52- @closing = true
52+ @polling_thread [ : closing] = true
5353 # Wait for the polling thread to finish up
5454 @polling_thread . join
5555 Rdkafka ::Bindings . rd_kafka_destroy ( @native_kafka )
You can’t perform that action at this time.
0 commit comments