Skip to content

Commit 0bf3e65

Browse files
jpcamaramhenrixon
andcommitted
Refactor internals and api namespace of batches
* Thanks to Mikael Henriksson for his work in #590. His work decentralizes management of batch status by moving it to the BatchUpdateJob, and tracking status using counts rather than querying specific job statuses after the fact. This is a much simpler approach to tracking the jobs, and allows us to avoid a constantly polling set of queries in the dispatcher. Also add in arbitrary metadata to allow tracking data from start to end of execution. This also means enqueueing a BatchUpdateJob based on callbacks in two different kinds of Batchable, which are included when a job is updated and finished, or when a FailedExecution is created (since failed jobs never "finish"). * This batch feature already took some inspiration from the GoodJob batch implementation (https://github.com/bensheldon/good_job). But now we also increase that by adopting some of the buffering and abstractions in a similar form as GoodJob. To discourage heavy reliance on the JobBatch model, it has been renamed to BatchRecord, and a separate Batch interface is how you interact with batches, with some delegation to the core model. * A new Buffer class (also modeled after GoodJob) was added specifically for batches. This was primarily added to support enqueue_after_transaction_commit. We now override the ActiveJob #enqueue method so we can keep track of which jobs are attempting to enqueue. When enqueue_after_transaction_commit is on, those jobs do not enqueue until all transactions commit. By tracking them at the high level enqueue and keeping a buffer of jobs, we can ensure that the jobs get tracked even when their creation is deferred until the transaction is committed. The side benefit is that we get to enqueue all the jobs together, probably offering some performance advantage. This buffer also keeps track of child batches for the same reason. * To support triggering a callback/BatchUpdateJob when a job finishes, the update to finished_at needed to become an update! call * As a simplification, on_failure is now only fired after all jobs finish, rather than at the first time a job fails * The adapter logic itself also needed to be updated to support the buffer and enqueue_after_transaction_commit. If a job is coming from a batch enqueue, we ignore it here and allow the batching process to enqueue_all at the end of the enqueue block. If the job is originally from a batch, but is retrying, we make sure the job counts in the batch stay updated. I don't love this addition, since it adds alot of complication to the adapter code, all solely oriented around batches * Batches benefit from keeping jobs until the batch has finished. As such, we ignore the preserve jobs setting, but if it is set to false, we enqueue a cleanup job once the batch has finished and clear out finished jobs Co-authored-by: Mikael Henriksson <[email protected]>
1 parent 9499210 commit 0bf3e65

File tree

27 files changed

+878
-325
lines changed

27 files changed

+878
-325
lines changed

README.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -595,9 +595,10 @@ and optionally trigger callbacks based on their status. It supports the followin
595595
- Three available callbacks to fire:
596596
- `on_finish`: Fired when all jobs have finished, including retries. Fires even when some jobs have failed.
597597
- `on_success`: Fired when all jobs have succeeded, including retries. Will not fire if any jobs have failed, but will fire if jobs have been discarded using `discard_on`
598-
- `on_failure`: Fired the _first_ time a job fails, after all retries are exhausted.
598+
- `on_failure`: Fired when all jobs have finished, including retries. Will only fire if one or more jobs have failed.
599599
- If a job is part of a batch, it can enqueue more jobs for that batch using `batch#enqueue`
600-
- Batches can be nested within other batches, creating a hierarchy. Outer batches will not finish until all nested batches have finished.
600+
- Batches can be nested within other batches, creating a hierarchy. Outer batches will not fire callbacks until all nested jobs have finished.
601+
- Attaching arbitrary metadata to a batch
601602

602603
```rb
603604
class SleepyJob < ApplicationJob
@@ -614,7 +615,7 @@ class MultiStepJob < ApplicationJob
614615
# Because of this nested batch, the top-level batch won't finish until the inner,
615616
# 10 second job finishes
616617
# Both jobs will still run simultaneously
617-
SolidQueue::JobBatch.enqueue do
618+
SolidQueue::Batch.enqueue do
618619
SleepyJob.perform_later(10)
619620
end
620621
end
@@ -639,10 +640,11 @@ class BatchFailureJob < ApplicationJob
639640
end
640641
end
641642
642-
SolidQueue::JobBatch.enqueue(
643+
SolidQueue::Batch.enqueue(
643644
on_finish: BatchFinishJob,
644645
on_success: BatchSuccessJob,
645-
on_failure: BatchFailureJob
646+
on_failure: BatchFailureJob,
647+
metadata: { user_id: 123 }
646648
) do
647649
5.times.map { |i| SleepyJob.perform_later(i) }
648650
end
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class BatchUpdateJob < ActiveJob::Base
5+
class UpdateFailure < RuntimeError; end
6+
7+
queue_as :background
8+
9+
discard_on ActiveRecord::RecordNotFound
10+
11+
def perform(batch_id, job)
12+
batch = SolidQueue::BatchRecord.find_by!(batch_id: batch_id)
13+
14+
return if job.batch_id != batch_id
15+
16+
status = job.status
17+
return unless status.in?([ :finished, :failed ])
18+
19+
batch.job_finished!(job)
20+
rescue => e
21+
Rails.logger.error "[SolidQueue] BatchUpdateJob failed for batch #{batch_id}, job #{job.id}: #{e.message}"
22+
raise
23+
end
24+
end
25+
end
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class BatchRecord < Record
5+
self.table_name = "solid_queue_job_batches"
6+
7+
STATUSES = %w[pending processing completed failed]
8+
9+
belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::BatchRecord", optional: true
10+
has_many :jobs, foreign_key: :batch_id, primary_key: :batch_id
11+
has_many :children, foreign_key: :parent_job_batch_id, primary_key: :batch_id, class_name: "SolidQueue::BatchRecord"
12+
13+
serialize :on_finish, coder: JSON
14+
serialize :on_success, coder: JSON
15+
serialize :on_failure, coder: JSON
16+
serialize :metadata, coder: JSON
17+
18+
validates :status, inclusion: { in: STATUSES }
19+
20+
scope :pending, -> { where(status: "pending") }
21+
scope :processing, -> { where(status: "processing") }
22+
scope :completed, -> { where(status: "completed") }
23+
scope :failed, -> { where(status: "failed") }
24+
scope :finished, -> { where(status: %w[completed failed]) }
25+
scope :unfinished, -> { where(status: %w[pending processing]) }
26+
27+
after_initialize :set_batch_id
28+
before_create :set_parent_job_batch_id
29+
30+
def on_success=(value)
31+
super(serialize_callback(value))
32+
end
33+
34+
def on_failure=(value)
35+
super(serialize_callback(value))
36+
end
37+
38+
def on_finish=(value)
39+
super(serialize_callback(value))
40+
end
41+
42+
def job_finished!(job)
43+
return if finished?
44+
45+
transaction do
46+
if job.failed_execution.present?
47+
self.class.where(id: id).update_all(
48+
"failed_jobs = failed_jobs + 1, pending_jobs = pending_jobs - 1"
49+
)
50+
else
51+
self.class.where(id: id).update_all(
52+
"completed_jobs = completed_jobs + 1, pending_jobs = pending_jobs - 1"
53+
)
54+
end
55+
56+
reload
57+
check_completion!
58+
end
59+
end
60+
61+
def check_completion!
62+
return if finished?
63+
64+
actual_children = children.count
65+
return if actual_children < expected_children
66+
67+
children.find_each do |child|
68+
return unless child.finished?
69+
end
70+
71+
if pending_jobs <= 0
72+
if failed_jobs > 0
73+
mark_as_failed!
74+
else
75+
mark_as_completed!
76+
end
77+
clear_unpreserved_jobs
78+
elsif status == "pending"
79+
update!(status: "processing")
80+
end
81+
end
82+
83+
def finished?
84+
status.in?(%w[completed failed])
85+
end
86+
87+
def processing?
88+
status == "processing"
89+
end
90+
91+
def pending?
92+
status == "pending"
93+
end
94+
95+
def progress_percentage
96+
return 0 if total_jobs == 0
97+
((completed_jobs + failed_jobs) * 100.0 / total_jobs).round(2)
98+
end
99+
100+
private
101+
102+
def set_parent_job_batch_id
103+
self.parent_job_batch_id ||= Batch.current_batch_id if Batch.current_batch_id.present?
104+
end
105+
106+
def set_batch_id
107+
self.batch_id ||= SecureRandom.uuid
108+
end
109+
110+
def as_active_job(active_job_klass)
111+
active_job_klass.is_a?(ActiveJob::Base) ? active_job_klass : active_job_klass.new
112+
end
113+
114+
def serialize_callback(value)
115+
return value if value.blank?
116+
as_active_job(value).serialize
117+
end
118+
119+
def perform_completion_job(job_field, attrs)
120+
active_job = ActiveJob::Base.deserialize(send(job_field))
121+
active_job.send(:deserialize_arguments_if_needed)
122+
active_job.arguments = [ Batch.new(_batch_record: self) ] + Array.wrap(active_job.arguments)
123+
ActiveJob.perform_all_later([ active_job ])
124+
125+
active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id
126+
attrs[job_field] = active_job.serialize
127+
end
128+
129+
def mark_as_completed!
130+
# SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished,
131+
# and there is no record of the failure.
132+
# GoodJob would report a discard as an error. It's possible we should do that in the future?
133+
update!(status: "completed", finished_at: Time.current)
134+
135+
perform_completion_job(:on_success, {}) if on_success.present?
136+
perform_completion_job(:on_finish, {}) if on_finish.present?
137+
138+
if parent_job_batch_id.present?
139+
parent = BatchRecord.find_by(batch_id: parent_job_batch_id)
140+
parent&.reload&.check_completion!
141+
end
142+
end
143+
144+
def mark_as_failed!
145+
update!(status: "failed", finished_at: Time.current)
146+
perform_completion_job(:on_failure, {}) if on_failure.present?
147+
perform_completion_job(:on_finish, {}) if on_finish.present?
148+
149+
# Check if parent batch can now complete
150+
if parent_job_batch_id.present?
151+
parent = BatchRecord.find_by(batch_id: parent_job_batch_id)
152+
parent&.check_completion!
153+
end
154+
end
155+
156+
def clear_unpreserved_jobs
157+
SolidQueue::Batch::CleanupJob.perform_later(self) unless SolidQueue.preserve_finished_jobs?
158+
end
159+
end
160+
end
161+
162+
require_relative "job_batch/buffer"

app/models/solid_queue/claimed_execution.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ def perform
6969
failed_with(result.error)
7070
raise result.error
7171
end
72-
73-
job.job_batch.touch(:changed_at, :last_changed_at) if job.batch_id.present?
7472
ensure
7573
unblock_next_job
7674
end
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Execution
5+
module Batchable
6+
extend ActiveSupport::Concern
7+
8+
included do
9+
after_create :update_batch_progress, if: -> { job.batch_id? }
10+
end
11+
12+
private
13+
def update_batch_progress
14+
BatchUpdateJob.perform_later(job.batch_id, job)
15+
rescue => e
16+
Rails.logger.error "[SolidQueue] Failed to notify batch #{batch_id} about job #{id} completion: #{e.message}"
17+
end
18+
end
19+
end
20+
end

app/models/solid_queue/failed_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
module SolidQueue
44
class FailedExecution < Execution
5-
include Dispatching
5+
include Dispatching, Batchable
66

77
serialize :error, coder: JSON
88

app/models/solid_queue/job.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@ module SolidQueue
44
class Job < Record
55
class EnqueueError < StandardError; end
66

7-
include Executable, Clearable, Recurrable
7+
include Executable, Clearable, Recurrable, Batchable
88

99
serialize :arguments, coder: JSON
1010

11-
belongs_to :job_batch, foreign_key: :batch_id, optional: true
12-
1311
class << self
1412
def enqueue_all(active_jobs)
1513
active_jobs_by_job_id = active_jobs.index_by(&:job_id)
@@ -56,7 +54,6 @@ def create_all_from_active_jobs(active_jobs)
5654
end
5755

5856
def attributes_from_active_job(active_job)
59-
active_job.batch_id = JobBatch.current_batch_id || active_job.batch_id
6057
{
6158
queue_name: active_job.queue_name || DEFAULT_QUEUE_NAME,
6259
active_job_id: active_job.job_id,
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Job
5+
module Batchable
6+
extend ActiveSupport::Concern
7+
8+
included do
9+
belongs_to :job_batch, foreign_key: :batch_id, optional: true
10+
11+
after_update :update_batch_progress, if: :batch_id?
12+
end
13+
14+
private
15+
def update_batch_progress
16+
return unless saved_change_to_finished_at? && finished_at.present?
17+
return unless batch_id.present?
18+
19+
BatchUpdateJob.perform_later(batch_id, self)
20+
rescue => e
21+
Rails.logger.error "[SolidQueue] Failed to notify batch #{batch_id} about job #{id} completion: #{e.message}"
22+
end
23+
end
24+
end
25+
end

app/models/solid_queue/job/executable.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ def dispatch_bypassing_concurrency_limits
7676
end
7777

7878
def finished!
79-
if SolidQueue.preserve_finished_jobs? || batch_id.present?
80-
touch(:finished_at)
79+
if SolidQueue.preserve_finished_jobs? || batch_id.present? # We clear jobs after the batch finishes
80+
update!(finished_at: Time.current)
8181
else
8282
destroy!
8383
end

0 commit comments

Comments
 (0)