Skip to content

Commit f588105

Browse files
committed
DRY a bit releasing claimed executions by both supervisors
1 parent dce990d commit f588105

File tree

5 files changed

+43
-56
lines changed

5 files changed

+43
-56
lines changed

lib/solid_queue/async_supervisor.rb

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,43 +3,35 @@
33
module SolidQueue
44
class AsyncSupervisor < Supervisor
55
private
6-
def check_and_replace_terminated_processes
7-
terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? }
8-
terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) }
9-
end
106

11-
def replace_thread(thread_id, instance)
12-
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
13-
payload[:thread] = instance
14-
handle_claimed_jobs_by(terminated_instance, thread)
7+
def check_and_replace_terminated_processes
8+
terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? }
9+
terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) }
10+
end
1511

16-
start_process(configured_processes.delete(thread_id))
17-
end
18-
end
12+
def replace_thread(thread_id, instance)
13+
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
14+
payload[:thread] = instance
1915

20-
def perform_graceful_termination
21-
process_instances.values.each(&:stop)
16+
error = Processes::ThreadTerminatedError.new(terminated_instance.name)
17+
release_claimed_jobs_by(terminated_instance, with_error: error)
2218

23-
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? })
19+
start_process(configured_processes.delete(thread_id))
2420
end
21+
end
2522

26-
def perform_immediate_termination
27-
exit!
28-
end
23+
def perform_graceful_termination
24+
process_instances.values.each(&:stop)
2925

30-
def all_processes_terminated?
31-
process_instances.values.none?(&:alive?)
32-
end
26+
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? })
27+
end
3328

34-
# When a supervised thread terminates unexpectedly, mark all executions
35-
# it had claimed as failed so they can be retried by another worker.
36-
def handle_claimed_jobs_by(terminated_instance, thread)
37-
wrap_in_app_executor do
38-
if registered_process = SolidQueue::Process.find_by(name: terminated_instance.name)
39-
error = Processes::ThreadTerminatedError.new(terminated_instance.name)
40-
registered_process.fail_all_claimed_executions_with(error)
41-
end
42-
end
43-
end
29+
def perform_immediate_termination
30+
exit!
31+
end
32+
33+
def all_processes_terminated?
34+
process_instances.values.none?(&:alive?)
35+
end
4436
end
4537
end

lib/solid_queue/fork_supervisor.rb

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ def reap_terminated_forks
3939
break unless pid
4040

4141
if (terminated_fork = process_instances.delete(pid)) && !status.exited? || status.exitstatus > 0
42-
handle_claimed_jobs_by(terminated_fork, status)
42+
error = Processes::ProcessExitError.new(status)
43+
release_claimed_jobs_by(terminated_fork, with_error: error)
4344
end
4445

4546
configured_processes.delete(pid)
@@ -52,25 +53,14 @@ def replace_fork(pid, status)
5253
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
5354
if terminated_fork = process_instances.delete(pid)
5455
payload[:fork] = terminated_fork
55-
handle_claimed_jobs_by(terminated_fork, status)
56+
error = Processes::ProcessExitError.new(status)
57+
release_claimed_jobs_by(terminated_fork, with_error: error)
5658

5759
start_process(configured_processes.delete(pid))
5860
end
5961
end
6062
end
6163

62-
# When a supervised fork crashes or exits we need to mark all the
63-
# executions it had claimed as failed so that they can be retried
64-
# by some other worker.
65-
def handle_claimed_jobs_by(terminated_fork, status)
66-
wrap_in_app_executor do
67-
if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name)
68-
error = Processes::ProcessExitError.new(status)
69-
registered_process.fail_all_claimed_executions_with(error)
70-
end
71-
end
72-
end
73-
7464
def all_processes_terminated?
7565
process_instances.empty?
7666
end

lib/solid_queue/supervisor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,4 +144,4 @@ def sync_std_streams
144144
STDOUT.sync = STDERR.sync = true
145145
end
146146
end
147-
end
147+
end

lib/solid_queue/supervisor/maintenance.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,16 @@ def fail_orphaned_executions
3232
ClaimedExecution.orphaned.fail_all_with(Processes::ProcessMissingError.new)
3333
end
3434
end
35+
36+
# When a supervised process crashes or exits we need to mark all the
37+
# executions it had claimed as failed so that they can be retried
38+
# by some other worker.
39+
def release_claimed_jobs_by(terminated_process, with_error:)
40+
wrap_in_app_executor do
41+
if registered_process = SolidQueue::Process.find_by(name: terminated_process.name)
42+
registered_process.fail_all_claimed_executions_with(with_error)
43+
end
44+
end
45+
end
3546
end
3647
end

test/unit/fork_supervisor_test.rb

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ class ForkSupervisorTest < ActiveSupport::TestCase
186186
end
187187

188188
# Regression test for supervisor failing to handle claimed jobs when its own
189-
# process record has been pruned (NoMethodError in #handle_claimed_jobs_by).
190-
test "handle_claimed_jobs_by fails claimed executions even if supervisor record is missing" do
189+
# process record has been pruned (NoMethodError in #release_claimed_jobs_by).
190+
test "release_claimed_jobs_by fails claimed executions even if supervisor record is missing" do
191191
worker_name = "worker-test-#{SecureRandom.hex(4)}"
192192

193193
worker_process = SolidQueue::Process.register(kind: "Worker", pid: 999_999, name: worker_name)
@@ -196,20 +196,14 @@ class ForkSupervisorTest < ActiveSupport::TestCase
196196
claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process.id).first
197197

198198
terminated_fork = Struct.new(:name).new(worker_name)
199-
200-
DummyStatus = Struct.new(:pid, :exitstatus) do
201-
def signaled? = false
202-
def termsig = nil
203-
end
204-
status = DummyStatus.new(worker_process.pid, 1)
205-
206199
supervisor = SolidQueue::ForkSupervisor.allocate
200+
error = RuntimeError.new
207201

208-
supervisor.send(:handle_claimed_jobs_by, terminated_fork, status)
202+
supervisor.send(:release_claimed_jobs_by, terminated_fork, with_error: error)
209203

210204
failed = SolidQueue::FailedExecution.find_by(job_id: claimed_execution.job_id)
211205
assert failed.present?
212-
assert_equal "SolidQueue::Processes::ProcessExitError", failed.exception_class
206+
assert_equal "RuntimeError", failed.exception_class
213207
end
214208

215209
private

0 commit comments

Comments
 (0)