Skip to content

Commit e12997a

Browse files
committed
Implement discard_all, from executions
When discarding ready executions that are concurrency-limited, we need to unblock the next jobs. Since we're aiming for performance here, let's just signal all applicable semaphores and let the concurrency maintenance task take care of blocked executions.
1 parent f7d62ec commit e12997a

File tree

10 files changed

+79
-10
lines changed

10 files changed

+79
-10
lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ def claiming(job_ids, process_id, &block)
2323
def release_all
2424
includes(:job).each(&:release)
2525
end
26+
27+
def discard_all(*)
28+
raise UndiscardableError, "Can't discard jobs in progress"
29+
end
2630
end
2731

2832
def perform

app/models/solid_queue/execution.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,24 @@ def execution_data_from_jobs(jobs)
2222
attributes_from_job(job).merge(job_id: job.id)
2323
end
2424
end
25+
26+
def discard_all_from_jobs(jobs)
27+
transaction do
28+
job_ids = lock_all_from_jobs(jobs)
29+
30+
discard_jobs job_ids
31+
where(job_id: job_ids).delete_all
32+
end
33+
end
34+
35+
private
36+
def lock_all_from_jobs(jobs)
37+
where(job_id: jobs.map(&:id)).order(:job_id).lock.pluck(:job_id)
38+
end
39+
40+
def discard_jobs(job_ids)
41+
Job.where(id: job_ids).delete_all
42+
end
2543
end
2644

2745
def discard

app/models/solid_queue/execution/dispatching.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ module Dispatching
66
extend ActiveSupport::Concern
77

88
class_methods do
9-
def dispatch_batch(job_ids)
9+
def dispatch_jobs(job_ids)
1010
jobs = Job.where(id: job_ids)
1111

1212
Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
1313
where(job_id: dispatched_job_ids).delete_all
14-
SolidQueue.logger.info("[SolidQueue] Dispatched scheduled batch with #{dispatched_job_ids.size} jobs")
14+
SolidQueue.logger.info("[SolidQueue] Dispatched #{dispatched_job_ids.size} jobs")
1515
end
1616
end
1717
end

app/models/solid_queue/failed_execution.rb

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,9 @@ class FailedExecution < Execution
1010

1111
attr_accessor :exception
1212

13-
class << self
14-
def retry_all(jobs)
15-
transaction do
16-
retriable_job_ids = where(job_id: jobs.map(&:id)).order(:job_id).lock.pluck(:job_id)
17-
dispatch_batch(retriable_job_ids)
18-
end
13+
def self.retry_all(jobs)
14+
transaction do
15+
dispatch_jobs lock_all_from_jobs(jobs)
1916
end
2017
end
2118

app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ module ConcurrencyControls
1313
before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
1414
end
1515

16+
class_methods do
17+
def release_all_concurrency_locks(jobs)
18+
Semaphore.signal_all(jobs.select(&:concurrency_limited?))
19+
end
20+
end
21+
1622
def unblock_next_blocked_job
1723
if release_concurrency_lock
1824
release_next_blocked_job

app/models/solid_queue/job/executable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def retry
9494
end
9595

9696
def discard
97-
execution.discard
97+
execution&.discard
9898
end
9999

100100
def failed_with(exception)

app/models/solid_queue/ready_execution.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ def lock_candidates(job_ids, process_id)
4040
where(job_id: claimed.pluck(:job_id)).delete_all
4141
end
4242
end
43+
44+
45+
def discard_jobs(job_ids)
46+
Job.release_all_concurrency_locks Job.where(id: job_ids)
47+
super
48+
end
4349
end
4450
end
4551
end

app/models/solid_queue/scheduled_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def dispatch_next_batch(batch_size)
1616
job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
1717
if job_ids.empty? then []
1818
else
19-
dispatch_batch(job_ids)
19+
dispatch_jobs(job_ids)
2020
end
2121
end
2222
end

app/models/solid_queue/semaphore.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,17 @@ def wait(job)
1313
def signal(job)
1414
Proxy.new(job).signal
1515
end
16+
17+
def signal_all(jobs)
18+
Proxy.signal_all(jobs)
19+
end
1620
end
1721

1822
class Proxy
23+
def self.signal_all(jobs)
24+
Semaphore.where(key: jobs.map(&:concurrency_key)).update_all("value = value + 1")
25+
end
26+
1927
def initialize(job)
2028
@job = job
2129
@retries = 0

test/models/solid_queue/job_test.rb

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,36 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
207207
assert blocked_job.reload.ready?
208208
end
209209

210+
test "discard jobs by execution type in bulk" do
211+
active_jobs = [
212+
AddToBufferJob.new(2),
213+
AddToBufferJob.new(6).set(wait: 2.minutes),
214+
NonOverlappingJob.new(@result),
215+
StoreResultJob.new(42),
216+
AddToBufferJob.new(4),
217+
NonOverlappingGroupedJob1.new(@result),
218+
AddToBufferJob.new(6).set(wait: 3.minutes),
219+
NonOverlappingJob.new(@result),
220+
NonOverlappingGroupedJob2.new(@result)
221+
]
222+
223+
assert_job_counts(ready: 5, scheduled: 2, blocked: 2) do
224+
ActiveJob.perform_all_later(active_jobs)
225+
end
226+
227+
assert_job_counts(ready: -5) do
228+
SolidQueue::ReadyExecution.discard_all_from_jobs(SolidQueue::Job.all)
229+
end
230+
231+
assert_job_counts(scheduled: -2) do
232+
SolidQueue::ScheduledExecution.discard_all_from_jobs(SolidQueue::Job.all)
233+
end
234+
235+
assert_job_counts(blocked: -2) do
236+
SolidQueue::BlockedExecution.discard_all_from_jobs(SolidQueue::Job.all)
237+
end
238+
end
239+
210240
if ENV["SEPARATE_CONNECTION"] && ENV["TARGET_DB"] != "sqlite"
211241
test "uses a different connection and transaction than the one in use when connects_to is specified" do
212242
assert_difference -> { SolidQueue::Job.count } do

0 commit comments

Comments
 (0)