From 9cd6bc3a16826d04c63ba667c93bd8cbb094db81 Mon Sep 17 00:00:00 2001 From: "thomas.crambert" Date: Wed, 29 Jan 2025 14:10:10 +0100 Subject: [PATCH 1/5] feat: add on_exit hook --- lib/solid_queue.rb | 32 ++++++++---------------- lib/solid_queue/dispatcher.rb | 3 ++- lib/solid_queue/lifecycle_hooks.rb | 11 +++++++- lib/solid_queue/scheduler.rb | 1 + lib/solid_queue/supervisor.rb | 2 ++ lib/solid_queue/worker.rb | 2 +- test/integration/lifecycle_hooks_test.rb | 19 +++++++++++--- 7 files changed, 42 insertions(+), 28 deletions(-) diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 1e1961e6..02b88d05 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -41,30 +41,20 @@ module SolidQueue mattr_accessor :clear_finished_jobs_after, default: 1.day mattr_accessor :default_concurrency_control_period, default: 3.minutes - delegate :on_start, :on_stop, to: Supervisor + delegate :on_start, :on_stop, :on_exit, to: Supervisor - def on_worker_start(...) - Worker.on_start(...) - end - - def on_worker_stop(...) - Worker.on_stop(...) - end - - def on_dispatcher_start(...) - Dispatcher.on_start(...) - end - - def on_dispatcher_stop(...) - Dispatcher.on_stop(...) - end + [ Dispatcher, Scheduler, Worker ].each do |process| + define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block| + process.on_start { block.call } + end - def on_scheduler_start(...) - Scheduler.on_start(...) - end + define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block| + process.on_stop { block.call } + end - def on_scheduler_stop(...) - Scheduler.on_stop(...) + define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block| + process.on_exit { block.call } + end end def supervisor? diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index a443df2e..6f7ec245 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -8,7 +8,8 @@ class Dispatcher < Processes::Poller after_boot :run_start_hooks after_boot :start_concurrency_maintenance before_shutdown :stop_concurrency_maintenance - after_shutdown :run_stop_hooks + before_shutdown :run_stop_hooks + after_shutdown :run_exit_hooks def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS) diff --git a/lib/solid_queue/lifecycle_hooks.rb b/lib/solid_queue/lifecycle_hooks.rb index fabddac4..0403459a 100644 --- a/lib/solid_queue/lifecycle_hooks.rb +++ b/lib/solid_queue/lifecycle_hooks.rb @@ -5,7 +5,7 @@ module LifecycleHooks extend ActiveSupport::Concern included do - mattr_reader :lifecycle_hooks, default: { start: [], stop: [] } + mattr_reader :lifecycle_hooks, default: { start: [], stop: [], exit: [] } end class_methods do @@ -17,7 +17,12 @@ def on_stop(&block) self.lifecycle_hooks[:stop] << block end + def on_exit(&block) + self.lifecycle_hooks[:exit] << block + end + def clear_hooks + self.lifecycle_hooks[:exit] = [] self.lifecycle_hooks[:start] = [] self.lifecycle_hooks[:stop] = [] end @@ -32,6 +37,10 @@ def run_stop_hooks run_hooks_for :stop end + def run_exit_hooks + run_hooks_for :exit + end + def run_hooks_for(event) self.class.lifecycle_hooks.fetch(event, []).each do |block| block.call diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index b68075dc..d3164ed5 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -11,6 +11,7 @@ class Scheduler < Processes::Base after_boot :schedule_recurring_tasks before_shutdown :unschedule_recurring_tasks before_shutdown :run_stop_hooks + after_shutdown :run_exit_hooks def initialize(recurring_tasks:, **options) @recurring_schedule = RecurringSchedule.new(recurring_tasks) diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index e8f075eb..f2207691 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -5,6 +5,8 @@ class Supervisor < Processes::Base include LifecycleHooks include Maintenance, Signals, Pidfiled + after_shutdown :run_exit_hooks + class << self def start(**options) SolidQueue.supervisor = true diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index f34a14f0..54d4d870 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -6,7 +6,7 @@ class Worker < Processes::Poller after_boot :run_start_hooks before_shutdown :run_stop_hooks - + after_shutdown :run_exit_hooks attr_accessor :queues, :pool diff --git a/test/integration/lifecycle_hooks_test.rb b/test/integration/lifecycle_hooks_test.rb index 8bc4dc94..7da73228 100644 --- a/test/integration/lifecycle_hooks_test.rb +++ b/test/integration/lifecycle_hooks_test.rb @@ -8,15 +8,19 @@ class LifecycleHooksTest < ActiveSupport::TestCase test "run lifecycle hooks" do SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) } SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) } + SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) } SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) } SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) } + SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) } SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) } SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) } + SolidQueue.on_dispatcher_exit { JobResult.create!(status: :hook_called, value: :dispatcher_exit) } SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) } SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) } + SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) } pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false) wait_for_registered_processes(4) @@ -24,13 +28,20 @@ class LifecycleHooksTest < ActiveSupport::TestCase terminate_process(pid) wait_for_registered_processes(0) + results = skip_active_record_query_cache do - assert_equal 8, JobResult.count - JobResult.last(8) + job_results = JobResult.where(status: :hook_called) + assert_equal 12, job_results.count + job_results end - assert_equal({ "hook_called" => 8 }, results.map(&:status).tally) - assert_equal %w[start stop worker_start worker_stop dispatcher_start dispatcher_stop scheduler_start scheduler_stop].sort, results.map(&:value).sort + assert_equal({ "hook_called" => 12 }, results.map(&:status).tally) + assert_equal %w[ + start stop exit + worker_start worker_stop worker_exit + dispatcher_start dispatcher_stop dispatcher_exit + scheduler_start scheduler_stop scheduler_exit + ].sort, results.map(&:value).sort ensure SolidQueue::Supervisor.clear_hooks SolidQueue::Worker.clear_hooks From c96ad6b91ed8be4150beabe9a3de404d352e9fca Mon Sep 17 00:00:00 2001 From: Alexandre Chakroun Date: Wed, 5 Feb 2025 15:21:03 +0100 Subject: [PATCH 2/5] Make sure to close DB connections before fork --- app/models/solid_queue/record.rb | 5 +++++ lib/solid_queue/supervisor.rb | 2 ++ 2 files changed, 7 insertions(+) diff --git a/app/models/solid_queue/record.rb b/app/models/solid_queue/record.rb index d73e41b2..d88b833c 100644 --- a/app/models/solid_queue/record.rb +++ b/app/models/solid_queue/record.rb @@ -6,6 +6,11 @@ class Record < ActiveRecord::Base connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to + def self.clear_all_connections! + self.connection_handler.clear_all_connections!(:writing) + self.connection_handler.clear_all_connections!(:reading) + end + def self.non_blocking_lock if SolidQueue.use_skip_locked lock(Arel.sql("FOR UPDATE SKIP LOCKED")) diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index f2207691..53a46017 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -80,6 +80,8 @@ def start_process(configured_process) instance.mode = :fork end + Record.clear_all_connections! + pid = fork do process_instance.start end From 1ddb833b8c6dddf742b2ff5b2798b5ac933f6049 Mon Sep 17 00:00:00 2001 From: Alexandre Chakroun Date: Thu, 6 Feb 2025 00:39:36 +0100 Subject: [PATCH 3/5] Try disconnecting before each fork made in the lib --- lib/puma/plugin/solid_queue.rb | 2 +- lib/solid_queue.rb | 5 +++++ lib/solid_queue/supervisor.rb | 4 +--- test/integration/puma/plugin_test.rb | 2 +- test/test_helpers/processes_test_helper.rb | 2 +- 5 files changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/puma/plugin/solid_queue.rb b/lib/puma/plugin/solid_queue.rb index eca5fa5f..dd52e127 100644 --- a/lib/puma/plugin/solid_queue.rb +++ b/lib/puma/plugin/solid_queue.rb @@ -12,7 +12,7 @@ def start(launcher) end launcher.events.on_booted do - @solid_queue_pid = fork do + @solid_queue_pid = SolidQueue.safe_fork do Thread.new { monitor_puma } SolidQueue::Supervisor.start end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 02b88d05..25de0869 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -73,5 +73,10 @@ def instrument(channel, **options, &block) ActiveSupport::Notifications.instrument("#{channel}.solid_queue", **options, &block) end + def safe_fork(&block) + Record.clear_all_connections! + fork { block.call } + end + ActiveSupport.run_load_hooks(:solid_queue, self) end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 53a46017..219f2cae 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -80,9 +80,7 @@ def start_process(configured_process) instance.mode = :fork end - Record.clear_all_connections! - - pid = fork do + pid = SolidQueue.safe_fork do process_instance.start end diff --git a/test/integration/puma/plugin_test.rb b/test/integration/puma/plugin_test.rb index bac98a2b..fa791b59 100644 --- a/test/integration/puma/plugin_test.rb +++ b/test/integration/puma/plugin_test.rb @@ -15,7 +15,7 @@ class PluginTest < ActiveSupport::TestCase -s config.ru ] - @pid = fork do + @pid = SolidQueue.safe_fork do exec(*cmd) end end diff --git a/test/test_helpers/processes_test_helper.rb b/test/test_helpers/processes_test_helper.rb index 9a6d0f65..3704e7dd 100644 --- a/test/test_helpers/processes_test_helper.rb +++ b/test/test_helpers/processes_test_helper.rb @@ -2,7 +2,7 @@ module ProcessesTestHelper private def run_supervisor_as_fork(**options) - fork do + SolidQueue.safe_fork do SolidQueue::Supervisor.start(**options.with_defaults(skip_recurring: true)) end end From 4e3270653f45ee509d58e127550aa14c5ad50116 Mon Sep 17 00:00:00 2001 From: Alexandre Chakroun Date: Thu, 6 Feb 2025 17:06:37 +0100 Subject: [PATCH 4/5] Use _fork hook instead of custom fork method --- lib/puma/plugin/solid_queue.rb | 2 +- lib/solid_queue.rb | 7 ++----- lib/solid_queue/fork_safety.rb | 15 +++++++++++++++ lib/solid_queue/supervisor.rb | 2 +- test/integration/puma/plugin_test.rb | 2 +- test/test_helpers/processes_test_helper.rb | 2 +- 6 files changed, 21 insertions(+), 9 deletions(-) create mode 100644 lib/solid_queue/fork_safety.rb diff --git a/lib/puma/plugin/solid_queue.rb b/lib/puma/plugin/solid_queue.rb index dd52e127..eca5fa5f 100644 --- a/lib/puma/plugin/solid_queue.rb +++ b/lib/puma/plugin/solid_queue.rb @@ -12,7 +12,7 @@ def start(launcher) end launcher.events.on_booted do - @solid_queue_pid = SolidQueue.safe_fork do + @solid_queue_pid = fork do Thread.new { monitor_puma } SolidQueue::Supervisor.start end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 25de0869..fb90e946 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -20,6 +20,8 @@ module SolidQueue extend self + ForkSafety # add fork safety hooks + DEFAULT_LOGGER = ActiveSupport::Logger.new($stdout) mattr_accessor :logger, default: DEFAULT_LOGGER @@ -73,10 +75,5 @@ def instrument(channel, **options, &block) ActiveSupport::Notifications.instrument("#{channel}.solid_queue", **options, &block) end - def safe_fork(&block) - Record.clear_all_connections! - fork { block.call } - end - ActiveSupport.run_load_hooks(:solid_queue, self) end diff --git a/lib/solid_queue/fork_safety.rb b/lib/solid_queue/fork_safety.rb new file mode 100644 index 00000000..2c6abd53 --- /dev/null +++ b/lib/solid_queue/fork_safety.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module SolidQueue + module ForkSafety + def _fork + Record.clear_all_connections! + + pid = super + + pid + end + end +end + +Process.singleton_class.prepend(SolidQueue::ForkSafety) diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 219f2cae..f2207691 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -80,7 +80,7 @@ def start_process(configured_process) instance.mode = :fork end - pid = SolidQueue.safe_fork do + pid = fork do process_instance.start end diff --git a/test/integration/puma/plugin_test.rb b/test/integration/puma/plugin_test.rb index fa791b59..bac98a2b 100644 --- a/test/integration/puma/plugin_test.rb +++ b/test/integration/puma/plugin_test.rb @@ -15,7 +15,7 @@ class PluginTest < ActiveSupport::TestCase -s config.ru ] - @pid = SolidQueue.safe_fork do + @pid = fork do exec(*cmd) end end diff --git a/test/test_helpers/processes_test_helper.rb b/test/test_helpers/processes_test_helper.rb index 3704e7dd..9a6d0f65 100644 --- a/test/test_helpers/processes_test_helper.rb +++ b/test/test_helpers/processes_test_helper.rb @@ -2,7 +2,7 @@ module ProcessesTestHelper private def run_supervisor_as_fork(**options) - SolidQueue.safe_fork do + fork do SolidQueue::Supervisor.start(**options.with_defaults(skip_recurring: true)) end end From 57036dcebcf930c20cda67f6ce1bf15aec793ca8 Mon Sep 17 00:00:00 2001 From: Alexandre Chakroun Date: Fri, 7 Feb 2025 23:23:57 +0100 Subject: [PATCH 5/5] just sleep less? --- test/integration/processes_lifecycle_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index ebb1100c..69086761 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -127,7 +127,7 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase signal_process(@pid, :TERM, wait: 0.5) - sleep(SolidQueue.shutdown_timeout + 0.5.second) + sleep(SolidQueue.shutdown_timeout + 0.1.second) assert_completed_job_results("no pause") assert_job_status(no_pause, :finished)