Skip to content

Commit fc09974

Browse files
committed
Move away from a batch_processed_at to batch_execution model
* BatchExecution allows us to know for sure that we only ever run completion on a job once: we destroy it and update the counts in a single transaction. We can also remove the batch_processed_at field from jobs, which are meant to be touched as little as possible, with relevant state reflected in the *_execution models
* It also gives us a slightly cleaner interface in the batchable classes
* Updated some table naming and pruned unused fields/indexes
* Increase the child batch count as new batches are enqueued, even in existing batches
* Refactor to a unified Batch interface
* It was overly complicated to split Batch and BatchRecord apart just to keep a stricter interface
* That concept was taken from GoodJob, but it didn't feel in the spirit of the simplicity of the SolidQueue project. It was a lot of concepts to juggle in your head
* Also moved around some files (like the cleanup and empty jobs) to the more appropriate app/jobs
1 parent a39037e commit fc09974

File tree

19 files changed

+384
-401
lines changed

19 files changed

+384
-401
lines changed
File renamed without changes.
File renamed without changes.

app/jobs/solid_queue/batch_update_job.rb

Lines changed: 0 additions & 25 deletions
This file was deleted.

app/models/solid_queue/batch.rb

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Batch < Record
5+
# Every value the status column may hold. Frozen so the list used by the
# inclusion validation cannot be mutated at runtime.
STATUSES = %w[pending processing completed failed].freeze
6+
7+
# Batches are linked to each other and to their jobs/executions through the
# batch_id column, so every association names batch_id as the key on the
# batch side.
# NOTE(review): parent_batch_id is filled from Batch.current_batch_id and is
# resolved elsewhere in this class with find_by(batch_id:), hence
# primary_key: :batch_id on parent_batch as well — confirm against the schema.
belongs_to :parent_batch, foreign_key: :parent_batch_id, primary_key: :batch_id, class_name: "SolidQueue::Batch", optional: true
has_many :jobs, foreign_key: :batch_id, primary_key: :batch_id
has_many :batch_executions, foreign_key: :batch_id, primary_key: :batch_id, class_name: "SolidQueue::BatchExecution"
has_many :child_batches, foreign_key: :parent_batch_id, primary_key: :batch_id, class_name: "SolidQueue::Batch"

# Callback jobs and free-form metadata are stored JSON-encoded.
serialize :on_finish, coder: JSON
serialize :on_success, coder: JSON
serialize :on_failure, coder: JSON
serialize :metadata, coder: JSON

validates :status, inclusion: { in: STATUSES }

# One scope per status, plus the two aggregate groupings.
scope :pending, -> { where(status: "pending") }
scope :processing, -> { where(status: "processing") }
scope :completed, -> { where(status: "completed") }
scope :failed, -> { where(status: "failed") }
scope :finished, -> { where(status: %w[completed failed]) }
scope :unfinished, -> { where(status: %w[pending processing]) }

after_initialize :set_batch_id
before_create :set_parent_batch_id
28+
29+
# Runs the block inside this batch's context and enqueues whatever the
# buffer captured while it ran.
#
# The block receives the batch itself. Raises a RuntimeError when the batch
# is already finished. When ApplicationJob is configured to enqueue after
# commit, the actual enqueue is deferred until all open transactions commit.
def enqueue(&block)
  raise "You cannot enqueue a batch that is already finished" if finished?

  # An unsaved batch announces itself to the buffer class first
  # (presumably so an enclosing batch can count it as a child — confirm in Buffer).
  SolidQueue::Batch::Buffer.capture_child_batch(self) if new_record?

  captured = SolidQueue::Batch::Buffer.new
  captured.capture do
    Batch.wrap_in_batch_context(batch_id) { block.call(self) }
  end

  return enqueue_batch(captured) unless enqueue_after_transaction_commit?

  ActiveRecord.after_all_transactions_commit { enqueue_batch(captured) }
end
49+
50+
# Callback writers. Each one runs the given value through
# serialize_callback and stores the result in the attribute.

def on_success=(value)
  serialized = serialize_callback(value)
  super(serialized)
end

def on_failure=(value)
  serialized = serialize_callback(value)
  super(serialized)
end

def on_finish=(value)
  serialized = serialize_callback(value)
  super(serialized)
end
61+
62+
# Re-evaluates the batch status under a row lock.
#
# * No pending jobs and no unfinished child batches: marks the batch
#   "failed" (if any job failed) or "completed", stamps finished_at and
#   runs the callbacks.
# * Still pending jobs but at least one job already done: moves a
#   "pending" batch to "processing".
#
# Does nothing when the batch is already finished.
def check_completion!
  return if finished?

  with_lock do
    # Re-check after acquiring the lock: another process may have finished
    # the batch in the meantime.
    return if finished_at?

    if pending_jobs == 0
      # Only hit the database for children when the batch has any, and ask
      # for existence rather than a full count.
      children_done = total_child_batches == 0 ||
        !child_batches.where.not(status: %w[completed failed]).exists?

      if children_done
        new_status = failed_jobs > 0 ? "failed" : "completed"
        update!(status: new_status, finished_at: Time.current)
        execute_callbacks
      end
    elsif status == "pending" && (completed_jobs > 0 || failed_jobs > 0)
      # Move from pending to processing once any job completes
      update!(status: "processing")
    end
  end
end
82+
83+
# Status predicates.

# True once the batch has reached either terminal status.
def finished?
  %w[completed failed].include?(status)
end

# True while the batch is in the "processing" status.
def processing?
  status.eql?("processing")
end

# True while the batch is in the "pending" status.
def pending?
  status.eql?("pending")
end
94+
95+
# Share of jobs that are done (completed or failed) as a percentage of
# total_jobs, rounded to two decimals. Returns 0 for a batch with no jobs.
def progress_percentage
  return 0 if total_jobs == 0

  settled = completed_jobs + failed_jobs
  (settled * 100.0 / total_jobs).round(2)
end
99+
100+
private
101+
102+
# Whether enqueueing should wait for open transactions to commit, based on
# ApplicationJob.enqueue_after_transaction_commit. False when that setting
# is not defined; nil for a value outside the recognised set.
def enqueue_after_transaction_commit?
  return false unless defined?(ApplicationJob.enqueue_after_transaction_commit)

  setting = ApplicationJob.enqueue_after_transaction_commit
  case setting
  when :always, :default, true then true
  when :never, false then false
  end
end
114+
115+
# Dispatches the captured buffer to the new-batch or existing-batch path,
# depending on whether this record has been saved yet.
def enqueue_batch(buffer)
  new_record? ? enqueue_new_batch(buffer) : enqueue_existing_batch(buffer)
end
122+
123+
# Persists a brand-new batch together with its captured jobs in one
# transaction: saves the record, enqueues the jobs, records a
# BatchExecution per enqueued job and stores the initial counters.
#
# buffer - the Batch::Buffer filled while the enqueue block ran; its #jobs
#          values and #child_batches are read here.
def enqueue_new_batch(buffer)
  SolidQueue::Batch.transaction do
    save!

    # If batch has no jobs, enqueue an EmptyJob
    # This ensures callbacks always execute, even for empty batches
    jobs = buffer.jobs.values
    if jobs.empty?
      jobs = [ SolidQueue::Batch::EmptyJob.new.tap { |empty_job| empty_job.batch_id = batch_id } ]
    end

    # Enqueue jobs - this handles creation and preparation
    enqueued_count = SolidQueue::Job.enqueue_all(jobs)

    # Same selection rule as enqueue_existing_batch, so both paths track
    # exactly the jobs that were reported as enqueued.
    persisted_jobs = jobs.select(&:successfully_enqueued?)
    SolidQueue::BatchExecution.track_job_creation(persisted_jobs, batch_id)

    # Update batch record with counts
    update!(
      total_jobs: enqueued_count,
      pending_jobs: enqueued_count,
      total_child_batches: buffer.child_batches.size
    )
  end
end
150+
151+
# Adds the captured jobs and child batches to an already-persisted batch.
# Inside one transaction it enqueues the jobs, records a BatchExecution per
# enqueued job and bumps the counters with a single UPDATE.
#
# Returns the number of jobs that were successfully enqueued.
def enqueue_existing_batch(buffer)
  active_jobs = buffer.jobs.values
  added_children = buffer.child_batches.size

  SolidQueue::Batch.transaction do
    added_jobs = SolidQueue::Job.enqueue_all(active_jobs)

    SolidQueue::BatchExecution.track_job_creation(active_jobs.select(&:successfully_enqueued?), batch_id)

    # Increment in SQL so the counters are adjusted relative to their
    # current database values.
    increments = "total_jobs = total_jobs + ?, pending_jobs = pending_jobs + ?, total_child_batches = total_child_batches + ?"
    Batch.where(batch_id: batch_id).update_all([ increments, added_jobs, added_jobs, added_children ])
  end

  active_jobs.count(&:successfully_enqueued?)
end
169+
170+
# before_create hook: adopts the batch id of the surrounding batch context
# as parent, unless a parent was already assigned or no context is active.
def set_parent_batch_id
  return if Batch.current_batch_id.blank?

  self.parent_batch_id ||= Batch.current_batch_id
end
173+
174+
# after_initialize hook: gives the record a random UUID as batch_id when it
# does not have one yet.
def set_batch_id
  self.batch_id = SecureRandom.uuid unless batch_id
end
177+
178+
# Accepts either an ActiveJob instance or something that responds to .new
# (a job class) and returns a job instance in both cases.
def as_active_job(active_job_klass)
  return active_job_klass if active_job_klass.is_a?(ActiveJob::Base)

  active_job_klass.new
end
181+
182+
# Turns a callback (job class or job instance) into its serialized form.
# Blank values are returned unchanged.
def serialize_callback(value)
  return value if value.blank?

  # We can pick up batch ids from context, but callbacks should never be considered a part of the batch
  as_active_job(value).tap { |callback_job| callback_job.batch_id = nil }.serialize
end
189+
190+
# Enqueues the callback job stored in the given attribute.
#
# job_field - name of the attribute holding the serialized callback
#             (:on_success, :on_failure or :on_finish).
# attrs     - hash that receives the re-serialized job under job_field.
#
# The batch itself is prepended to the callback's arguments.
def perform_completion_job(job_field, attrs)
  active_job = ActiveJob::Base.deserialize(send(job_field))
  active_job.send(:deserialize_arguments_if_needed)
  active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
  SolidQueue::Job.enqueue_all([ active_job ])

  # enqueue_all is relied on elsewhere in this class to populate
  # provider_job_id, so only fall back to a lookup when it is still missing,
  # and tolerate a missing row instead of raising NoMethodError on nil.
  active_job.provider_job_id ||= Job.find_by(active_job_id: active_job.job_id)&.id
  attrs[job_field] = active_job.serialize
end
199+
200+
# Fires the callbacks for a batch that has just finished: the
# outcome-specific one (on_failure or on_success), then on_finish, then
# cleanup scheduling, and finally a completion check on the parent batch.
def execute_callbacks
  case status
  when "failed"
    perform_completion_job(:on_failure, {}) if on_failure.present?
  when "completed"
    perform_completion_job(:on_success, {}) if on_success.present?
  end

  perform_completion_job(:on_finish, {}) if on_finish.present?

  clear_unpreserved_jobs
  check_parent_completion!
end
213+
214+
# Schedules a CleanupJob for this batch, unless SolidQueue is configured to
# preserve finished jobs.
def clear_unpreserved_jobs
  return if SolidQueue.preserve_finished_jobs?

  SolidQueue::Batch::CleanupJob.perform_later(self)
end
217+
218+
# Asks the parent batch (looked up by batch_id) to re-check its own
# completion. Does nothing when there is no parent, the parent row cannot be
# found, or the parent is already finished.
def check_parent_completion!
  return unless parent_batch_id.present?

  parent = Batch.find_by(batch_id: parent_batch_id)
  return if parent.nil? || parent.finished?

  parent.check_completion!
end
224+
225+
class << self
  # Builds a new batch with the given callbacks and metadata, parented to
  # the batch context that is currently active (if any), and enqueues the
  # jobs created inside the block. Returns the batch.
  def enqueue(on_success: nil, on_failure: nil, on_finish: nil, metadata: nil, &block)
    new.tap do |batch|
      batch.assign_attributes(
        on_success: on_success,
        on_failure: on_failure,
        on_finish: on_finish,
        metadata: metadata,
        parent_batch_id: current_batch_id
      )

      batch.enqueue(&block)
    end
  end

  # Adds count to both total_jobs and pending_jobs of the batch identified
  # by batch_id, in a single UPDATE. count is coerced with to_i and passed
  # as a bind value rather than interpolated into the SQL string.
  def update_job_count(batch_id, count)
    count = count.to_i
    Batch.where(batch_id: batch_id).update_all([
      "total_jobs = total_jobs + ?, pending_jobs = pending_jobs + ?",
      count, count
    ])
  end

  # Batch id of the context set by wrap_in_batch_context, read from
  # ActiveSupport::IsolatedExecutionState; nil outside of any context.
  def current_batch_id
    ActiveSupport::IsolatedExecutionState[:current_batch_id]
  end

  # Runs the block with batch_id as the current batch context and restores
  # the previous context afterwards, even when the block raises.
  def wrap_in_batch_context(batch_id)
    previous_batch_id = current_batch_id.presence
    ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id
    yield
  ensure
    ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id
  end
end
259+
end
260+
end
261+
262+
require_relative "batch/buffer"

app/models/solid_queue/batch_record/buffer.rb renamed to app/models/solid_queue/batch/buffer.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# frozen_string_literal: true
22

33
module SolidQueue
4-
class BatchRecord
4+
class Batch
55
class Buffer
66
attr_reader :jobs, :child_batches
77

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
# One row per job that still counts as pending for its batch. The row is
# removed when the job's completion is processed, in the same transaction
# that adjusts the batch counters.
class BatchExecution < Record
  belongs_to :job, optional: true
  belongs_to :batch, foreign_key: :batch_id, primary_key: :batch_id

  class << self
    # Bulk-inserts one execution row per given active job.
    #
    # active_jobs - a job or list of jobs whose provider_job_id is set.
    # batch_id    - the batch the jobs belong to.
    #
    # Does nothing when there are no jobs to track.
    def track_job_creation(active_jobs, batch_id)
      rows = Array.wrap(active_jobs).map do |active_job|
        { job_id: active_job.provider_job_id, batch_id: batch_id }
      end
      return if rows.empty?

      SolidQueue::BatchExecution.insert_all(rows)
    end

    # Records that a batch job has finished and lets the batch re-check its
    # completion.
    #
    # job    - the job record; its batch_id and batch_execution are read.
    # status - "failed" counts the job as failed, anything else as completed.
    #
    # Jobs without a batch execution are ignored.
    def process_job_completion(job, status)
      batch_id = job.batch_id
      batch_execution = job.batch_execution

      return if batch_execution.blank?

      transaction do
        # Claim the execution row by deleting it and checking the affected
        # row count: destroy! does not report whether a row was actually
        # removed, so two concurrent callers could both adjust the counters.
        # NOTE(review): delete_all skips model callbacks; none are declared
        # on this class.
        next unless where(primary_key => batch_execution.id).delete_all == 1

        counter = status == "failed" ? "failed_jobs" : "completed_jobs"
        Batch.where(batch_id: batch_id).update_all(
          "pending_jobs = pending_jobs - 1, #{counter} = #{counter} + 1"
        )
      end

      Batch.find_by(batch_id: batch_id)&.check_completion!
    end
  end
end
45+
end

0 commit comments

Comments
 (0)