Skip to content

Commit bf8e3af

Browse files
committed
Remove support for nested batches for now
* We still track it, but it was causing alot of race conditions while trying to keep exclusively in callbacks. Running in a job or worker/dispatcher it works easily, but adds more overhead to the code and processing * Move to explicit timestamp fields instead of status fields so it's easier to track specifics of batch transitions * Move batches lower in the schema, after current models
1 parent af2bdc1 commit bf8e3af

File tree

9 files changed

+830
-167
lines changed

9 files changed

+830
-167
lines changed

CLAUDE.md

Lines changed: 729 additions & 0 deletions
Large diffs are not rendered by default.

app/jobs/solid_queue/batch/cleanup_job.rb

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22

33
module SolidQueue
44
class Batch
5-
class CleanupJob < ApplicationJob
6-
queue_as :background
7-
5+
class CleanupJob < (defined?(ApplicationJob) ? ApplicationJob : ActiveJob::Base)
86
discard_on ActiveRecord::RecordNotFound
97

108
def perform(job_batch)

app/jobs/solid_queue/batch_monitor_job.rb

Lines changed: 0 additions & 38 deletions
This file was deleted.

app/models/solid_queue/batch.rb

Lines changed: 15 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22

33
module SolidQueue
44
class Batch < Record
5-
STATUSES = %w[pending processing completed failed]
6-
75
include Trackable
86

9-
belongs_to :parent_batch, foreign_key: :parent_batch_id, class_name: "SolidQueue::Batch", optional: true
7+
belongs_to :parent_batch, foreign_key: :parent_batch_id, primary_key: :batch_id, class_name: "SolidQueue::Batch", optional: true
108
has_many :jobs, foreign_key: :batch_id, primary_key: :batch_id
119
has_many :batch_executions, foreign_key: :batch_id, primary_key: :batch_id, class_name: "SolidQueue::BatchExecution"
1210
has_many :child_batches, foreign_key: :parent_batch_id, primary_key: :batch_id, class_name: "SolidQueue::Batch"
@@ -16,10 +14,9 @@ class Batch < Record
1614
serialize :on_failure, coder: JSON
1715
serialize :metadata, coder: JSON
1816

19-
validates :status, inclusion: { in: STATUSES }
20-
2117
after_initialize :set_batch_id
2218
before_create :set_parent_batch_id
19+
after_commit :start_batch, on: :create, unless: -> { ActiveRecord.respond_to?(:after_all_transactions_commit) }
2320

2421
mattr_accessor :maintenance_queue_name
2522
self.maintenance_queue_name = "default"
@@ -33,12 +30,10 @@ def enqueue(&block)
3330
block.call(self)
3431
end
3532

36-
if ActiveSupport.respond_to?(:after_all_transactions_commit)
33+
if ActiveRecord.respond_to?(:after_all_transactions_commit)
3734
ActiveRecord.after_all_transactions_commit do
38-
start_monitoring
35+
start_batch
3936
end
40-
else
41-
start_monitoring
4237
end
4338
end
4439

@@ -55,22 +50,16 @@ def on_finish=(value)
5550
end
5651

5752
def check_completion!
58-
return if finished?
53+
return if finished? || !ready?
5954

6055
with_lock do
61-
return if finished_at?
56+
return if finished_at? || !ready?
6257

6358
if pending_jobs == 0
64-
unfinished_children = child_batches.where.not(status: %w[completed failed]).count
65-
66-
if unfinished_children == 0
67-
new_status = failed_jobs > 0 ? "failed" : "completed"
68-
update!(status: new_status, finished_at: Time.current)
69-
execute_callbacks
70-
end
71-
elsif status == "pending" && (completed_jobs > 0 || failed_jobs > 0)
72-
# Move from pending to processing once any job completes
73-
update!(status: "processing")
59+
finished_attributes = { finished_at: Time.current }
60+
finished_attributes[:failed_at] = Time.current if failed_jobs > 0
61+
update!(finished_attributes)
62+
execute_callbacks
7463
end
7564
end
7665
end
@@ -108,28 +97,19 @@ def perform_completion_job(job_field, attrs)
10897
end
10998

11099
def execute_callbacks
111-
if status == "failed"
100+
if failed_at?
112101
perform_completion_job(:on_failure, {}) if on_failure.present?
113-
elsif status == "completed"
102+
else
114103
perform_completion_job(:on_success, {}) if on_success.present?
115104
end
116105

117106
perform_completion_job(:on_finish, {}) if on_finish.present?
118107

119108
clear_unpreserved_jobs
120-
121-
check_parent_completion!
122109
end
123110

124111
def clear_unpreserved_jobs
125-
SolidQueue::Batch::CleanupJob.perform_later(self) unless SolidQueue.preserve_finished_jobs?
126-
end
127-
128-
def check_parent_completion!
129-
if parent_batch_id.present?
130-
parent = Batch.find_by(batch_id: parent_batch_id)
131-
parent&.check_completion! unless parent&.finished?
132-
end
112+
SolidQueue::Batch::CleanupJob.set(queue: self.class.maintenance_queue_name || "default").perform_later(self) unless SolidQueue.preserve_finished_jobs?
133113
end
134114

135115
def enqueue_empty_job
@@ -138,15 +118,9 @@ def enqueue_empty_job
138118
end
139119
end
140120

141-
def enqueue_monitor_job
142-
Batch.wrap_in_batch_context(nil) do
143-
BatchMonitorJob.set(queue: self.class.maintenance_queue_name || "default").perform_later(batch_id: batch_id)
144-
end
145-
end
146-
147-
def start_monitoring
121+
def start_batch
148122
enqueue_empty_job if reload.total_jobs == 0
149-
enqueue_monitor_job
123+
update!(enqueued_at: Time.current)
150124
end
151125

152126
class << self

app/models/solid_queue/batch/trackable.rb

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,26 @@ module Trackable
66
extend ActiveSupport::Concern
77

88
included do
9-
scope :pending, -> { where(status: "pending") }
10-
scope :processing, -> { where(status: "processing") }
11-
scope :completed, -> { where(status: "completed") }
12-
scope :failed, -> { where(status: "failed") }
13-
scope :finished, -> { where(status: %w[completed failed]) }
14-
scope :unfinished, -> { where(status: %w[pending processing]) }
9+
scope :finished, -> { where.not(finished_at: nil) }
10+
scope :succeeded, -> { finished.where(failed_at: nil) }
11+
scope :unfinished, -> { where(finished_at: nil) }
12+
scope :failed, -> { where.not(failed_at: nil) }
1513
end
1614

17-
def finished?
18-
status.in?(%w[completed failed])
15+
def failed?
16+
failed_at.present?
17+
end
18+
19+
def succeeded?
20+
finished? && !failed?
1921
end
2022

21-
def processing?
22-
status == "processing"
23+
def finished?
24+
finished_at.present?
2325
end
2426

25-
def pending?
26-
status == "pending"
27+
def ready?
28+
enqueued_at.present?
2729
end
2830

2931
def progress_percentage

app/models/solid_queue/batch_execution.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ def process_job_completion(job, status)
4242
)
4343
end
4444
end
45+
46+
batch = Batch.find_by(batch_id: batch_id)
47+
batch.check_completion! if batch.present?
4548
end
4649

4750
private

lib/generators/solid_queue/install/templates/db/queue_schema.rb

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,32 +26,6 @@
2626
t.index [ "job_id" ], name: "index_solid_queue_failed_executions_on_job_id", unique: true
2727
end
2828

29-
create_table "solid_queue_batches", force: :cascade do |t|
30-
t.string "batch_id", null: false
31-
t.string "parent_batch_id"
32-
t.text "on_finish"
33-
t.text "on_success"
34-
t.text "on_failure"
35-
t.text "metadata"
36-
t.integer "total_jobs", default: 0, null: false
37-
t.integer "pending_jobs", default: 0, null: false
38-
t.integer "completed_jobs", default: 0, null: false
39-
t.integer "failed_jobs", default: 0, null: false
40-
t.string "status", default: "pending", null: false
41-
t.datetime "finished_at"
42-
t.datetime "created_at", null: false
43-
t.datetime "updated_at", null: false
44-
t.index [ "batch_id" ], name: "index_solid_queue_batches_on_batch_id", unique: true
45-
t.index [ "parent_batch_id" ], name: "index_solid_queue_batches_on_parent_batch_id"
46-
end
47-
48-
create_table "solid_queue_batch_executions", force: :cascade do |t|
49-
t.bigint "job_id", null: false
50-
t.string "batch_id", null: false
51-
t.datetime "created_at", null: false
52-
t.index [ "job_id" ], name: "index_solid_queue_batch_executions_on_job_id", unique: true
53-
end
54-
5529
create_table "solid_queue_jobs", force: :cascade do |t|
5630
t.string "queue_name", null: false
5731
t.string "class_name", null: false
@@ -148,6 +122,33 @@
148122
t.index [ "key" ], name: "index_solid_queue_semaphores_on_key", unique: true
149123
end
150124

125+
create_table "solid_queue_batches", force: :cascade do |t|
126+
t.string "batch_id"
127+
t.string "parent_batch_id"
128+
t.text "on_finish"
129+
t.text "on_success"
130+
t.text "on_failure"
131+
t.text "metadata"
132+
t.integer "total_jobs", default: 0, null: false
133+
t.integer "pending_jobs", default: 0, null: false
134+
t.integer "completed_jobs", default: 0, null: false
135+
t.integer "failed_jobs", default: 0, null: false
136+
t.datetime "enqueued_at"
137+
t.datetime "finished_at"
138+
t.datetime "failed_at"
139+
t.datetime "created_at", null: false
140+
t.datetime "updated_at", null: false
141+
t.index ["batch_id"], name: "index_solid_queue_batches_on_batch_id", unique: true
142+
t.index ["parent_batch_id"], name: "index_solid_queue_batches_on_parent_batch_id"
143+
end
144+
145+
create_table "solid_queue_batch_executions", force: :cascade do |t|
146+
t.bigint "job_id", null: false
147+
t.string "batch_id", null: false
148+
t.datetime "created_at", null: false
149+
t.index [ "job_id" ], name: "index_solid_queue_batch_executions_on_job_id", unique: true
150+
end
151+
151152
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
152153
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
153154
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade

test/dummy/db/queue_schema.rb

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,32 +38,6 @@
3838
t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true
3939
end
4040

41-
create_table "solid_queue_batches", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
42-
t.string "batch_id"
43-
t.string "parent_batch_id"
44-
t.text "on_finish"
45-
t.text "on_success"
46-
t.text "on_failure"
47-
t.text "metadata"
48-
t.integer "total_jobs", default: 0, null: false
49-
t.integer "pending_jobs", default: 0, null: false
50-
t.integer "completed_jobs", default: 0, null: false
51-
t.integer "failed_jobs", default: 0, null: false
52-
t.string "status", default: "pending", null: false
53-
t.datetime "finished_at"
54-
t.datetime "created_at", null: false
55-
t.datetime "updated_at", null: false
56-
t.index ["batch_id"], name: "index_solid_queue_batches_on_batch_id", unique: true
57-
t.index ["parent_batch_id"], name: "index_solid_queue_batches_on_parent_batch_id"
58-
end
59-
60-
create_table "solid_queue_batch_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
61-
t.bigint "job_id", null: false
62-
t.string "batch_id", null: false
63-
t.datetime "created_at", null: false
64-
t.index ["job_id"], name: "index_solid_queue_batch_executions_on_job_id", unique: true
65-
end
66-
6741
create_table "solid_queue_jobs", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
6842
t.string "queue_name", null: false
6943
t.string "class_name", null: false
@@ -160,6 +134,33 @@
160134
t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true
161135
end
162136

137+
create_table "solid_queue_batches", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
138+
t.string "batch_id"
139+
t.string "parent_batch_id"
140+
t.text "on_finish"
141+
t.text "on_success"
142+
t.text "on_failure"
143+
t.text "metadata"
144+
t.integer "total_jobs", default: 0, null: false
145+
t.integer "pending_jobs", default: 0, null: false
146+
t.integer "completed_jobs", default: 0, null: false
147+
t.integer "failed_jobs", default: 0, null: false
148+
t.datetime "enqueued_at"
149+
t.datetime "finished_at"
150+
t.datetime "failed_at"
151+
t.datetime "created_at", null: false
152+
t.datetime "updated_at", null: false
153+
t.index ["batch_id"], name: "index_solid_queue_batches_on_batch_id", unique: true
154+
t.index ["parent_batch_id"], name: "index_solid_queue_batches_on_parent_batch_id"
155+
end
156+
157+
create_table "solid_queue_batch_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
158+
t.bigint "job_id", null: false
159+
t.string "batch_id", null: false
160+
t.datetime "created_at", null: false
161+
t.index ["job_id"], name: "index_solid_queue_batch_executions_on_job_id", unique: true
162+
end
163+
163164
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
164165
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
165166
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade

0 commit comments

Comments
 (0)