Skip to content
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,4 @@ jobs:
run: |
bin/rails db:setup
- name: Run tests
run: bin/rails test
run: bin/rails test test/integration/processes_lifecycle_test.rb:138
2 changes: 2 additions & 0 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def claiming(job_ids, process_id, &block)
SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload|
insert_all!(job_data)
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
puts "claimed executions added: #{claimed.map(&:id)}"
block.call(claimed)

payload[:size] = claimed.size
Expand Down Expand Up @@ -101,6 +102,7 @@ def execute
def finished
transaction do
job.finished!
puts "Removing claimed execution id=#{id},job_id=#{job_id}"
destroy!
end
end
Expand Down
6 changes: 6 additions & 0 deletions app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ def finished?
finished_at.present?
end

def statuses
%w[ ready claimed failed ].filter_map do |status|
public_send("#{status}_execution").present? ? status : nil
end
end

def status
if finished?
:finished
Expand Down
1 change: 1 addition & 0 deletions app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def lock_candidates(executions, process_id)

SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process_id) do |claimed|
ids_to_delete = executions.index_by(&:job_id).values_at(*claimed.map(&:job_id)).map(&:id)
puts "Deleting ready_executions #{ids_to_delete}"
where(id: ids_to_delete).delete_all
end
end
Expand Down
5 changes: 5 additions & 0 deletions app/models/solid_queue/record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ class Record < ActiveRecord::Base

connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to

def self.clear_all_connections!
self.connection_handler.clear_all_connections!(:writing)
self.connection_handler.clear_all_connections!(:reading)
end

def self.non_blocking_lock
if SolidQueue.use_skip_locked
lock(Arel.sql("FOR UPDATE SKIP LOCKED"))
Expand Down
2 changes: 2 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
module SolidQueue
extend self

ForkSafety # add fork safety hooks

DEFAULT_LOGGER = ActiveSupport::Logger.new($stdout)

mattr_accessor :logger, default: DEFAULT_LOGGER
Expand Down
15 changes: 15 additions & 0 deletions lib/solid_queue/fork_safety.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

module SolidQueue
module ForkSafety
def _fork
Record.clear_all_connections!

pid = super

pid
end
end
end

Process.singleton_class.prepend(SolidQueue::ForkSafety)
4 changes: 4 additions & 0 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ def start
def stop
super

puts "#{self.name} stopping"

wake_up
@thread&.join

puts "#{self.name} stopped"
end

private
Expand Down
3 changes: 3 additions & 0 deletions lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def set_procline
end

def terminate_gracefully
puts "#{self.name} terminate_gracefully"
SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload|
term_forks

Expand Down Expand Up @@ -134,6 +135,7 @@ def term_forks
end

def quit_forks
puts "#{self.name} quitting: #{forks.keys}"
signal_processes(forks.keys, :QUIT)
end

Expand All @@ -151,6 +153,7 @@ def reap_terminated_forks
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
break unless pid

puts "#{self.name} reaping #{pid}, #{status}"
if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0)
handle_claimed_jobs_by(terminated_fork, status)
end
Expand Down
17 changes: 15 additions & 2 deletions test/integration/processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,27 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase

test "term supervisor exceeding timeout while there are jobs in-flight" do
no_pause = enqueue_store_result_job("no pause")
puts "no_pause job | id=#{get_job(no_pause).id}"
puts "no_pause statuses #{get_job(no_pause).statuses}"
pause = enqueue_store_result_job("pause", pause: SolidQueue.shutdown_timeout + 10.second)
puts "pause job | id=#{get_job(pause).id}"
puts "pause statuses #{get_job(pause).statuses}"

wait_while_with_timeout(1.second) { SolidQueue::ReadyExecution.count > 0 }

puts "Signaling TERM"
signal_process(@pid, :TERM, wait: 0.5)

sleep(SolidQueue.shutdown_timeout + 0.5.second)
sleep(SolidQueue.shutdown_timeout + 0.1.second)

assert_completed_job_results("no pause")
puts "no_pause statuses #{get_job(no_pause).statuses}"
assert_job_status(no_pause, :finished)

# This job was left claimed as the worker was shutdown without
# a chance to terminate orderly
assert_started_job_result("pause")
puts "pause statuses #{get_job(pause).statuses}"
assert_job_status(pause, :claimed)

# The process running the long job couldn't deregister, the other did
Expand Down Expand Up @@ -300,6 +307,12 @@ def enqueue_store_result_job(value, queue_name = :background, **options)
StoreResultJob.set(queue: queue_name).perform_later(value, **options)
end

def get_job(active_job)
skip_active_record_query_cache do
SolidQueue::Job.find_by(active_job_id: active_job.job_id)
end
end

def assert_completed_job_results(value, queue_name = :background, count = 1)
skip_active_record_query_cache do
assert_equal count, JobResult.where(queue_name: queue_name, status: "completed", value: value).count
Expand All @@ -320,7 +333,7 @@ def assert_job_status(active_job, status)
# might have been deleted in the forked processes.
skip_active_record_query_cache do
job = SolidQueue::Job.find_by(active_job_id: active_job.job_id)
assert job.public_send("#{status}?")
assert_equal status, job.status
end
end
end
15 changes: 15 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ def write(...)
Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions)
class ExpectedTestError < RuntimeError; end

ActiveSupport::Notifications.subscribe('register_process.solid_queue') do |event|
puts "#{event.name} | #{event.duration} | #{event.payload}"
end
ActiveSupport::Notifications.subscribe('start_process.solid_queue') do |event|
puts "#{event.name} | #{event.duration} | #{event.payload[:process].name} | id=#{event.payload[:process].instance_variable_get(:@process).id},pid=#{event.payload[:process].pid}"
end
ActiveSupport::Notifications.subscribe('claim.solid_queue') do |event|
puts "#{event.name} | #{event.duration} | #{event.payload}"
end
# ActiveSupport::Notifications.subscribe(/solid_queue$/) do |event|
# puts "#{event.name}?"
# end
ActiveRecord::Base.logger = Logger.new STDOUT
SolidQueue.silence_polling = false


class ActiveSupport::TestCase
include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper
Expand Down
Loading