Skip to content

Commit 8565c3a

Browse files
committed
Move batch completion checks to job
* Reduce load from each callback, and make checks less susceptible to race conditions * Make sure monitor jobs can run, even in the absence of an ApplicationJob * Allow setting the queue on the maintenance jobs * Bring back EmptyJob for empty batches
1 parent 85cb881 commit 8565c3a

File tree

6 files changed

+98
-50
lines changed

6 files changed

+98
-50
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
2727
- [Performance considerations](#performance-considerations)
2828
- [Failed jobs and retries](#failed-jobs-and-retries)
2929
- [Error reporting on jobs](#error-reporting-on-jobs)
30+
- [Batch jobs](#batch-jobs)
3031
- [Puma plugin](#puma-plugin)
3132
- [Jobs and transactional integrity](#jobs-and-transactional-integrity)
3233
- [Recurring tasks](#recurring-tasks)
@@ -650,6 +651,21 @@ SolidQueue::Batch.enqueue(
650651
end
651652
```
652653

654+
### Batch options
655+
656+
As part of the processing of a batch, some jobs are automatically enqueued:
657+
658+
- A `SolidQueue::BatchMonitorJob` is enqueued for every `Batch` being processed
659+
- In the case of an empty batch, a `SolidQueue::Batch::EmptyJob` is enqueued
660+
661+
By default, these jobs run on the `default` queue. You can specify an alternative queue for them in an initializer:
662+
663+
```rb
664+
Rails.application.config.after_initialize do # or to_prepare
665+
SolidQueue::Batch.maintenance_queue_name = "my_batch_queue"
666+
end
667+
```
668+
653669
## Puma plugin
654670

655671
We provide a Puma plugin if you want to run the Solid Queue's supervisor together with Puma and have Puma monitor and manage it. You just need to add

app/jobs/solid_queue/batch/empty_job.rb

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22

33
module SolidQueue
44
class Batch
5-
class EmptyJob < ApplicationJob
6-
queue_as :background
7-
5+
class EmptyJob < (defined?(ApplicationJob) ? ApplicationJob : ActiveJob::Base)
86
def perform
97
# This job does nothing - it just exists to trigger batch completion
108
# The batch completion will be handled by the normal job_finished! flow
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
# Job that polls a single batch until it is finished, asking the batch to run
# its completion check whenever the batch looks ready to complete.
#
# The superclass is ApplicationJob when that constant is defined at the time
# this class is loaded, and ActiveJob::Base otherwise.
class BatchMonitorJob < (defined?(ApplicationJob) ? ApplicationJob : ActiveJob::Base)
5+
# Pause between two consecutive polls of the batch.
POLLING_INTERVAL = 1.seconds
6+
7+
# Polls the batch identified by +batch_id+ until it reports finished?.
#
# Returns immediately when no batch with that batch_id exists or when the
# batch is already finished.
#
# NOTE(review): the loop below only exits once the batch is finished, so a
# single run of this job keeps executing (sleeping POLLING_INTERVAL between
# polls) for as long as the batch stays unfinished.
#
# Any StandardError raised inside the method is logged and swallowed; the job
# then re-enqueues itself with a 30 second delay instead of re-raising.
def perform(batch_id:)
8+
batch = Batch.find_by(batch_id: batch_id)
9+
return unless batch
10+
11+
return if batch.finished?
12+
13+
loop do
14+
# Refresh the record on every iteration so the checks see current values.
batch.reload
15+
16+
break if batch.finished?
17+
18+
if check_completion?(batch)
19+
batch.check_completion!
20+
# Reload again to observe whether check_completion! marked the batch finished.
break if batch.reload.finished?
21+
end
22+
23+
sleep(POLLING_INTERVAL)
24+
end
25+
rescue => e
26+
Rails.logger.error "[SolidQueue] BatchMonitorJob error for batch #{batch_id}: #{e.message}"
27+
# Only re-enqueue on error, with a delay
28+
self.class.set(wait: 30.seconds).perform_later(batch_id: batch_id)
29+
end
30+
31+
private
32+
33+
# Returns true when the batch has no child batch with a nil finished_at,
# its pending_jobs is zero or less, and its total_jobs is greater than zero.
# A batch whose total_jobs is 0 therefore never triggers check_completion!
# from this job.
def check_completion?(batch)
34+
has_incomplete_children = batch.child_batches.where(finished_at: nil).exists?
35+
!has_incomplete_children && batch.pending_jobs <= 0 && batch.total_jobs > 0
36+
end
37+
end
38+
end

app/models/solid_queue/batch.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ class Batch < Record
2121
after_initialize :set_batch_id
2222
before_create :set_parent_batch_id
2323

24+
mattr_accessor :maintenance_queue_name
25+
self.maintenance_queue_name = "default"
26+
2427
def enqueue(&block)
2528
raise "You cannot enqueue a batch that is already finished" if finished?
2629

@@ -29,6 +32,11 @@ def enqueue(&block)
2932
Batch.wrap_in_batch_context(batch_id) do
3033
block.call(self)
3134
end
35+
36+
ActiveRecord.after_all_transactions_commit do
37+
enqueue_empty_job if reload.total_jobs == 0
38+
enqueue_monitor_job
39+
end
3240
end
3341

3442
def on_success=(value)
@@ -121,6 +129,18 @@ def check_parent_completion!
121129
end
122130
end
123131

132+
# Enqueues an EmptyJob inside this batch's context (batch_id is passed to
# wrap_in_batch_context), on the configured maintenance queue, falling back
# to "default" when maintenance_queue_name is nil.
# NOTE(review): presumably the batch context is what ties the job to this
# batch so that finishing it drives batch completion — confirm against
# wrap_in_batch_context.
def enqueue_empty_job
133+
Batch.wrap_in_batch_context(batch_id) do
134+
EmptyJob.set(queue: self.class.maintenance_queue_name || "default").perform_later
135+
end
136+
end
137+
138+
# Enqueues a BatchMonitorJob for this batch's batch_id on the configured
# maintenance queue, falling back to "default" when maintenance_queue_name
# is nil.
# The batch context is explicitly set to nil while enqueuing.
# NOTE(review): presumably this keeps the monitor job from being counted as
# one of the batch's own jobs — confirm against wrap_in_batch_context.
def enqueue_monitor_job
139+
Batch.wrap_in_batch_context(nil) do
140+
BatchMonitorJob.set(queue: self.class.maintenance_queue_name || "default").perform_later(batch_id: batch_id)
141+
end
142+
end
143+
124144
class << self
125145
def enqueue(on_success: nil, on_failure: nil, on_finish: nil, metadata: nil, &block)
126146
new.tap do |batch|

app/models/solid_queue/batch_execution.rb

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,6 @@ def process_job_completion(job, status)
4242
)
4343
end
4444
end
45-
46-
batch = Batch.find_by(batch_id: batch_id)
47-
batch&.check_completion!
4845
end
4946

5047
private

test/integration/batch_lifecycle_test.rb

Lines changed: 23 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,12 @@
55
class BatchLifecycleTest < ActiveSupport::TestCase
66
FailingJobError = Class.new(RuntimeError)
77

8-
def assert_finished_in_order(*finishables)
9-
finishables.each_cons(2) do |finished1, finished2|
10-
assert_equal finished1.finished_at < finished2.finished_at, true
11-
end
12-
end
13-
14-
def job!(active_job)
15-
SolidQueue::Job.find_by!(active_job_id: active_job.job_id)
16-
end
17-
188
setup do
199
@_on_thread_error = SolidQueue.on_thread_error
2010
SolidQueue.on_thread_error = silent_on_thread_error_for([ FailingJobError ], @_on_thread_error)
2111
@worker = SolidQueue::Worker.new(queues: "background", threads: 3)
2212
@dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2)
13+
SolidQueue::Batch.maintenance_queue_name = "background"
2314
end
2415

2516
teardown do
@@ -34,6 +25,7 @@ def job!(active_job)
3425

3526
ApplicationJob.enqueue_after_transaction_commit = false if defined?(ApplicationJob.enqueue_after_transaction_commit)
3627
SolidQueue.preserve_finished_jobs = true
28+
SolidQueue::Batch.maintenance_queue_name = nil
3729
end
3830

3931
class BatchOnSuccessJob < ApplicationJob
@@ -86,40 +78,12 @@ def perform
8678
end
8779
end
8880

89-
test "empty batches never finish" do
90-
# `enqueue_after_transaction_commit` makes it difficult to tell if a batch is empty, or if the
91-
# jobs are waiting to be run after commit.
92-
# If we could tell deterministically, we could enqueue an EmptyJob to make sure the batches
93-
# don't hang forever.
94-
SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("3")) do
95-
SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("2")) do
96-
SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("1")) { }
97-
SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("1.1")) { }
98-
end
99-
end
100-
101-
@dispatcher.start
102-
@worker.start
103-
104-
wait_for_batches_to_finish_for(2.seconds)
105-
wait_for_jobs_to_finish_for(1.second)
106-
107-
assert_equal [], JobBuffer.values
108-
assert_equal 4, SolidQueue::Batch.pending.count
109-
end
110-
11181
test "nested batches finish from the inside out" do
11282
batch2 = batch3 = batch4 = nil
11383
batch1 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("3")) do
114-
SolidQueue::Batch::EmptyJob.perform_later
11584
batch2 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("2")) do
116-
SolidQueue::Batch::EmptyJob.perform_later
117-
batch3 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("1")) do
118-
SolidQueue::Batch::EmptyJob.perform_later
119-
end
120-
batch4 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("1.1")) do
121-
SolidQueue::Batch::EmptyJob.perform_later
122-
end
85+
batch3 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("1")) { }
86+
batch4 = SolidQueue::Batch.enqueue(on_success: BatchOnSuccessJob.new("1.1")) { }
12387
end
12488
end
12589

@@ -188,10 +152,11 @@ def perform
188152

189153
wait_for_batches_to_finish_for(2.seconds)
190154

155+
jobs = batch_jobs(batch1, batch2, batch3)
191156
assert_equal [ "hey", "ho", "let's go" ], JobBuffer.values.sort
192157
assert_equal 3, SolidQueue::Batch.finished.count
193-
assert_equal 3, SolidQueue::Job.finished.count
194-
assert_equal 3, SolidQueue::Job.count
158+
assert_equal 3, jobs.finished.count
159+
assert_equal 3, jobs.count
195160
assert_finished_in_order(batch3.reload, batch2.reload, batch1.reload)
196161
assert_finished_in_order(job!(job3), batch3)
197162
assert_finished_in_order(job!(job2), batch2)
@@ -301,8 +266,8 @@ def perform
301266
end
302267

303268
assert_equal false, batch1.reload.finished?
304-
assert_equal 1, SolidQueue::Job.count
305-
assert_equal 0, SolidQueue::Job.finished.count
269+
assert_equal 1, batch1.jobs.count
270+
assert_equal 0, batch1.jobs.finished.count
306271

307272
@dispatcher.start
308273
@worker.start
@@ -360,4 +325,18 @@ def perform(batch)
360325
JobBuffer.add "Hi failure #{batch.batch_id}!"
361326
end
362327
end
328+
329+
# Asserts that the given records finished in the order they are passed:
# for every consecutive pair, the first one's finished_at must be strictly
# earlier than the second one's.
def assert_finished_in_order(*finishables)
330+
finishables.each_cons(2) do |finished1, finished2|
331+
assert_equal finished1.finished_at < finished2.finished_at, true
332+
end
333+
end
334+
335+
# Looks up the SolidQueue::Job whose active_job_id matches the given Active Job
# instance's job_id, using the bang finder (find_by!) rather than find_by.
def job!(active_job)
336+
SolidQueue::Job.find_by!(active_job_id: active_job.job_id)
337+
end
338+
339+
# Returns the SolidQueue::Job records whose batch_id is one of the given
# batches' batch_ids, so assertions can ignore jobs outside those batches.
def batch_jobs(*batches)
340+
SolidQueue::Job.where(batch_id: batches.map(&:batch_id))
341+
end
363342
end

0 commit comments

Comments
 (0)