Skip to content

Commit e2855f6

Browse files
committed
Create batch executions alongside ready and scheduled executions
* Making it explicit is the easiest option, and the most in alignment with solid queue * Fix errors around upserting across providers. SQLite and Postgres share identical syntax (at least for this use-case) and mysql works differently
1 parent 74d20ce commit e2855f6

File tree

7 files changed

+61
-32
lines changed

7 files changed

+61
-32
lines changed

app/models/solid_queue/batch_execution.rb

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ def create_all_from_jobs(jobs)
1818
total = jobs.size
1919
SolidQueue::Batch.upsert(
2020
{ batch_id:, total_jobs: total, pending_jobs: total },
21-
on_duplicate: Arel.sql(
22-
"total_jobs = total_jobs + #{total}, pending_jobs = pending_jobs + #{total}"
23-
)
21+
**provider_upsert_options
2422
)
2523
end
2624
end
@@ -48,6 +46,27 @@ def process_job_completion(job, status)
4846
batch = Batch.find_by(batch_id: batch_id)
4947
batch&.check_completion!
5048
end
49+
50+
private
51+
52+
def provider_upsert_options
53+
case connection.adapter_name
54+
when "PostgreSQL", "SQLite"
55+
{
56+
unique_by: :batch_id,
57+
on_duplicate: Arel.sql(
58+
"total_jobs = solid_queue_batches.total_jobs + excluded.total_jobs, " \
59+
"pending_jobs = solid_queue_batches.pending_jobs + excluded.pending_jobs"
60+
)
61+
}
62+
else
63+
{
64+
on_duplicate: Arel.sql(
65+
"total_jobs = total_jobs + VALUES(total_jobs), pending_jobs = pending_jobs + VALUES(pending_jobs)"
66+
)
67+
}
68+
end
69+
end
5170
end
5271
end
5372
end

app/models/solid_queue/execution/batch_preparable.rb

Lines changed: 0 additions & 25 deletions
This file was deleted.

app/models/solid_queue/job/batchable.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,21 @@ module Batchable
99
belongs_to :batch, foreign_key: :batch_id, primary_key: :batch_id, class_name: "SolidQueue::Batch", optional: true
1010
has_one :batch_execution, foreign_key: :job_id, dependent: :destroy
1111

12+
after_create :create_batch_execution, if: :batch_id?
1213
after_update :update_batch_progress, if: :batch_id?
1314
end
1415

16+
class_methods do
17+
def batch_all(jobs)
18+
BatchExecution.create_all_from_jobs(jobs)
19+
end
20+
end
21+
1522
private
23+
def create_batch_execution
24+
BatchExecution.create_all_from_jobs([ self ])
25+
end
26+
1627
def update_batch_progress
1728
return unless saved_change_to_finished_at? && finished_at.present?
1829
return unless batch_id.present?

app/models/solid_queue/job/executable.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ module Executable
1919
class_methods do
2020
def prepare_all_for_execution(jobs)
2121
due, not_yet_due = jobs.partition(&:due?)
22-
dispatch_all(due) + schedule_all(not_yet_due)
22+
(dispatch_all(due) + schedule_all(not_yet_due)).tap do |jobs|
23+
batch_all(jobs.select { |job| job.batch_id.present? })
24+
end
2325
end
2426

2527
def dispatch_all(jobs)

app/models/solid_queue/ready_execution.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
module SolidQueue
44
class ReadyExecution < Execution
5-
include BatchPreparable
6-
75
scope :queued_as, ->(queue_name) { where(queue_name: queue_name) }
86

97
assumes_attributes_from_job

app/models/solid_queue/scheduled_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
module SolidQueue
44
class ScheduledExecution < Execution
5-
include Dispatching, BatchPreparable
5+
include Dispatching
66

77
scope :due, -> { where(scheduled_at: ..Time.current) }
88
scope :ordered, -> { order(scheduled_at: :asc, priority: :asc, job_id: :asc) }

test/integration/batch_lifecycle_test.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,30 @@ def perform
234234
assert_finished_in_order(batch2.reload, batch1.reload)
235235
end
236236

237+
test "executes the same with perform_all_later as it does a normal enqueue" do
238+
batch2 = nil
239+
batch1 = SolidQueue::Batch.enqueue do
240+
ActiveJob.perform_all_later([ FailingJob.new, FailingJob.new ])
241+
batch2 = SolidQueue::Batch.enqueue do
242+
ActiveJob.perform_all_later([ AddToBufferJob.new("ok"), AddToBufferJob.new("ok2") ])
243+
end
244+
end
245+
246+
@dispatcher.start
247+
@worker.start
248+
249+
wait_for_batches_to_finish_for(3.seconds)
250+
wait_for_jobs_to_finish_for(1.second)
251+
252+
assert_equal 6, batch1.reload.jobs.count
253+
assert_equal 6, batch1.total_jobs
254+
assert_equal 2, SolidQueue::Batch.finished.count
255+
assert_equal "failed", batch1.status
256+
assert_equal 2, batch2.reload.jobs.count
257+
assert_equal 2, batch2.total_jobs
258+
assert_equal "completed", batch2.status
259+
end
260+
237261
test "discarded jobs fire properly" do
238262
batch2 = nil
239263
batch1 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("0")) do

0 commit comments

Comments
 (0)