Skip to content

Commit d25b1fb

Browse files
committed
Use a finalizer bound to the object that will be finalized
Previous object finalizer was bound to a string (`@id`) being created on initialization instead of the object itself. This aimed to remove a warning about object finalizer reference after reading [1] but changed the first argument instead of the second that is mentioned on [1]: `proc { close }` has binding to the own object. The current implementation removes the warning but still does not work to run the finalizer when the reference to the producer is removed (like `producer = nil`) therefore running only on exit. This behavior is explained on [2]. Ref: 1. https://www.mikeperham.com/2010/02/24/the-trouble-with-ruby-finalizers/ 2. https://ruby-doc.org/core-3.0.2/ObjectSpace.html#method-c-define_finalizer --- Changelog: fix Ref: https://www.mikeperham.com/2010/02/24/the-trouble-with-ruby-finalizers/ Ref: https://ruby-doc.org/core-3.0.2/ObjectSpace.html#method-c-define_finalizer See-also: https://www.mikeperham.com/2010/02/24/the-trouble-with-ruby-finalizers/ See-also: https://ruby-doc.org/core-3.0.2/ObjectSpace.html#method-c-define_finalizer
1 parent a336c80 commit d25b1fb

File tree

1 file changed

+27
-14
lines changed

1 file changed

+27
-14
lines changed

lib/rdkafka/producer.rb

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
require "securerandom"
1+
require "objspace"
22

33
module Rdkafka
44
# A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
@@ -9,14 +9,31 @@ class Producer
99
# @return [Proc, nil]
1010
attr_reader :delivery_callback
1111

12+
class ClientFinalizer
13+
def initialize(client, polling_thread, reference_destroyer)
14+
@client = client
15+
@polling_thread = polling_thread
16+
@reference_destroyer = reference_destroyer
17+
end
18+
19+
def call(object_id)
20+
return unless @client
21+
22+
# Indicate to polling thread that we're closing
23+
@polling_thread[:closing] = true
24+
# Wait for the polling thread to finish up
25+
@polling_thread.join
26+
27+
Rdkafka::Bindings.rd_kafka_destroy(@client)
28+
29+
@reference_destroyer.call
30+
end
31+
end
32+
1233
# @private
1334
def initialize(native_kafka)
14-
@id = SecureRandom.uuid
1535
@native_kafka = native_kafka
1636

17-
# Makes sure, that the producer gets closed before it gets GCed by Ruby
18-
ObjectSpace.define_finalizer(@id, proc { close })
19-
2037
# Start thread to poll client for delivery callbacks
2138
@polling_thread = Thread.new do
2239
loop do
@@ -29,6 +46,9 @@ def initialize(native_kafka)
2946
end
3047
@polling_thread.abort_on_exception = true
3148
@polling_thread[:closing] = false
49+
50+
# Makes sure, that the producer gets closed before it gets GCed by Ruby
51+
ObjectSpace.define_finalizer(self, ClientFinalizer.new(@native_kafka, @polling_thread, proc { @native_kafka = nil }))
3252
end
3353

3454
# Set a callback that will be called every time a message is successfully produced.
@@ -44,16 +64,9 @@ def delivery_callback=(callback)
4464

4565
# Close this producer and wait for the internal poll queue to empty.
4666
def close
47-
ObjectSpace.undefine_finalizer(@id)
48-
49-
return unless @native_kafka
67+
ObjectSpace.undefine_finalizer(self)
5068

51-
# Indicate to polling thread that we're closing
52-
@polling_thread[:closing] = true
53-
# Wait for the polling thread to finish up
54-
@polling_thread.join
55-
Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
56-
@native_kafka = nil
69+
ClientFinalizer.new(@native_kafka, @polling_thread, proc { @native_kafka = nil }).call(self.object_id)
5770
end
5871

5972
# Partition count for a given topic.

0 commit comments

Comments
 (0)