Skip to content

Commit 5d722f8

Browse files
committed
Release any orphaned claimed executions as part of the process prune
Orphaned executions shouldn't happen as part of regular operation but some people have reported them. The only way I can think of this happening could be that a worker gets deregistered while still working normally, so that it happens to be claiming executions while the process record is deleted. This means that the callback to release claimed executions might not see yet the executions that are being claimed at the same time, and as such, it won't release any. Or, in other words, those executions would get claimed by a process that's getting deleted and that once they're committed to the DB, it no longer exists. To recover from this scenario, as well as checking for processes with an expired heartbeat, the supervisor will also check for orphaned executions and release them.
1 parent 6d10767 commit 5d722f8

File tree

4 files changed

+44
-8
lines changed

4 files changed

+44
-8
lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
class SolidQueue::ClaimedExecution < SolidQueue::Execution
44
belongs_to :process
55

6+
scope :orphaned, -> { where.missing(:process) }
7+
68
class Result < Struct.new(:success, :error)
79
def success?
810
success

lib/solid_queue/supervisor.rb

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ def start
2222
run_callbacks(:boot) { boot }
2323

2424
start_forks
25-
launch_process_prune
25+
launch_maintenance_task
26+
2627
supervise
2728
rescue Processes::GracefulTerminationRequested
2829
graceful_termination
@@ -65,9 +66,12 @@ def start_forks
6566
configured_processes.each { |configured_process| start_fork(configured_process) }
6667
end
6768

68-
def launch_process_prune
69-
@prune_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) { prune_dead_processes }
70-
@prune_task.execute
69+
def launch_maintenance_task
70+
@maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
71+
prune_dead_processes
72+
release_orphaned_executions
73+
end
74+
@maintenance_task.execute
7175
end
7276

7377
def shutdown
@@ -106,7 +110,7 @@ def quit_forks
106110
end
107111

108112
def stop_process_prune
109-
@prune_task&.shutdown
113+
@maintenance_task&.shutdown
110114
end
111115

112116
def delete_pidfile
@@ -117,6 +121,10 @@ def prune_dead_processes
117121
wrap_in_app_executor { SolidQueue::Process.prune }
118122
end
119123

124+
def release_orphaned_executions
125+
wrap_in_app_executor { SolidQueue::ClaimedExecution.orphaned.release_all }
126+
end
127+
120128
def start_fork(configured_process)
121129
configured_process.supervised_by process
122130

test/integration/processes_lifecycle_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
2020
end
2121

2222
test "enqueue jobs in multiple queues" do
23-
6.times.map { |i| enqueue_store_result_job("job_#{i}") }
24-
6.times.map { |i| enqueue_store_result_job("job_#{i}", :default) }
23+
6.times { |i| enqueue_store_result_job("job_#{i}") }
24+
6.times { |i| enqueue_store_result_job("job_#{i}", :default) }
2525

2626
wait_for_jobs_to_finish_for(0.5.seconds)
2727

test/unit/supervisor_test.rb

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class SupervisorTest < ActiveSupport::TestCase
6767
wait_for_process_termination_with_timeout(pid, exitstatus: 1)
6868
end
6969

70-
test "deletes previous pidfile if the owner is dead" do
70+
test "delete previous pidfile if the owner is dead" do
7171
pid = run_supervisor_as_fork(mode: :all)
7272
wait_for_registered_processes(4)
7373

@@ -87,6 +87,32 @@ class SupervisorTest < ActiveSupport::TestCase
8787
terminate_process(pid)
8888
end
8989

90+
test "release orphaned executions" do
91+
3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) }
92+
process = SolidQueue::Process.register(kind: "Worker", pid: 42)
93+
94+
SolidQueue::ReadyExecution.claim("*", 5, process.id)
95+
96+
assert_equal 3, SolidQueue::ClaimedExecution.count
97+
assert_equal 0, SolidQueue::ReadyExecution.count
98+
99+
assert_equal [ process.id ], SolidQueue::ClaimedExecution.last(3).pluck(:process_id).uniq
100+
101+
# Simnulate orphaned executions by just wiping the claiming process
102+
process.delete
103+
104+
pid = run_supervisor_as_fork
105+
wait_for_registered_processes(4)
106+
assert_registered_supervisor(pid)
107+
108+
terminate_process(pid)
109+
110+
skip_active_record_query_cache do
111+
assert_equal 0, SolidQueue::ClaimedExecution.count
112+
assert_equal 3, SolidQueue::ReadyExecution.count
113+
end
114+
end
115+
90116
private
91117
def assert_registered_workers(count, supervisor_pid:, **metadata)
92118
skip_active_record_query_cache do

0 commit comments

Comments
 (0)