Skip to content

Commit d57b8e2

Browse files
authored
Merge pull request #127 from basecamp/discard-retry-bulk
Implement retry_all and discard_all
2 parents ffa7028 + 1660bb9 commit d57b8e2

16 files changed

+263
-46
lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ def claiming(job_ids, process_id, &block)
2323
def release_all
2424
includes(:job).each(&:release)
2525
end
26+
27+
def discard_all_in_batches(*)
28+
raise UndiscardableError, "Can't discard jobs in progress"
29+
end
30+
31+
def discard_all_from_jobs(*)
32+
raise UndiscardableError, "Can't discard jobs in progress"
33+
end
2634
end
2735

2836
def perform

app/models/solid_queue/execution.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,41 @@ def execution_data_from_jobs(jobs)
2222
attributes_from_job(job).merge(job_id: job.id)
2323
end
2424
end
25+
26+
def discard_all_in_batches(batch_size: 500)
27+
pending = count
28+
discarded = 0
29+
30+
loop do
31+
transaction do
32+
job_ids = limit(batch_size).order(:job_id).lock.pluck(:job_id)
33+
34+
discard_jobs job_ids
35+
discarded = where(job_id: job_ids).delete_all
36+
pending -= discarded
37+
end
38+
39+
break if pending <= 0 || discarded == 0
40+
end
41+
end
42+
43+
def discard_all_from_jobs(jobs)
44+
transaction do
45+
job_ids = lock_all_from_jobs(jobs)
46+
47+
discard_jobs job_ids
48+
where(job_id: job_ids).delete_all
49+
end
50+
end
51+
52+
private
53+
def lock_all_from_jobs(jobs)
54+
where(job_id: jobs.map(&:id)).order(:job_id).lock.pluck(:job_id)
55+
end
56+
57+
def discard_jobs(job_ids)
58+
Job.where(id: job_ids).delete_all
59+
end
2560
end
2661

2762
def discard
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Execution
5+
module Dispatching
6+
extend ActiveSupport::Concern
7+
8+
class_methods do
9+
def dispatch_jobs(job_ids)
10+
jobs = Job.where(id: job_ids)
11+
12+
Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
13+
where(job_id: dispatched_job_ids).delete_all
14+
SolidQueue.logger.info("[SolidQueue] Dispatched #{dispatched_job_ids.size} jobs")
15+
end
16+
end
17+
end
18+
end
19+
end
20+
end
Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,37 @@
11
# frozen_string_literal: true
22

3-
class SolidQueue::FailedExecution < SolidQueue::Execution
4-
serialize :error, coder: JSON
3+
module SolidQueue
4+
class FailedExecution < Execution
5+
include Dispatching
56

6-
before_create :expand_error_details_from_exception
7+
serialize :error, coder: JSON
78

8-
attr_accessor :exception
9+
before_create :expand_error_details_from_exception
910

10-
def retry
11-
transaction do
12-
job.prepare_for_execution
13-
destroy!
11+
attr_accessor :exception
12+
13+
def self.retry_all(jobs)
14+
transaction do
15+
dispatch_jobs lock_all_from_jobs(jobs)
16+
end
17+
end
18+
19+
def retry
20+
with_lock do
21+
job.prepare_for_execution
22+
destroy!
23+
end
1424
end
15-
end
1625

17-
%i[ exception_class message backtrace ].each do |attribute|
18-
define_method(attribute) { error.with_indifferent_access[attribute] }
19-
end
26+
%i[ exception_class message backtrace ].each do |attribute|
27+
define_method(attribute) { error.with_indifferent_access[attribute] }
28+
end
2029

21-
private
22-
def expand_error_details_from_exception
23-
if exception
24-
self.error = { exception_class: exception.class.name, message: exception.message, backtrace: exception.backtrace }
30+
private
31+
def expand_error_details_from_exception
32+
if exception
33+
self.error = { exception_class: exception.class.name, message: exception.message, backtrace: exception.backtrace }
34+
end
2535
end
2636
end
2737
end

app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ module ConcurrencyControls
1313
before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
1414
end
1515

16+
class_methods do
17+
def release_all_concurrency_locks(jobs)
18+
Semaphore.signal_all(jobs.select(&:concurrency_limited?))
19+
end
20+
end
21+
1622
def unblock_next_blocked_job
1723
if release_concurrency_lock
1824
release_next_blocked_job

app/models/solid_queue/job/executable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def retry
9494
end
9595

9696
def discard
97-
execution.discard
97+
execution&.discard
9898
end
9999

100100
def failed_with(exception)

app/models/solid_queue/queue.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def resume
3333
end
3434

3535
def clear
36-
Job.where(queue_name: name).each(&:discard)
36+
ReadyExecution.queued_as(name).discard_all_in_batches
3737
end
3838

3939
def size

app/models/solid_queue/ready_execution.rb

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,12 @@ def lock_candidates(job_ids, process_id)
4040
where(job_id: claimed.pluck(:job_id)).delete_all
4141
end
4242
end
43-
end
4443

45-
def discard
46-
with_lock do
47-
job.destroy
48-
destroy
49-
end
44+
45+
def discard_jobs(job_ids)
46+
Job.release_all_concurrency_locks Job.where(id: job_ids)
47+
super
48+
end
5049
end
5150
end
5251
end

app/models/solid_queue/scheduled_execution.rb

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
module SolidQueue
44
class ScheduledExecution < Execution
5+
include Dispatching
6+
57
scope :due, -> { where(scheduled_at: ..Time.current) }
68
scope :ordered, -> { order(scheduled_at: :asc, priority: :asc) }
79
scope :next_batch, ->(batch_size) { due.ordered.limit(batch_size) }
@@ -14,20 +16,10 @@ def dispatch_next_batch(batch_size)
1416
job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
1517
if job_ids.empty? then []
1618
else
17-
dispatch_batch(job_ids)
19+
dispatch_jobs(job_ids)
1820
end
1921
end
2022
end
21-
22-
private
23-
def dispatch_batch(job_ids)
24-
jobs = Job.where(id: job_ids)
25-
26-
Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
27-
where(job_id: dispatched_job_ids).delete_all
28-
SolidQueue.logger.info("[SolidQueue] Dispatched scheduled batch with #{dispatched_job_ids.size} jobs")
29-
end
30-
end
3123
end
3224
end
3325
end

app/models/solid_queue/semaphore.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,17 @@ def wait(job)
1313
def signal(job)
1414
Proxy.new(job).signal
1515
end
16+
17+
def signal_all(jobs)
18+
Proxy.signal_all(jobs)
19+
end
1620
end
1721

1822
class Proxy
23+
def self.signal_all(jobs)
24+
Semaphore.where(key: jobs.map(&:concurrency_key)).update_all("value = value + 1")
25+
end
26+
1927
def initialize(job)
2028
@job = job
2129
@retries = 0

0 commit comments

Comments
 (0)