Skip to content

Commit c1a4f2f

Browse files
committed
Handle on_failure and on_success
* on_failure fires the first time any of the jobs fail, even once * on_success only fires if all jobs work (after retries) * remove unused job_id
1 parent f71bf0c commit c1a4f2f

File tree

3 files changed

+50
-14
lines changed

3 files changed

+50
-14
lines changed

app/models/solid_queue/job_batch.rb

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ class JobBatch < Record
77

88
serialize :on_finish_active_job, coder: JSON
99
serialize :on_success_active_job, coder: JSON
10+
serialize :on_failure_active_job, coder: JSON
1011

1112
scope :incomplete, -> {
1213
where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago)
1314
}
15+
scope :finished, -> { where.not(finished_at: nil) }
1416

1517
class << self
1618
def current_batch_id
@@ -45,6 +47,7 @@ def dispatch_finished_batches
4547
def batch_attributes(attributes)
4648
on_finish_klass = attributes.delete(:on_finish)
4749
on_success_klass = attributes.delete(:on_success)
50+
on_failure_klass = attributes.delete(:on_failure)
4851

4952
if on_finish_klass.present?
5053
attributes[:on_finish_active_job] = as_active_job(on_finish_klass).serialize
@@ -54,6 +57,10 @@ def batch_attributes(attributes)
5457
attributes[:on_success_active_job] = as_active_job(on_success_klass).serialize
5558
end
5659

60+
if on_failure_klass.present?
61+
attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize
62+
end
63+
5764
attributes
5865
end
5966

@@ -69,29 +76,51 @@ def finished?
6976
def finish
7077
return if finished?
7178
reset_changed_at
72-
jobs.find_each do |next_job|
73-
# FIXME: If it's failed but is going to retry, how do we know?
74-
# Because we need to know if we will determine what the failed execution means
75-
# FIXME: use "success" vs "finish" vs "discard" `completion_type` to determine
76-
# how to analyze each job
77-
return unless next_job.finished?
78-
end
7979

80+
all_jobs_succeeded = true
8081
attrs = {}
82+
jobs.find_each do |next_job|
83+
# SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished,
84+
# and there is no record of the failure.
85+
# GoodJob would report a discard as an error. It's possible we should do that in the future?
86+
if fire_failure_job?(next_job)
87+
perform_completion_job(:on_failure_active_job, attrs)
88+
update!(attrs)
89+
end
90+
91+
status = next_job.status
92+
all_jobs_succeeded = all_jobs_succeeded && status != :failed
93+
return unless status.in?([ :finished, :failed ])
94+
end
8195

8296
if on_finish_active_job.present?
83-
active_job = ActiveJob::Base.deserialize(on_finish_active_job)
84-
active_job.send(:deserialize_arguments_if_needed)
85-
active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
86-
ActiveJob.perform_all_later([ active_job ])
87-
attrs[:job] = Job.find_by(active_job_id: active_job.job_id)
97+
perform_completion_job(:on_finish_active_job, attrs)
98+
end
99+
100+
if on_success_active_job.present? && all_jobs_succeeded
101+
perform_completion_job(:on_success_active_job, attrs)
88102
end
89103

90104
update!({ finished_at: Time.zone.now }.merge(attrs))
91105
end
92106

93107
private
94108

109+
def fire_failure_job?(job)
110+
return false if on_failure_active_job.blank? || job.failed_execution.blank?
111+
job = ActiveJob::Base.deserialize(on_failure_active_job)
112+
job.provider_job_id.blank?
113+
end
114+
115+
def perform_completion_job(job_field, attrs)
116+
active_job = ActiveJob::Base.deserialize(send(job_field))
117+
active_job.send(:deserialize_arguments_if_needed)
118+
active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
119+
ActiveJob.perform_all_later([ active_job ])
120+
active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id
121+
attrs[job_field] = active_job.serialize
122+
end
123+
95124
def reset_changed_at
96125
if changed_at.blank? && last_changed_at.present?
97126
update_columns(last_changed_at: Time.zone.now) # wait another hour before we check again

db/migrate/20240131013203_create_solid_queue_batch_table.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1]
22
def change
33
create_table :solid_queue_job_batches do |t|
4-
t.references :job, index: { unique: true }
54
t.text :on_finish_active_job
65
t.text :on_success_active_job
6+
t.text :on_failure_active_job
77
t.datetime :finished_at
88
t.datetime :changed_at
99
t.datetime :last_changed_at
@@ -16,6 +16,5 @@ def change
1616

1717
add_reference :solid_queue_jobs, :batch, index: true
1818
add_foreign_key :solid_queue_jobs, :solid_queue_job_batches, column: :batch_id, on_delete: :cascade
19-
add_foreign_key :solid_queue_job_batches, :solid_queue_jobs, column: :job_id
2019
end
2120
end

test/test_helpers/jobs_test_helper.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ def wait_for_jobs_to_be_released_for(timeout = 1.second)
1717
end
1818
end
1919

20+
def wait_for_job_batches_to_finish_for(timeout = 1.second)
21+
wait_while_with_timeout(timeout) do
22+
skip_active_record_query_cache do
23+
SolidQueue::JobBatch.where(finished_at: nil).any?
24+
end
25+
end
26+
end
27+
2028
def assert_unfinished_jobs(*jobs)
2129
skip_active_record_query_cache do
2230
assert_equal jobs.map(&:job_id).sort, SolidQueue::Job.where(finished_at: nil).map(&:active_job_id).sort

0 commit comments

Comments
 (0)