diff --git a/app/models/solid_queue/record.rb b/app/models/solid_queue/record.rb index d73e41b2..d88b833c 100644 --- a/app/models/solid_queue/record.rb +++ b/app/models/solid_queue/record.rb @@ -6,6 +6,11 @@ class Record < ActiveRecord::Base connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to + def self.clear_all_connections! + self.connection_handler.clear_all_connections!(:writing) + self.connection_handler.clear_all_connections!(:reading) + end + def self.non_blocking_lock if SolidQueue.use_skip_locked lock(Arel.sql("FOR UPDATE SKIP LOCKED")) diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 1e1961e6..fb90e946 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -20,6 +20,8 @@ module SolidQueue extend self + ForkSafety # add fork safety hooks + DEFAULT_LOGGER = ActiveSupport::Logger.new($stdout) mattr_accessor :logger, default: DEFAULT_LOGGER @@ -41,30 +43,20 @@ module SolidQueue mattr_accessor :clear_finished_jobs_after, default: 1.day mattr_accessor :default_concurrency_control_period, default: 3.minutes - delegate :on_start, :on_stop, to: Supervisor + delegate :on_start, :on_stop, :on_exit, to: Supervisor - def on_worker_start(...) - Worker.on_start(...) - end + [ Dispatcher, Scheduler, Worker ].each do |process| + define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block| + process.on_start { block.call } + end - def on_worker_stop(...) - Worker.on_stop(...) - end - - def on_dispatcher_start(...) - Dispatcher.on_start(...) - end - - def on_dispatcher_stop(...) - Dispatcher.on_stop(...) - end - - def on_scheduler_start(...) - Scheduler.on_start(...) - end + define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block| + process.on_stop { block.call } + end - def on_scheduler_stop(...) - Scheduler.on_stop(...) + define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block| + process.on_exit { block.call } + end end def supervisor? diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index a443df2e..6f7ec245 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -8,7 +8,8 @@ class Dispatcher < Processes::Poller after_boot :run_start_hooks after_boot :start_concurrency_maintenance before_shutdown :stop_concurrency_maintenance - after_shutdown :run_stop_hooks + before_shutdown :run_stop_hooks + after_shutdown :run_exit_hooks def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS) diff --git a/lib/solid_queue/fork_safety.rb b/lib/solid_queue/fork_safety.rb new file mode 100644 index 00000000..2c6abd53 --- /dev/null +++ b/lib/solid_queue/fork_safety.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module SolidQueue + module ForkSafety + def _fork + Record.clear_all_connections! + + pid = super + + pid + end + end +end + +Process.singleton_class.prepend(SolidQueue::ForkSafety) diff --git a/lib/solid_queue/lifecycle_hooks.rb b/lib/solid_queue/lifecycle_hooks.rb index fabddac4..0403459a 100644 --- a/lib/solid_queue/lifecycle_hooks.rb +++ b/lib/solid_queue/lifecycle_hooks.rb @@ -5,7 +5,7 @@ module LifecycleHooks extend ActiveSupport::Concern included do - mattr_reader :lifecycle_hooks, default: { start: [], stop: [] } + mattr_reader :lifecycle_hooks, default: { start: [], stop: [], exit: [] } end class_methods do @@ -17,7 +17,12 @@ def on_stop(&block) self.lifecycle_hooks[:stop] << block end + def on_exit(&block) + self.lifecycle_hooks[:exit] << block + end + def clear_hooks + self.lifecycle_hooks[:exit] = [] self.lifecycle_hooks[:start] = [] self.lifecycle_hooks[:stop] = [] end @@ -32,6 +37,10 @@ def run_stop_hooks run_hooks_for :stop end + def run_exit_hooks + run_hooks_for :exit + end + def run_hooks_for(event) self.class.lifecycle_hooks.fetch(event, []).each do |block| block.call diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index b68075dc..d3164ed5 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -11,6 +11,7 @@ class Scheduler < Processes::Base after_boot :schedule_recurring_tasks before_shutdown :unschedule_recurring_tasks before_shutdown :run_stop_hooks + after_shutdown :run_exit_hooks def initialize(recurring_tasks:, **options) @recurring_schedule = RecurringSchedule.new(recurring_tasks) diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index e8f075eb..f2207691 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -5,6 +5,8 @@ class Supervisor < Processes::Base include LifecycleHooks include Maintenance, Signals, Pidfiled + after_shutdown :run_exit_hooks + class << self def start(**options) SolidQueue.supervisor = true diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index f34a14f0..54d4d870 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -6,7 +6,7 @@ class Worker < Processes::Poller after_boot :run_start_hooks before_shutdown :run_stop_hooks - + after_shutdown :run_exit_hooks attr_accessor :queues, :pool diff --git a/test/integration/lifecycle_hooks_test.rb b/test/integration/lifecycle_hooks_test.rb index 8bc4dc94..7da73228 100644 --- a/test/integration/lifecycle_hooks_test.rb +++ b/test/integration/lifecycle_hooks_test.rb @@ -8,15 +8,19 @@ class LifecycleHooksTest < ActiveSupport::TestCase test "run lifecycle hooks" do SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) } SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) } + SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) } SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) } SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) } + SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) } SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) } SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) } + SolidQueue.on_dispatcher_exit { JobResult.create!(status: :hook_called, value: :dispatcher_exit) } SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) } SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) } + SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) } pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false) wait_for_registered_processes(4) @@ -24,13 +28,20 @@ class LifecycleHooksTest < ActiveSupport::TestCase terminate_process(pid) wait_for_registered_processes(0) + results = skip_active_record_query_cache do - assert_equal 8, JobResult.count - JobResult.last(8) + job_results = JobResult.where(status: :hook_called) + assert_equal 12, job_results.count + job_results end - assert_equal({ "hook_called" => 8 }, results.map(&:status).tally) - assert_equal %w[start stop worker_start worker_stop dispatcher_start dispatcher_stop scheduler_start scheduler_stop].sort, results.map(&:value).sort + assert_equal({ "hook_called" => 12 }, results.map(&:status).tally) + assert_equal %w[ + start stop exit + worker_start worker_stop worker_exit + dispatcher_start dispatcher_stop dispatcher_exit + scheduler_start scheduler_stop scheduler_exit + ].sort, results.map(&:value).sort ensure SolidQueue::Supervisor.clear_hooks SolidQueue::Worker.clear_hooks diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index ebb1100c..69086761 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -127,7 +127,7 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase signal_process(@pid, :TERM, wait: 0.5) - sleep(SolidQueue.shutdown_timeout + 0.5.second) + sleep(SolidQueue.shutdown_timeout + 0.1.second) assert_completed_job_results("no pause") assert_job_status(no_pause, :finished)