Skip to content

Commit 89d30c7

Browse files
authored
Merge pull request #277 from rails/fail-jobs-when-worker-is-killed
Fail in-progress jobs when the worker running them exits abnormally
2 parents 7373b67 + 76d2c0f commit 89d30c7

28 files changed

+315
-113
lines changed

UPGRADING.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,32 @@
1+
# Upgrading to version 0.6.x
2+
3+
## New migration in 3 steps
4+
This version adds two new migrations that modify the `solid_queue_processes` table. Their goal is to add a new column that needs to be `NOT NULL`. This has to be done with two migrations, following the steps below, so that it happens without downtime and new processes can still register while the change is rolled out:
5+
1. Run the first migration that adds the new column, nullable
6+
2. Deploy the updated Solid Queue code that uses this column
7+
3. Run the second migration. This migration does two things:
8+
- Backfill existing rows that would have the column as NULL
9+
- Make the column not nullable and add a new index
10+
11+
In addition, this version adds another migration, with no functional effect, to the `solid_queue_recurring_tasks` table. This one can be run at any time, as the affected column is not used.
12+
13+
To install the migrations:
14+
```bash
15+
$ bin/rails solid_queue:install:migrations
16+
```
17+
18+
Or, if you're using a different database for Solid Queue:
19+
20+
```bash
21+
$ bin/rails solid_queue:install:migrations DATABASE=<the_name_of_your_solid_queue_db>
22+
```
23+
24+
Then follow the steps above: run the first migration, deploy the code, and then run the second migration.
25+
26+
## New behaviour when workers are killed
27+
From this version onwards, when a worker is killed and the supervisor can detect that, it'll fail the in-progress jobs claimed by that worker. For this to work correctly, you need to run the migrations above and restart any supervisors you have running.
28+
29+
130
# Upgrading to version 0.5.x
231
This version includes a new migration to improve recurring tasks. To install it, just run:
332

app/models/solid_queue/claimed_execution.rb

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ def release_all
3535
end
3636
end
3737

38+
def fail_all_with(error)
39+
SolidQueue.instrument(:fail_many_claimed) do |payload|
40+
includes(:job).tap do |executions|
41+
payload[:size] = executions.size
42+
payload[:process_ids] = executions.map(&:process_id).uniq
43+
payload[:job_ids] = executions.map(&:job_id).uniq
44+
45+
executions.each { |execution| execution.failed_with(error) }
46+
end
47+
end
48+
end
49+
3850
def discard_all_in_batches(*)
3951
raise UndiscardableError, "Can't discard jobs in progress"
4052
end
@@ -69,6 +81,13 @@ def discard
6981
raise UndiscardableError, "Can't discard a job in progress"
7082
end
7183

84+
def failed_with(error)
85+
transaction do
86+
job.failed_with(error)
87+
destroy!
88+
end
89+
end
90+
7291
private
7392
def execute
7493
ActiveJob::Base.execute(job.arguments)
@@ -83,11 +102,4 @@ def finished
83102
destroy!
84103
end
85104
end
86-
87-
def failed_with(error)
88-
transaction do
89-
job.failed_with(error)
90-
destroy!
91-
end
92-
end
93105
end

app/models/solid_queue/process.rb

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ def self.register(**attributes)
1313
create!(attributes.merge(last_heartbeat_at: Time.current)).tap do |process|
1414
payload[:process_id] = process.id
1515
end
16+
rescue Exception => error
17+
payload[:error] = error
18+
raise
1619
end
17-
rescue Exception => error
18-
SolidQueue.instrument :register_process, **attributes.merge(error: error)
19-
raise
2020
end
2121

2222
def heartbeat
@@ -25,8 +25,6 @@ def heartbeat
2525

2626
def deregister(pruned: false)
2727
SolidQueue.instrument :deregister_process, process: self, pruned: pruned do |payload|
28-
payload[:claimed_size] = claimed_executions.size if claims_executions?
29-
3028
destroy!
3129
rescue Exception => error
3230
payload[:error] = error

app/models/solid_queue/process/executor.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ module Executor
1111
after_destroy -> { claimed_executions.release_all }, if: :claims_executions?
1212
end
1313

14+
# Fails every execution claimed by this process with the given error.
# Does nothing (and returns nil) unless this process claims executions.
def fail_all_claimed_executions_with(error)
  claimed_executions.fail_all_with(error) if claims_executions?
end
19+
1420
private
1521
def claims_executions?
1622
kind == "Worker"

app/models/solid_queue/process/prunable.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
# frozen_string_literal: true
22

33
module SolidQueue
4+
# Error describing a process that was found dead and pruned.
#
# The message embeds the process' last heartbeat timestamp so that whoever
# inspects the error can tell when the process was last known to be alive.
class ProcessPrunedError < RuntimeError
  # @param last_heartbeat_at [#to_s] time of the last heartbeat recorded for the process
  def initialize(last_heartbeat_at)
    # The parenthetical is closed here; the previous message left it unbalanced.
    super("Process was found dead and pruned (last heartbeat at: #{last_heartbeat_at})")
  end
end
9+
410
class Process
511
module Prunable
612
extend ActiveSupport::Concern
@@ -15,11 +21,18 @@ def prune
1521
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
1622
payload[:size] += batch.size
1723

18-
batch.each { |process| process.deregister(pruned: true) }
24+
batch.each(&:prune)
1925
end
2026
end
2127
end
2228
end
29+
30+
# Prunes this process: fails its claimed executions with a ProcessPrunedError
# built from the last heartbeat, then deregisters it flagged as pruned.
def prune
  fail_all_claimed_executions_with(ProcessPrunedError.new(last_heartbeat_at))
  deregister(pruned: true)
end
2336
end
2437
end
2538
end
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
class AddNameToProcesses < ActiveRecord::Migration[7.1]
2+
def change
3+
add_column :solid_queue_processes, :name, :string
4+
end
5+
end
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Backfills missing process names, then makes `solid_queue_processes.name`
# NOT NULL and adds a unique index on (name, supervisor_id).
class MakeNameNotNull < ActiveRecord::Migration[7.1]
  def up
    # Give every row without a name a generated one so the NOT NULL
    # constraint below can be applied.
    SolidQueue::Process.where(name: nil).find_each do |process|
      process.name ||= [ process.kind.downcase, SecureRandom.hex(10) ].join("-")
      process.save!
    end

    change_column :solid_queue_processes, :name, :string, null: false
    add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true
  end

  def down
    remove_index :solid_queue_processes, [ :name, :supervisor_id ]
    # Rolling back must relax the constraint that `up` added; re-applying
    # `null: false` here would leave the column unchanged.
    change_column :solid_queue_processes, :name, :string, null: true
  end
end

lib/solid_queue/configuration.rb

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
module SolidQueue
44
class Configuration
5+
# Description of a process to run: its kind (e.g. :worker or :dispatcher)
# and the attributes used to build it.
class Process < Struct.new(:kind, :attributes)
  # Builds the SolidQueue class matching `kind`, passing the stored
  # attributes as keyword arguments.
  #
  # `camelize` is used rather than `titleize` because `titleize` inserts
  # spaces for multi-word kinds, which can never resolve to a constant;
  # for single-word kinds both produce the same class name.
  def instantiate
    "SolidQueue::#{kind.to_s.camelize}".safe_constantize.new(**attributes)
  end
end
10+
511
WORKER_DEFAULTS = {
612
queues: "*",
713
threads: 3,
@@ -22,28 +28,10 @@ def initialize(mode: :fork, load_from: nil)
2228
@raw_config = config_from(load_from)
2329
end
2430

25-
def processes
31+
def configured_processes
2632
dispatchers + workers
2733
end
2834

29-
def workers
30-
workers_options.flat_map do |worker_options|
31-
processes = if mode.fork?
32-
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
33-
else
34-
WORKER_DEFAULTS[:processes]
35-
end
36-
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
37-
end
38-
end
39-
40-
def dispatchers
41-
dispatchers_options.map do |dispatcher_options|
42-
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
43-
Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
44-
end
45-
end
46-
4735
def max_number_of_threads
4836
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
4937
workers_options.map { |options| options[:threads] }.max + 2
@@ -54,6 +42,24 @@ def max_number_of_threads
5442

5543
DEFAULT_CONFIG_FILE_PATH = "config/solid_queue.yml"
5644

45+
def workers
46+
workers_options.flat_map do |worker_options|
47+
processes = if mode.fork?
48+
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
49+
else
50+
WORKER_DEFAULTS[:processes]
51+
end
52+
processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
53+
end
54+
end
55+
56+
def dispatchers
57+
dispatchers_options.map do |dispatcher_options|
58+
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
59+
Process.new :dispatcher, dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
60+
end
61+
end
62+
5763
def config_from(file_or_hash, env: Rails.env)
5864
config = load_config_from(file_or_hash)
5965
config[env.to_sym] ? config[env.to_sym] : config

lib/solid_queue/log_subscriber.rb

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@ def claim(event)
1212
end
1313

1414
def release_many_claimed(event)
15-
debug formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
15+
info formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
16+
end
17+
18+
def fail_many_claimed(event)
19+
warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids))
1620
end
1721

1822
def release_claimed(event)
19-
debug formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
23+
info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
2024
end
2125

2226
def retry_all(event)
@@ -63,7 +67,8 @@ def start_process(event)
6367
attributes = {
6468
pid: process.pid,
6569
hostname: process.hostname,
66-
process_id: process.process_id
70+
process_id: process.process_id,
71+
name: process.name
6772
}.merge(process.metadata)
6873

6974
info formatted_event(event, action: "Started #{process.kind}", **attributes)
@@ -75,15 +80,16 @@ def shutdown_process(event)
7580
attributes = {
7681
pid: process.pid,
7782
hostname: process.hostname,
78-
process_id: process.process_id
83+
process_id: process.process_id,
84+
name: process.name
7985
}.merge(process.metadata)
8086

8187
info formatted_event(event, action: "Shutdown #{process.kind}", **attributes)
8288
end
8389

8490
def register_process(event)
8591
process_kind = event.payload[:kind]
86-
attributes = event.payload.slice(:pid, :hostname, :process_id)
92+
attributes = event.payload.slice(:pid, :hostname, :process_id, :name)
8793

8894
if error = event.payload[:error]
8995
warn formatted_event(event, action: "Error registering #{process_kind}", **attributes.merge(error: formatted_error(error)))
@@ -99,6 +105,7 @@ def deregister_process(event)
99105
process_id: process.id,
100106
pid: process.pid,
101107
hostname: process.hostname,
108+
name: process.name,
102109
last_heartbeat_at: process.last_heartbeat_at.iso8601,
103110
claimed_size: event.payload[:claimed_size],
104111
pruned: event.payload[:pruned]
@@ -147,7 +154,7 @@ def replace_fork(event)
147154
termsig: status.termsig
148155

149156
if replaced_fork = event.payload[:fork]
150-
info formatted_event(event, action: "Replaced terminated #{replaced_fork.kind}", **attributes.merge(hostname: replaced_fork.hostname))
157+
info formatted_event(event, action: "Replaced terminated #{replaced_fork.kind}", **attributes.merge(hostname: replaced_fork.hostname, name: replaced_fork.name))
151158
else
152159
warn formatted_event(event, action: "Tried to replace forked process but it had already died", **attributes)
153160
end

lib/solid_queue/processes/base.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ class Base
66
include Callbacks # Defines callbacks needed by other concerns
77
include AppExecutor, Registrable, Interruptible, Procline
88

9+
attr_reader :name
10+
11+
def initialize(*)
12+
@name = generate_name
13+
end
14+
915
def kind
1016
self.class.name.demodulize
1117
end
@@ -21,6 +27,11 @@ def pid
2127
def metadata
2228
{}
2329
end
30+
31+
private
32+
# Builds a process name from the downcased kind and a random hex suffix,
# separated by a dash.
def generate_name
  "#{kind.downcase}-#{SecureRandom.hex(10)}"
end
2435
end
2536
end
2637
end

0 commit comments

Comments
 (0)