diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 21b023bc..78ecd1a7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -64,4 +64,4 @@ jobs: run: | bin/rails db:setup - name: Run tests - run: bin/rails test + run: bin/rails test test/integration/processes_lifecycle_test.rb:138 diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index c2b13909..05f3f34b 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -18,6 +18,7 @@ def claiming(job_ids, process_id, &block) SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload| insert_all!(job_data) where(job_id: job_ids, process_id: process_id).load.tap do |claimed| + puts "claimed executions added: #{claimed.map(&:id)}" block.call(claimed) payload[:size] = claimed.size @@ -101,6 +102,7 @@ def execute def finished transaction do job.finished! + puts "Removing claimed execution id=#{id},job_id=#{job_id}" destroy! end end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index e2146a67..3a4cd897 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -87,6 +87,12 @@ def finished? finished_at.present? end + def statuses + %w[ ready claimed failed ].filter_map do |status| + public_send("#{status}_execution").present? ? status : nil + end + end + def status if finished? :finished diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 35a11292..8386c118 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -38,6 +38,7 @@ def lock_candidates(executions, process_id) SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process_id) do |claimed| ids_to_delete = executions.index_by(&:job_id).values_at(*claimed.map(&:job_id)).map(&:id) + puts "Deleting ready_executions #{ids_to_delete}" where(id: ids_to_delete).delete_all end end diff --git a/app/models/solid_queue/record.rb b/app/models/solid_queue/record.rb index d73e41b2..d88b833c 100644 --- a/app/models/solid_queue/record.rb +++ b/app/models/solid_queue/record.rb @@ -6,6 +6,11 @@ class Record < ActiveRecord::Base connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to + def self.clear_all_connections! + self.connection_handler.clear_all_connections!(:writing) + self.connection_handler.clear_all_connections!(:reading) + end + def self.non_blocking_lock if SolidQueue.use_skip_locked lock(Arel.sql("FOR UPDATE SKIP LOCKED")) diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 02b88d05..fb90e946 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -20,6 +20,8 @@ module SolidQueue extend self + ForkSafety # add fork safety hooks + DEFAULT_LOGGER = ActiveSupport::Logger.new($stdout) mattr_accessor :logger, default: DEFAULT_LOGGER diff --git a/lib/solid_queue/fork_safety.rb b/lib/solid_queue/fork_safety.rb new file mode 100644 index 00000000..2c6abd53 --- /dev/null +++ b/lib/solid_queue/fork_safety.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module SolidQueue + module ForkSafety + def _fork + Record.clear_all_connections! + + pid = super + + pid + end + end +end + +Process.singleton_class.prepend(SolidQueue::ForkSafety) diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index 33b441f6..1fbafb42 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -19,8 +19,12 @@ def start def stop super + puts "#{self.name} stopping" + wake_up @thread&.join + + puts "#{self.name} stopped" end private diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index f2207691..c66ff64a 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -93,6 +93,7 @@ def set_procline end def terminate_gracefully + puts "#{self.name} terminate_gracefully" SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| term_forks @@ -134,6 +135,7 @@ def term_forks end def quit_forks + puts "#{self.name} quitting: #{forks.keys}" signal_processes(forks.keys, :QUIT) end @@ -151,6 +153,7 @@ def reap_terminated_forks pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) break unless pid + puts "#{self.name} reaping #{pid}, #{status}" if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0) handle_claimed_jobs_by(terminated_fork, status) end diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index ebb1100c..8150f767 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -121,20 +121,27 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase test "term supervisor exceeding timeout while there are jobs in-flight" do no_pause = enqueue_store_result_job("no pause") + puts "no_pause job | id=#{get_job(no_pause).id}" + puts "no_pause statuses #{get_job(no_pause).statuses}" pause = enqueue_store_result_job("pause", pause: SolidQueue.shutdown_timeout + 10.second) + puts "pause job | id=#{get_job(pause).id}" + puts "pause statuses #{get_job(pause).statuses}" wait_while_with_timeout(1.second) { SolidQueue::ReadyExecution.count > 0 } + puts "Signaling TERM" signal_process(@pid, :TERM, wait: 0.5) - sleep(SolidQueue.shutdown_timeout + 0.5.second) + sleep(SolidQueue.shutdown_timeout + 0.1.second) assert_completed_job_results("no pause") + puts "no_pause statuses #{get_job(no_pause).statuses}" assert_job_status(no_pause, :finished) # This job was left claimed as the worker was shutdown without # a chance to terminate orderly assert_started_job_result("pause") + puts "pause statuses #{get_job(pause).statuses}" assert_job_status(pause, :claimed) # The process running the long job couldn't deregister, the other did @@ -300,6 +307,12 @@ def enqueue_store_result_job(value, queue_name = :background, **options) StoreResultJob.set(queue: queue_name).perform_later(value, **options) end + def get_job(active_job) + skip_active_record_query_cache do + SolidQueue::Job.find_by(active_job_id: active_job.job_id) + end + end + def assert_completed_job_results(value, queue_name = :background, count = 1) skip_active_record_query_cache do assert_equal count, JobResult.where(queue_name: queue_name, status: "completed", value: value).count @@ -320,7 +333,7 @@ def assert_job_status(active_job, status) # might have been deleted in the forked processes. skip_active_record_query_cache do job = SolidQueue::Job.find_by(active_job_id: active_job.job_id) - assert job.public_send("#{status}?") + assert_equal status, job.status end end end diff --git a/test/test_helper.rb b/test/test_helper.rb index f54b73f2..5e53bac8 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -26,6 +26,21 @@ def write(...) Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions) class ExpectedTestError < RuntimeError; end +ActiveSupport::Notifications.subscribe('register_process.solid_queue') do |event| + puts "#{event.name} | #{event.duration} | #{event.payload}" +end +ActiveSupport::Notifications.subscribe('start_process.solid_queue') do |event| + puts "#{event.name} | #{event.duration} | #{event.payload[:process].name} | id=#{event.payload[:process].instance_variable_get(:@process).id},pid=#{event.payload[:process].pid}" +end +ActiveSupport::Notifications.subscribe('claim.solid_queue') do |event| + puts "#{event.name} | #{event.duration} | #{event.payload}" +end +# ActiveSupport::Notifications.subscribe(/solid_queue$/) do |event| +# puts "#{event.name}?" +# end +ActiveRecord::Base.logger = Logger.new STDOUT +SolidQueue.silence_polling = false + class ActiveSupport::TestCase include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper