Skip to content

Commit 9fb89f1

Browse files
committed
Fail in-progress jobs when the worker running them exits abnormally
This applies to: - Killed workers that the supervisor detects as dead. - Reaped workers without a clear exit status. - Orphaned executions that somehow lost their worker. - Workers whose heartbeat expired.
1 parent cb5669f commit 9fb89f1

File tree

14 files changed

+171
-35
lines changed

14 files changed

+171
-35
lines changed

UPGRADING.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,28 @@
1+
# Upgrading to version 0.6.x
2+
3+
## New migration in 3 steps
4+
This version adds a new migration to the `solid_queue_processes` table. This migration adds a new column that needs to be `NOT NULL`. It will run in three steps:
5+
1. Add the new column, nullable
6+
2. Backfill existing rows that would have the column as NULL
7+
3. Make the column not nullable and add a new index
8+
9+
To install it:
10+
```bash
11+
$ bin/rails solid_queue:install:migrations
12+
```
13+
14+
Or, if you're using a different database for Solid Queue:
15+
16+
```bash
17+
$ bin/rails solid_queue:install:migrations DATABASE=<the_name_of_your_solid_queue_db>
18+
```
19+
20+
And then just run it.
21+
22+
## New behaviour when workers are killed
23+
From this version onwards, when a worker is killed and the supervisor can detect that, it'll fail in-progress jobs claimed by that worker. For this to work correctly, you need to run the above migration and ensure you restart any supervisors you'd have.
24+
25+
126
# Upgrading to version 0.5.x
227
This version includes a new migration to improve recurring tasks. To install it, just run:
328

app/models/solid_queue/claimed_execution.rb

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ def release_all
3535
end
3636
end
3737

38+
def fail_all_with(error)
39+
SolidQueue.instrument(:fail_many_claimed) do |payload|
40+
includes(:job).tap do |executions|
41+
payload[:size] = executions.size
42+
payload[:process_ids] = executions.map(&:process_id).uniq
43+
payload[:job_ids] = executions.map(&:job_id).uniq
44+
45+
executions.each { |execution| execution.failed_with(error) }
46+
end
47+
end
48+
end
49+
3850
def discard_all_in_batches(*)
3951
raise UndiscardableError, "Can't discard jobs in progress"
4052
end
@@ -69,6 +81,13 @@ def discard
6981
raise UndiscardableError, "Can't discard a job in progress"
7082
end
7183

84+
def failed_with(error)
85+
transaction do
86+
job.failed_with(error)
87+
destroy!
88+
end
89+
end
90+
7291
private
7392
def execute
7493
ActiveJob::Base.execute(job.arguments)
@@ -83,11 +102,4 @@ def finished
83102
destroy!
84103
end
85104
end
86-
87-
def failed_with(error)
88-
transaction do
89-
job.failed_with(error)
90-
destroy!
91-
end
92-
end
93105
end

app/models/solid_queue/process.rb

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ def self.register(**attributes)
1313
create!(attributes.merge(last_heartbeat_at: Time.current)).tap do |process|
1414
payload[:process_id] = process.id
1515
end
16+
rescue Exception => error
17+
payload[:error] = error
18+
raise
1619
end
17-
rescue Exception => error
18-
SolidQueue.instrument :register_process, **attributes.merge(error: error)
19-
raise
2020
end
2121

2222
def heartbeat
@@ -25,8 +25,6 @@ def heartbeat
2525

2626
def deregister(pruned: false)
2727
SolidQueue.instrument :deregister_process, process: self, pruned: pruned do |payload|
28-
payload[:claimed_size] = claimed_executions.size if claims_executions?
29-
3028
destroy!
3129
rescue Exception => error
3230
payload[:error] = error

app/models/solid_queue/process/executor.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ module Executor
1111
after_destroy -> { claimed_executions.release_all }, if: :claims_executions?
1212
end
1313

14+
def fail_all_claimed_executions_with(error)
15+
if claims_executions?
16+
claimed_executions.fail_all_with(error)
17+
end
18+
end
19+
1420
private
1521
def claims_executions?
1622
kind == "Worker"

app/models/solid_queue/process/prunable.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
# frozen_string_literal: true
22

33
module SolidQueue
4+
class ProcessPrunedError < RuntimeError
5+
def initialize(last_heartbeat_at)
6+
super("Process was found dead and pruned (last heartbeat at: #{last_heartbeat_at}")
7+
end
8+
end
9+
410
class Process
511
module Prunable
612
extend ActiveSupport::Concern
@@ -15,11 +21,18 @@ def prune
1521
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
1622
payload[:size] += batch.size
1723

18-
batch.each { |process| process.deregister(pruned: true) }
24+
batch.each(&:prune)
1925
end
2026
end
2127
end
2228
end
29+
30+
def prune
31+
error = ProcessPrunedError.new(last_heartbeat_at)
32+
fail_all_claimed_executions_with(error)
33+
34+
deregister(pruned: true)
35+
end
2336
end
2437
end
2538
end

lib/solid_queue/log_subscriber.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@ def claim(event)
1212
end
1313

1414
def release_many_claimed(event)
15-
debug formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
15+
info formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
16+
end
17+
18+
def fail_many_claimed(event)
19+
warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids))
1620
end
1721

1822
def release_claimed(event)
19-
debug formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
23+
info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
2024
end
2125

2226
def retry_all(event)

lib/solid_queue/supervisor/fork_supervisor.rb

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,18 @@
11
# frozen_string_literal: true
22

33
module SolidQueue
4+
class ProcessExitError < RuntimeError
5+
def initialize(status)
6+
message = case
7+
when status.exitstatus.present? then "Process pid=#{status.pid} exited with status #{status.exitstatus}"
8+
when status.signaled? then "Process pid=#{status.pid} received unhandled signal #{status.termsig}"
9+
else "Process pid=#{status.pid} exited unexpectedly"
10+
end
11+
12+
super(message)
13+
end
14+
end
15+
416
class Supervisor::ForkSupervisor < Supervisor
517
include Signals, Pidfiled
618

@@ -91,7 +103,10 @@ def reap_terminated_forks
91103
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
92104
break unless pid
93105

94-
forks.delete(pid)
106+
if (terminated_fork = forks.delete(pid)) && !status.exited? || status.exitstatus > 0
107+
handle_claimed_jobs_by(terminated_fork, status)
108+
end
109+
95110
configured_processes.delete(pid)
96111
end
97112
rescue SystemCallError
@@ -100,14 +115,22 @@ def reap_terminated_forks
100115

101116
def replace_fork(pid, status)
102117
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
103-
if supervised_fork = forks.delete(pid)
104-
payload[:fork] = supervised_fork
118+
if terminated_fork = forks.delete(pid)
119+
payload[:fork] = terminated_fork
120+
handle_claimed_jobs_by(terminated_fork, status)
105121

106122
start_process(configured_processes.delete(pid))
107123
end
108124
end
109125
end
110126

127+
def handle_claimed_jobs_by(terminated_fork, status)
128+
if registered_process = process.supervisees.find_by(name: terminated_fork.name)
129+
error = ProcessExitError.new(status)
130+
registered_process.fail_all_claimed_executions_with(error)
131+
end
132+
end
133+
111134
def all_forks_terminated?
112135
forks.empty?
113136
end

lib/solid_queue/supervisor/maintenance.rb

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
module SolidQueue
2+
class ProcessMissingError < RuntimeError
3+
def initialize
4+
super("The process that was running this job no longer exists")
5+
end
6+
end
7+
28
module Supervisor::Maintenance
39
extend ActiveSupport::Concern
410

511
included do
6-
after_boot :release_orphaned_executions
12+
after_boot :fail_orphaned_executions
713
end
814

915
private
@@ -27,8 +33,10 @@ def prune_dead_processes
2733
wrap_in_app_executor { SolidQueue::Process.prune }
2834
end
2935

30-
def release_orphaned_executions
31-
wrap_in_app_executor { SolidQueue::ClaimedExecution.orphaned.release_all }
36+
def fail_orphaned_executions
37+
wrap_in_app_executor do
38+
SolidQueue::ClaimedExecution.orphaned.fail_all_with(ProcessMissingError.new)
39+
end
3240
end
3341
end
3442
end

test/integration/forked_processes_lifecycle_test.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,11 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
190190
assert process_exists?(@pid)
191191
terminate_process(@pid)
192192

193+
# Since the worker exited abnormally, the jobs it had claimed would be failed now
194+
[ exit_job, pause_job ].each do |job|
195+
assert_job_status(job, :failed)
196+
end
197+
193198
assert_clean_termination
194199
end
195200

@@ -240,10 +245,10 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
240245
# And there's a new worker that has been registered for the background queue
241246
wait_for_registered_processes(4, timeout: 5.second)
242247

243-
# The job in the background queue was left claimed as the worker couldn't
244-
# finish orderly
248+
# The job in the background queue would be failed by the supervisor
249+
# when it replaced the killed worker
245250
assert_started_job_result("killed_pause")
246-
assert_job_status(killed_pause, :claimed)
251+
assert_job_status(killed_pause, :failed)
247252
# The other one could finish
248253
assert_completed_job_results("pause", :default)
249254

test/integration/instrumentation_test.rb

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class InstrumentationTest < ActiveSupport::TestCase
8989
assert_equal 2, events.size
9090
register_event, deregister_event = events
9191
assert_event register_event, "register_process", kind: "Worker", pid: ::Process.pid, process_id: process.id
92-
assert_event deregister_event, "deregister_process", process: process, pruned: false, claimed_size: 1
92+
assert_event deregister_event, "deregister_process", process: process, pruned: false
9393
end
9494

9595
test "starting and stopping a dispatcher emits register_process and deregister_process events" do
@@ -112,11 +112,7 @@ class InstrumentationTest < ActiveSupport::TestCase
112112
end
113113

114114
test "pruning processes emit prune_processes and deregister_process events" do
115-
time = Time.now
116-
processes = 3.times.collect { |i| SolidQueue::Process.create!(kind: "Worker", supervisor_id: 42, pid: 10 + i, hostname: "localhost", last_heartbeat_at: time, name: "worker-123#{i}") }
117-
118-
# Heartbeats will expire
119-
travel_to 3.days.from_now
115+
processes = 3.times.collect { |i| SolidQueue::Process.create!(kind: "Worker", supervisor_id: 42, pid: 10 + i, hostname: "localhost", last_heartbeat_at: 3.days.ago, name: "worker-123#{i}") }
120116

121117
events = subscribed(/.*process.*\.solid_queue/) do
122118
SolidQueue::Process.prune
@@ -129,10 +125,26 @@ class InstrumentationTest < ActiveSupport::TestCase
129125

130126
assert_event prune_event, "prune_processes", size: 3
131127
deregister_events.each_with_index do |event, i|
132-
assert_event event, "deregister_process", process: processes[i], pruned: true, claimed_size: 0
128+
assert_event event, "deregister_process", process: processes[i], pruned: true
133129
end
134130
end
135131

132+
test "pruning a process with claimed executions emits fail_many_claimed event" do
133+
process = SolidQueue::Process.create!(kind: "Worker", supervisor_id: 42, pid: 10, last_heartbeat_at: 3.days.ago, name: "worker-123")
134+
135+
3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) }
136+
jobs = SolidQueue::Job.last(3)
137+
138+
SolidQueue::ReadyExecution.claim("*", 5, process.id)
139+
140+
events = subscribed(/fail.*_claimed\.solid_queue/) do
141+
SolidQueue::Process.prune
142+
end
143+
144+
assert_equal 1, events.count
145+
assert_event events.first, "fail_many_claimed", process_ids: [ process.id ], job_ids: jobs.map(&:id), size: 3
146+
end
147+
136148
test "errors when deregistering processes are included in deregister_process events" do
137149
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false
138150
error = RuntimeError.new("everything is broken")

0 commit comments

Comments
 (0)