Skip to content

Commit a28aaef

Browse files
authored
mitigate ffi bugs (#491)
1 parent 24a7b7e commit a28aaef

File tree

4 files changed

+56
-15
lines changed

4 files changed

+56
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## 0.18.0 (Unreleased)
44
- [Enhancement] Update `librdkafka` to `2.5.0`
5+
- [Fix] Mitigate a case where FFI would not restart the background events callback dispatcher in forks
56

67
## 0.17.0 (2024-08-03)
78
- [Feature] Add `#seek_by` to be able to seek for a message by topic, partition and offset (zinahia)

lib/rdkafka/callbacks.rb

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
module Rdkafka
44
module Callbacks
5-
65
# Extracts attributes of a rd_kafka_topic_result_t
76
#
87
# @private
@@ -149,13 +148,6 @@ def initialize(event_ptr)
149148
end
150149
end
151150

152-
# FFI Function used for Create Topic and Delete Topic callbacks
153-
BackgroundEventCallbackFunction = FFI::Function.new(
154-
:void, [:pointer, :pointer, :pointer]
155-
) do |client_ptr, event_ptr, opaque_ptr|
156-
BackgroundEventCallback.call(client_ptr, event_ptr, opaque_ptr)
157-
end
158-
159151
# @private
160152
class BackgroundEventCallback
161153
def self.call(_, event_ptr, _)
@@ -348,13 +340,6 @@ def self.process_describe_acl(event_ptr)
348340
end
349341
end
350342

351-
# FFI Function used for Message Delivery callbacks
352-
DeliveryCallbackFunction = FFI::Function.new(
353-
:void, [:pointer, :pointer, :pointer]
354-
) do |client_ptr, message_ptr, opaque_ptr|
355-
DeliveryCallback.call(client_ptr, message_ptr, opaque_ptr)
356-
end
357-
358343
# @private
359344
class DeliveryCallback
360345
def self.call(_, message_ptr, opaque_ptr)
@@ -387,5 +372,44 @@ def self.call(_, message_ptr, opaque_ptr)
387372
end
388373
end
389374
end
375+
376+
@@mutex = Mutex.new
377+
@@current_pid = nil
378+
379+
class << self
380+
# Defines or recreates after fork callbacks that require FFI thread so the callback thread
381+
# is always correctly initialized
382+
#
383+
# @see https://github.com/ffi/ffi/issues/1114
384+
def ensure_ffi_running
385+
@@mutex.synchronize do
386+
return if @@current_pid == ::Process.pid
387+
388+
if const_defined?(:BackgroundEventCallbackFunction, false)
389+
send(:remove_const, :BackgroundEventCallbackFunction)
390+
send(:remove_const, :DeliveryCallbackFunction)
391+
end
392+
393+
# FFI Function used for Create Topic and Delete Topic callbacks
394+
background_event_callback_function = FFI::Function.new(
395+
:void, [:pointer, :pointer, :pointer]
396+
) do |client_ptr, event_ptr, opaque_ptr|
397+
BackgroundEventCallback.call(client_ptr, event_ptr, opaque_ptr)
398+
end
399+
400+
# FFI Function used for Message Delivery callbacks
401+
delivery_callback_function = FFI::Function.new(
402+
:void, [:pointer, :pointer, :pointer]
403+
) do |client_ptr, message_ptr, opaque_ptr|
404+
DeliveryCallback.call(client_ptr, message_ptr, opaque_ptr)
405+
end
406+
407+
const_set(:BackgroundEventCallbackFunction, background_event_callback_function)
408+
const_set(:DeliveryCallbackFunction, delivery_callback_function)
409+
410+
@@current_pid = ::Process.pid
411+
end
412+
end
413+
end
390414
end
391415
end

lib/rdkafka/config.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ def self.opaques
146146
#
147147
# @return [Config]
148148
def initialize(config_hash = {})
149+
Callbacks.ensure_ffi_running
150+
149151
@config_hash = DEFAULT_CONFIG.merge(config_hash)
150152
@consumer_rebalance_listener = nil
151153
@consumer_poll_set = true

spec/rdkafka/admin_spec.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,4 +737,18 @@
737737
end
738738
end
739739
end
740+
741+
context "when operating from a fork" do
742+
# @see https://github.com/ffi/ffi/issues/1114
743+
it 'expect to be able to create topics and run other admin operations without hanging' do
744+
# If the FFI issue is not mitigated, this will hang forever
745+
pid = fork do
746+
admin
747+
.create_topic(topic_name, topic_partition_count, topic_replication_factor)
748+
.wait
749+
end
750+
751+
Process.wait(pid)
752+
end
753+
end
740754
end

0 commit comments

Comments
 (0)