Skip to content

Commit e79b97b

Browse files
authored
Merge pull request #125 from basecamp/fix-discard-concurrency-locks
Release next blocked job when discarding a job that holds a concurrency lock
2 parents d608031 + 86c8330 commit e79b97b

File tree

9 files changed

+124
-22
lines changed

9 files changed

+124
-22
lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ def release
4444
end
4545
end
4646

47+
def discard
48+
raise UndiscardableError, "Can't discard a job in progress"
49+
end
50+
4751
private
4852
def execute
4953
ActiveJob::Base.execute(job.arguments)

app/models/solid_queue/execution.rb

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
module SolidQueue
44
class Execution < Record
5+
class UndiscardableError < StandardError; end
6+
57
include JobAttributes
68

79
self.abstract_class = true
@@ -10,8 +12,6 @@ class Execution < Record
1012

1113
belongs_to :job
1214

13-
alias_method :discard, :destroy
14-
1515
class << self
1616
def create_all_from_jobs(jobs)
1717
insert_all execution_data_from_jobs(jobs)
@@ -23,5 +23,12 @@ def execution_data_from_jobs(jobs)
2323
end
2424
end
2525
end
26+
27+
def discard
28+
with_lock do
29+
job.destroy
30+
destroy
31+
end
32+
end
2633
end
2734
end

app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ module ConcurrencyControls
66
extend ActiveSupport::Concern
77

88
included do
9-
has_one :blocked_execution, dependent: :destroy
9+
has_one :blocked_execution
1010

1111
delegate :concurrency_limit, :concurrency_duration, to: :job_class
12+
13+
before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
1214
end
1315

1416
def unblock_next_blocked_job
@@ -21,6 +23,10 @@ def concurrency_limited?
2123
concurrency_key.present?
2224
end
2325

26+
def blocked?
27+
blocked_execution.present?
28+
end
29+
2430
private
2531
def acquire_concurrency_lock
2632
return true unless concurrency_limited?
@@ -45,6 +51,10 @@ def release_next_blocked_job
4551
def job_class
4652
@job_class ||= class_name.safe_constantize
4753
end
54+
55+
def execution
56+
super || blocked_execution
57+
end
4858
end
4959
end
5060
end

app/models/solid_queue/job/executable.rb

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ module Executable
88
included do
99
include Clearable, ConcurrencyControls, Schedulable
1010

11-
has_one :ready_execution, dependent: :destroy
12-
has_one :claimed_execution, dependent: :destroy
13-
has_one :failed_execution, dependent: :destroy
11+
has_one :ready_execution
12+
has_one :claimed_execution
13+
has_one :failed_execution
1414

1515
after_create :prepare_for_execution
1616

@@ -89,14 +89,14 @@ def finished?
8989
finished_at.present?
9090
end
9191

92-
def discard
93-
destroy unless claimed?
94-
end
95-
9692
def retry
9793
failed_execution&.retry
9894
end
9995

96+
def discard
97+
execution.discard
98+
end
99+
100100
def failed_with(exception)
101101
FailedExecution.create_or_find_by!(job_id: id, exception: exception)
102102
end
@@ -106,6 +106,9 @@ def ready
106106
ReadyExecution.create_or_find_by!(job_id: id)
107107
end
108108

109+
def execution
110+
%w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") }
111+
end
109112

110113
def preserve_finished_jobs?
111114
SolidQueue.preserve_finished_jobs

app/models/solid_queue/job/schedulable.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ module Schedulable
66
extend ActiveSupport::Concern
77

88
included do
9-
has_one :scheduled_execution, dependent: :destroy
9+
has_one :scheduled_execution
1010

1111
scope :scheduled, -> { where.not(finished_at: nil) }
1212
end
@@ -35,6 +35,10 @@ def due?
3535
def schedule
3636
ScheduledExecution.create_or_find_by!(job_id: id)
3737
end
38+
39+
def execution
40+
super || scheduled_execution
41+
end
3842
end
3943
end
4044
end

app/models/solid_queue/ready_execution.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,12 @@ def lock_candidates(job_ids, process_id)
3737
end
3838
end
3939
end
40+
41+
def discard
42+
with_lock do
43+
job.destroy
44+
destroy
45+
end
46+
end
4047
end
4148
end

bin/setup

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ bundle
99

1010
echo "Creating databases..."
1111

12-
rails db:setup
12+
rails db:reset

test/integration/concurrency_controls_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
168168
end
169169

170170
test "don't block claimed executions that get released" do
171-
SequentialUpdateResultJob.perform_later(@result, name: name, pause: SolidQueue.shutdown_timeout + 3.seconds)
171+
SequentialUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 3.seconds)
172172
job = SolidQueue::Job.last
173173

174174
sleep(0.2)

test/models/solid_queue/job_test.rb

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
116116
NonOverlappingGroupedJob2.new(@result)
117117
]
118118

119-
assert_multi(ready: 5, scheduled: 2, blocked: 2) do
119+
assert_job_counts(ready: 5, scheduled: 2, blocked: 2) do
120120
ActiveJob.perform_all_later(active_jobs)
121121
end
122122

@@ -138,6 +138,75 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
138138
assert blocked_execution.expires_at <= SolidQueue.default_concurrency_control_period.from_now
139139
end
140140

141+
test "discard ready job" do
142+
AddToBufferJob.perform_later(1)
143+
job = SolidQueue::Job.last
144+
145+
assert_job_counts ready: -1 do
146+
job.discard
147+
end
148+
end
149+
150+
test "discard blocked job" do
151+
NonOverlappingJob.perform_later(@result, name: "ready")
152+
NonOverlappingJob.perform_later(@result, name: "blocked")
153+
ready_job, blocked_job = SolidQueue::Job.last(2)
154+
semaphore = SolidQueue::Semaphore.last
155+
156+
travel_to 10.minutes.from_now
157+
158+
assert_no_changes -> { semaphore.value }, -> { semaphore.expires_at } do
159+
assert_job_counts blocked: -1 do
160+
blocked_job.discard
161+
end
162+
end
163+
end
164+
165+
test "try to discard claimed job" do
166+
StoreResultJob.perform_later(42, pause: 2.seconds)
167+
job = SolidQueue::Job.last
168+
169+
worker = SolidQueue::Worker.new(queues: "background").tap(&:start)
170+
sleep(0.2)
171+
172+
assert_no_difference -> { SolidQueue::Job.count }, -> { SolidQueue::ClaimedExecution.count } do
173+
assert_raises SolidQueue::Execution::UndiscardableError do
174+
job.discard
175+
end
176+
end
177+
178+
worker.stop
179+
end
180+
181+
test "discard scheduled job" do
182+
AddToBufferJob.set(wait: 5.minutes).perform_later
183+
job = SolidQueue::Job.last
184+
185+
assert_job_counts scheduled: -1 do
186+
job.discard
187+
end
188+
end
189+
190+
test "release blocked locks when discarding a ready job" do
191+
NonOverlappingJob.perform_later(@result, name: "ready")
192+
NonOverlappingJob.perform_later(@result, name: "blocked")
193+
ready_job, blocked_job = SolidQueue::Job.last(2)
194+
semaphore = SolidQueue::Semaphore.last
195+
196+
assert ready_job.ready?
197+
assert blocked_job.blocked?
198+
199+
travel_to 10.minutes.from_now
200+
201+
assert_changes -> { semaphore.reload.expires_at } do
202+
assert_job_counts blocked: -1 do
203+
ready_job.discard
204+
end
205+
end
206+
207+
assert blocked_job.reload.ready?
208+
end
209+
141210
if ENV["SEPARATE_CONNECTION"] && ENV["TARGET_DB"] != "sqlite"
142211
test "uses a different connection and transaction than the one in use when connects_to is specified" do
143212
assert_difference -> { SolidQueue::Job.count } do
@@ -157,22 +226,20 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
157226

158227
private
159228
def assert_ready(&block)
160-
assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ReadyExecution.count } => +1, &block
229+
assert_job_counts(ready: 1, &block)
230+
assert SolidQueue::Job.last.ready?
161231
end
162232

163233
def assert_scheduled(&block)
164-
assert_no_difference -> { SolidQueue::ReadyExecution.count } do
165-
assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ScheduledExecution.count } => +1, &block
166-
end
234+
assert_job_counts(scheduled: 1, &block)
167235
end
168236

169237
def assert_blocked(&block)
170-
assert_no_difference -> { SolidQueue::ReadyExecution.count } do
171-
assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::BlockedExecution.count } => +1, &block
172-
end
238+
assert_job_counts(blocked: 1, &block)
239+
assert SolidQueue::Job.last.blocked?
173240
end
174241

175-
def assert_multi(ready: 0, scheduled: 0, blocked: 0, &block)
242+
def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block)
176243
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do
177244
assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do
178245
assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do

0 commit comments

Comments
 (0)