Skip to content

Commit 08ac10b

Browse files
committed
Performance improvements
* By querying batch executions remaining, the query times remain very fast. * When we are constantly updating the single batch row counts, it becomes a hotspot. Fast executing jobs quickly accumulate and slow down overall job processing (processing a few thousand jobs goes for 10ish seconds to 40ish seconds). This still adds a bit of overhead, but significantly less (10ish seconds to 15ish seconds) * Handle batch completion in an after_commit to make sure the transaction is visible before checking executions. This may mean we need to introduce some monitoring in the cases an after_commit fails to fire due network issues or a database issue
1 parent 3f04e8f commit 08ac10b

File tree

5 files changed

+52
-36
lines changed

5 files changed

+52
-36
lines changed

app/models/solid_queue/batch.rb

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,28 @@ def on_finish=(value)
5050

5151
def check_completion!
5252
return if finished? || !ready?
53+
return if batch_executions.limit(1).exists?
5354

54-
with_lock do
55-
return if finished_at? || !ready?
55+
rows = Batch
56+
.by_batch_id(batch_id)
57+
.unfinished
58+
.empty_executions
59+
.update_all(finished_at: Time.current)
60+
61+
return if rows.zero?
5662

57-
if pending_jobs == 0
58-
finished_attributes = { finished_at: Time.current }
59-
finished_attributes[:failed_at] = Time.current if failed_jobs > 0
60-
update!(finished_attributes)
61-
execute_callbacks
63+
with_lock do
64+
failed = jobs.joins(:failed_execution).count
65+
finished_attributes = {}
66+
if failed > 0
67+
finished_attributes[:failed_at] = Time.current
68+
finished_attributes[:failed_jobs] = failed
6269
end
70+
finished_attributes[:completed_jobs] = total_jobs - failed
71+
finished_attributes[:pending_jobs] = 0
72+
73+
update!(finished_attributes)
74+
execute_callbacks
6375
end
6476
end
6577

app/models/solid_queue/batch/trackable.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,16 @@ module Trackable
1010
scope :succeeded, -> { finished.where(failed_at: nil) }
1111
scope :unfinished, -> { where(finished_at: nil) }
1212
scope :failed, -> { where.not(failed_at: nil) }
13+
scope :by_batch_id, ->(batch_id) { where(batch_id:) }
14+
scope :empty_executions, -> {
15+
where(<<~SQL)
16+
NOT EXISTS (
17+
SELECT 1 FROM solid_queue_batch_executions
18+
WHERE solid_queue_batch_executions.batch_id = solid_queue_batches.batch_id
19+
LIMIT 1
20+
)
21+
SQL
22+
}
1323
end
1424

1525
def status
@@ -38,6 +48,18 @@ def ready?
3848
enqueued_at.present?
3949
end
4050

51+
def completed_jobs
52+
finished? ? self[:completed_jobs] : total_jobs - batch_executions.count
53+
end
54+
55+
def failed_jobs
56+
finished? ? self[:failed_jobs] : jobs.joins(:failed_execution).count
57+
end
58+
59+
def pending_jobs
60+
finished? ? self[:pending_jobs] : batch_executions.count
61+
end
62+
4163
def progress_percentage
4264
return 0 if total_jobs == 0
4365
((completed_jobs + failed_jobs) * 100.0 / total_jobs).round(2)

app/models/solid_queue/batch_execution.rb

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ class BatchExecution < Record
55
belongs_to :job, optional: true
66
belongs_to :batch, foreign_key: :batch_id, primary_key: :batch_id
77

8+
after_commit :check_completion, on: :destroy
9+
10+
private
11+
def check_completion
12+
batch = Batch.find_by(batch_id: batch_id)
13+
batch.check_completion! if batch.present?
14+
end
15+
816
class << self
917
def create_all_from_jobs(jobs)
1018
batch_jobs = jobs.select { |job| job.batch_id.present? }
@@ -23,30 +31,6 @@ def create_all_from_jobs(jobs)
2331
end
2432
end
2533

26-
def process_job_completion(job, status)
27-
batch_id = job.batch_id
28-
batch_execution = job.batch_execution
29-
30-
return if batch_execution.blank?
31-
32-
transaction do
33-
batch_execution.destroy!
34-
35-
if status == "failed"
36-
Batch.where(batch_id: batch_id).update_all(
37-
"pending_jobs = pending_jobs - 1, failed_jobs = failed_jobs + 1"
38-
)
39-
else
40-
Batch.where(batch_id: batch_id).update_all(
41-
"pending_jobs = pending_jobs - 1, completed_jobs = completed_jobs + 1"
42-
)
43-
end
44-
end
45-
46-
batch = Batch.find_by(batch_id: batch_id)
47-
batch.check_completion! if batch.present?
48-
end
49-
5034
private
5135

5236
def provider_upsert_options

app/models/solid_queue/execution/batchable.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ module Batchable
1111

1212
private
1313
def update_batch_progress
14-
# FailedExecutions are only created when the job is done retrying
1514
if is_a?(FailedExecution)
16-
BatchExecution.process_job_completion(job, "failed")
15+
# FailedExecutions are only created when the job is done retrying
16+
job.batch_execution&.destroy!
1717
end
1818
rescue => e
1919
Rails.logger.error "[SolidQueue] Failed to notify batch #{job.batch_id} about job #{job.id} failure: #{e.message}"

app/models/solid_queue/job/batchable.rb

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ def update_batch_progress
2828
return unless saved_change_to_finished_at? && finished_at.present?
2929
return unless batch_id.present?
3030

31-
# Jobs marked as finished are considered completed
32-
# (even if they failed and are being retried - we don't know that here)
33-
BatchExecution.process_job_completion(self, "completed")
31+
batch_execution&.destroy!
3432
rescue => e
3533
Rails.logger.error "[SolidQueue] Failed to update batch #{batch_id} progress for job #{id}: #{e.message}"
3634
end

0 commit comments

Comments
 (0)