Skip to content

Commit 1660bb9

Browse files
committed
Implement discard_all for executions to be used to clear queues
Clearing a queue applies only to ready executions, but it's easy to implement this for all types of executions.
1 parent e12997a commit 1660bb9

File tree

4 files changed

+43
-2
lines changed

4 files changed

+43
-2
lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ def release_all
2424
includes(:job).each(&:release)
2525
end
2626

27-
def discard_all(*)
27+
def discard_all_in_batches(*)
28+
raise UndiscardableError, "Can't discard jobs in progress"
29+
end
30+
31+
def discard_all_from_jobs(*)
2832
raise UndiscardableError, "Can't discard jobs in progress"
2933
end
3034
end

app/models/solid_queue/execution.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,23 @@ def execution_data_from_jobs(jobs)
2323
end
2424
end
2525

26+
def discard_all_in_batches(batch_size: 500)
27+
pending = count
28+
discarded = 0
29+
30+
loop do
31+
transaction do
32+
job_ids = limit(batch_size).order(:job_id).lock.pluck(:job_id)
33+
34+
discard_jobs job_ids
35+
discarded = where(job_id: job_ids).delete_all
36+
pending -= discarded
37+
end
38+
39+
break if pending <= 0 || discarded == 0
40+
end
41+
end
42+
2643
def discard_all_from_jobs(jobs)
2744
transaction do
2845
job_ids = lock_all_from_jobs(jobs)

app/models/solid_queue/queue.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def resume
3333
end
3434

3535
def clear
36-
Job.where(queue_name: name).each(&:discard)
36+
ReadyExecution.queued_as(name).discard_all_in_batches
3737
end
3838

3939
def size

test/models/solid_queue/ready_execution_test.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,26 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
127127
end
128128
end
129129

130+
test "discard all" do
131+
3.times { |i| AddToBufferJob.perform_later(i) }
132+
133+
assert_difference [ -> { SolidQueue::ReadyExecution.count }, -> { SolidQueue::Job.count } ], -8 do
134+
SolidQueue::ReadyExecution.discard_all_in_batches
135+
end
136+
end
137+
138+
test "discard all by queue" do
139+
3.times { |i| AddToBufferJob.perform_later(i) }
140+
141+
assert_difference [ -> { SolidQueue::ReadyExecution.count }, -> { SolidQueue::Job.count } ], -5 do
142+
SolidQueue::ReadyExecution.queued_as(:backend).discard_all_in_batches
143+
end
144+
145+
assert_no_difference [ -> { SolidQueue::ReadyExecution.count }, -> { SolidQueue::Job.count } ] do
146+
SolidQueue::ReadyExecution.queued_as(:backend).discard_all_in_batches
147+
end
148+
end
149+
130150
private
131151
def assert_claimed_jobs(count, &block)
132152
assert_difference -> { SolidQueue::ClaimedExecution.count } => +count, -> { SolidQueue::ReadyExecution.count } => -count do

0 commit comments

Comments
 (0)