Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/solid_queue/async_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ def replace_thread(thread_id, instance)
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
payload[:thread] = instance

error = Processes::ThreadTerminatedError.new(terminated_instance.name)
release_claimed_jobs_by(terminated_instance, with_error: error)
error = Processes::ThreadTerminatedError.new(instance.name)
release_claimed_jobs_by(instance, with_error: error)

start_process(configured_processes.delete(thread_id))
end
Expand Down
38 changes: 24 additions & 14 deletions test/integration/async_processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ class AsyncProcessesLifecycleTest < ActiveSupport::TestCase
no_pause = enqueue_store_result_job("no pause")
pause = enqueue_store_result_job("pause", pause: 3.second)

signal_process(@pid, :KILL, wait: 0.2.seconds)
wait_for_jobs_to_finish_for(2.seconds)
# Wait for the "no pause" job to complete before sending KILL
wait_for_jobs_to_finish_for(2.seconds, except: pause)

signal_process(@pid, :KILL, wait: 0.1.seconds)
wait_for_registered_processes(1, timeout: 2.second)

assert_not process_exists?(@pid)
Expand Down Expand Up @@ -123,27 +125,35 @@ class AsyncProcessesLifecycleTest < ActiveSupport::TestCase
no_pause = enqueue_store_result_job("no pause")
pause = enqueue_store_result_job("pause", pause: SolidQueue.shutdown_timeout + 10.second)

wait_while_with_timeout(1.second) { SolidQueue::ReadyExecution.count > 1 }
# Wait for the "no pause" job to complete and the pause job to be claimed.
# This ensures the pause job is actively being processed.
wait_for_jobs_to_finish_for(3.seconds, except: pause)
wait_for(timeout: 2.seconds) { SolidQueue::ClaimedExecution.exists?(job_id: SolidQueue::Job.find_by(active_job_id: pause.job_id)&.id) }

signal_process(@pid, :TERM, wait: 0.5.second)
signal_process(@pid, :TERM, wait: 0.2.second)
wait_for_jobs_to_finish_for(2.seconds, except: pause)

# exit! exits with status 1 by default
wait_for_process_termination_with_timeout(@pid, timeout: SolidQueue.shutdown_timeout + 5.seconds, exitstatus: 1)
# Wait for process to terminate. In async mode, shutdown_timeout is used by both
# the supervisor and workers, creating a race: exit status may be 0 (graceful) or
# 1 (exit!) depending on which timeout check happens first.
wait_for_process_termination_with_timeout(@pid, timeout: SolidQueue.shutdown_timeout + 5.seconds, exitstatus: nil)
assert_not process_exists?(@pid)

assert_completed_job_results("no pause")
assert_job_status(no_pause, :finished)

# When timeout is exceeded, exit! is called without cleanup.
# The in-flight job stays claimed and processes stay registered.
# A future supervisor will need to prune and fail these orphaned executions.
# The pause job should have started but not completed
assert_started_job_result("pause")
assert_job_status(pause, :claimed)

assert_registered_supervisor
assert find_processes_registered_as("Worker").any? { |w| w.metadata["queues"].include?("background") }
assert_claimed_jobs
assert_not_equal "completed", skip_active_record_query_cache { JobResult.find_by(value: "pause")&.status }

# After shutdown, the pause job may be either:
# - claimed (exit! called, no cleanup) OR
# - ready (graceful exit, job released back to queue)
# Both are valid outcomes depending on the timing race between supervisor and worker timeouts.
skip_active_record_query_cache do
job = SolidQueue::Job.find_by(active_job_id: pause.job_id)
assert job.claimed? || job.ready?, "Expected pause job to be claimed or ready, but was neither"
end
end

test "process some jobs that raise errors" do
Expand Down
9 changes: 5 additions & 4 deletions test/integration/forked_processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase

test "quit supervisor while there are jobs in-flight" do
no_pause = enqueue_store_result_job("no pause")
pause = enqueue_store_result_job("pause", pause: 1.second)
# long enough pause to make sure it doesn't finish
pause = enqueue_store_result_job("pause", pause: 60.second)

wait_while_with_timeout(1.second) { SolidQueue::ReadyExecution.count > 0 }

Expand All @@ -87,7 +88,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase

test "term supervisor while there are jobs in-flight" do
no_pause = enqueue_store_result_job("no pause")
pause = enqueue_store_result_job("pause", pause: 0.2.seconds)
pause = enqueue_store_result_job("pause", pause: 1.second)

signal_process(@pid, :TERM, wait: 0.3.second)
wait_for_jobs_to_finish_for(3.seconds)
Expand All @@ -104,10 +105,10 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase

test "int supervisor while there are jobs in-flight" do
no_pause = enqueue_store_result_job("no pause")
pause = enqueue_store_result_job("pause", pause: 0.2.seconds)
pause = enqueue_store_result_job("pause", pause: 1.second)

signal_process(@pid, :INT, wait: 0.3.second)
wait_for_jobs_to_finish_for(2.second)
wait_for_jobs_to_finish_for(3.second)

assert_completed_job_results("no pause")
assert_completed_job_results("pause")
Expand Down
4 changes: 2 additions & 2 deletions test/integration/instrumentation_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ class InstrumentationTest < ActiveSupport::TestCase

events = subscribed("enqueue_recurring_task.solid_queue") do
scheduler.start
sleep(1.01)
sleep(1.5)
scheduler.stop
end

Expand All @@ -375,7 +375,7 @@ class InstrumentationTest < ActiveSupport::TestCase

events = subscribed("enqueue_recurring_task.solid_queue") do
scheduler.start
sleep(1.01)
sleep(1.5)
scheduler.stop
end

Expand Down
3 changes: 2 additions & 1 deletion test/integration/jobs_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
require "test_helper"

class JobsLifecycleTest < ActiveSupport::TestCase
self.use_transactional_tests = false

setup do
@_on_thread_error = SolidQueue.on_thread_error
SolidQueue.on_thread_error = silent_on_thread_error_for([ ExpectedTestError, RaisingJob::DefaultError ], @_on_thread_error)
Expand Down Expand Up @@ -34,7 +36,6 @@ class JobsLifecycleTest < ActiveSupport::TestCase
test "enqueue and run jobs that fail without retries" do
RaisingJob.perform_later(ExpectedTestError, "A")
RaisingJob.perform_later(ExpectedTestError, "B")
jobs = SolidQueue::Job.last(2)

@dispatcher.start
@worker.start
Expand Down
11 changes: 5 additions & 6 deletions test/integration/puma/plugin_testing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,27 @@ module PluginTesting
end
end

wait_for_registered_processes(5, timeout: 3.second)
wait_for_registered_processes(5, timeout: 5.second)
end

teardown do
terminate_process(@pid, signal: :INT) if process_exists?(@pid)

wait_for_registered_processes 0, timeout: 2.seconds
wait_for_registered_processes(0, timeout: 5.seconds)
end
end

test "perform jobs inside puma's process" do
StoreResultJob.perform_later(:puma_plugin)

wait_for_jobs_to_finish_for(2.seconds)
wait_for_jobs_to_finish_for(5.seconds)
assert_equal 1, JobResult.where(queue_name: :background, status: "completed", value: :puma_plugin).count
end

test "stop the queue on puma's restart" do
signal_process(@pid, :SIGUSR2)
# Ensure the restart finishes before we try to continue with the test
wait_for_registered_processes(0, timeout: 3.second)
wait_for_registered_processes(5, timeout: 3.second)
wait_for_registered_processes(0, timeout: 5.second)
wait_for_registered_processes(5, timeout: 5.second)

StoreResultJob.perform_later(:puma_plugin)
wait_for_jobs_to_finish_for(2.seconds)
Expand Down
6 changes: 4 additions & 2 deletions test/integration/recurring_tasks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ class RecurringTasksTest < ActiveSupport::TestCase

another_task = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: [ 42 ] } }
scheduler1 = SolidQueue::Scheduler.new(recurring_tasks: another_task).tap(&:start)
wait_for_registered_processes(6, timeout: 1.second)
wait_for_registered_processes(6, timeout: 2.seconds)
wait_for { SolidQueue::RecurringTask.find_by(key: "example_task").present? }

assert_recurring_tasks another_task

updated_task = { example_task: { class: "AddToBufferJob", schedule: "every minute" } }
scheduler2 = SolidQueue::Scheduler.new(recurring_tasks: updated_task).tap(&:start)
wait_for_registered_processes(7, timeout: 1.second)
wait_for_registered_processes(7, timeout: 2.seconds)
wait_for { SolidQueue::RecurringTask.find_by(key: "example_task")&.schedule == "every minute" }

assert_recurring_tasks updated_task

Expand Down
2 changes: 2 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def destroy_records
SolidQueue::RecurringTask.delete_all
SolidQueue::ScheduledExecution.delete_all
SolidQueue::ReadyExecution.delete_all
SolidQueue::ClaimedExecution.delete_all
SolidQueue::FailedExecution.delete_all
JobResult.delete_all
end

Expand Down
2 changes: 1 addition & 1 deletion test/test_helpers/processes_test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0, s
if process_exists?(pid)
begin
status = Process.waitpid2(pid).last
assert_equal exitstatus, status.exitstatus, "Expected pid #{pid} to exit with status #{exitstatus}" if status.exitstatus
assert_equal exitstatus, status.exitstatus, "Expected pid #{pid} to exit with status #{exitstatus}" if status.exitstatus && !exitstatus.nil?
assert_equal signaled, Signal.list.key(status.termsig).to_sym, "Expected pid #{pid} to be terminated with signal #{signaled}" if status.termsig
rescue Errno::ECHILD
# Child pid already reaped
Expand Down
25 changes: 12 additions & 13 deletions test/unit/async_supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@ class AsyncSupervisorTest < ActiveSupport::TestCase

test "start as non-standalone" do
supervisor = run_supervisor_as_thread
wait_for_registered_processes(4)
wait_for_registered_processes(4, timeout: 3.seconds) # supervisor + dispatcher + 2 workers

assert_registered_processes(kind: "Supervisor(async)")
assert_registered_processes(kind: "Worker", supervisor_id: supervisor.process_id, count: 2)
assert_registered_processes(kind: "Dispatcher", supervisor_id: supervisor.process_id)

ensure
supervisor.stop

assert_no_registered_processes
end

test "start standalone" do
pid = run_supervisor_as_fork(mode: :async)
wait_for_registered_processes(4)
wait_for_registered_processes(4, timeout: 5.seconds) # supervisor + dispatcher + 2 workers

assert_registered_processes(kind: "Supervisor(async)")
assert_registered_processes(kind: "Worker", supervisor_pid: pid, count: 2)
Expand All @@ -29,15 +28,15 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
end

test "start as non-standalone with provided configuration" do
supervisor = run_supervisor_as_thread(workers: [], dispatchers: [ { batch_size: 100 } ])
wait_for_registered_processes(2) # supervisor + dispatcher
supervisor = run_supervisor_as_thread(workers: [], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
wait_for_registered_processes(3, timeout: 3.seconds) # supervisor + dispatcher + scheduler

assert_registered_processes(kind: "Supervisor(async)")
assert_registered_processes(kind: "Worker", count: 0)
assert_registered_processes(kind: "Dispatcher", supervisor_id: supervisor.process_id)

assert_registered_processes(kind: "Scheduler", supervisor_id: supervisor.process_id)
ensure
supervisor.stop

assert_no_registered_processes
end

Expand All @@ -50,17 +49,17 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
}

supervisor = run_supervisor_as_thread(**config)
wait_for_registered_processes(2) # supervisor + 1 worker
wait_for_registered_processes(2, timeout: 3.seconds) # supervisor + 1 worker
assert_registered_processes(kind: "Supervisor(async)")

wait_while_with_timeout(1.second) { SolidQueue::ClaimedExecution.count > 0 }

supervisor.stop

skip_active_record_query_cache do
assert_equal 0, SolidQueue::ClaimedExecution.count
assert_equal 3, SolidQueue::FailedExecution.count
end
ensure
supervisor.stop
end

test "failed orphaned executions as standalone" do
Expand All @@ -72,7 +71,7 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
}

pid = run_supervisor_as_fork(mode: :async, **config)
wait_for_registered_processes(2) # supervisor + 1 worker
wait_for_registered_processes(2, timeout: 3.seconds) # supervisor + 1 worker
assert_registered_processes(kind: "Supervisor(async)")

wait_while_with_timeout(1.second) { SolidQueue::ClaimedExecution.count > 0 }
Expand All @@ -87,7 +86,7 @@ class AsyncSupervisorTest < ActiveSupport::TestCase

private
def run_supervisor_as_thread(**options)
SolidQueue::Supervisor.start(mode: :async, standalone: false, **options)
SolidQueue::Supervisor.start(mode: :async, standalone: false, **options.with_defaults(skip_recurring: true))
end

def simulate_orphaned_executions(count)
Expand Down
3 changes: 2 additions & 1 deletion test/unit/process_recovery_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class ProcessRecoveryTest < ActiveSupport::TestCase
supervisor_process = SolidQueue::Process.find_by(kind: "Supervisor(fork)", pid: @pid)
assert supervisor_process

worker_process = SolidQueue::Process.find_by(kind: "Worker")
# Find the worker supervised by this specific supervisor to avoid interference from other tests
worker_process = SolidQueue::Process.find_by(kind: "Worker", supervisor_id: supervisor_process.id)
assert worker_process

# Enqueue a job and wait for it to be claimed
Expand Down
14 changes: 9 additions & 5 deletions test/unit/scheduler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ class SchedulerTest < ActiveSupport::TestCase
end

schedulers.each(&:start)
wait_while_with_timeout(2.5.seconds) { SolidQueue::RecurringExecution.count != 2 }
wait_while_with_timeout(3.seconds) { SolidQueue::RecurringExecution.count < 2 }
schedulers.each(&:stop)

assert_equal SolidQueue::Job.count, SolidQueue::RecurringExecution.count
run_at_times = SolidQueue::RecurringExecution.all.map(&:run_at).sort
0.upto(run_at_times.length - 2) do |i|
assert_equal 1, run_at_times[i + 1] - run_at_times[i]
skip_active_record_query_cache do
assert SolidQueue::RecurringExecution.count >= 2, "Expected at least 2 recurring executions, got #{SolidQueue::RecurringExecution.count}"
assert_equal SolidQueue::Job.count, SolidQueue::RecurringExecution.count
run_at_times = SolidQueue::RecurringExecution.all.map(&:run_at).sort
0.upto(run_at_times.length - 2) do |i|
time_diff = run_at_times[i + 1] - run_at_times[i]
assert_in_delta 1, time_diff, 0.001, "Expected run_at times to be 1 second apart, got #{time_diff}. All run_at times: #{run_at_times.inspect}"
end
end
end
end