Skip to content

Commit 8398293

Browse files
committed
Release next blocked job when discarding a job that holds a concurrency lock
And make sure we lock the ready execution in this case, to avoid the job being claimed while we're discarding it.
1 parent 8ddb028 commit 8398293

File tree

4 files changed

+88
-11
lines changed

4 files changed

+88
-11
lines changed

app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ def concurrency_limited?
2121
concurrency_key.present?
2222
end
2323

24+
def blocked?
25+
blocked_execution.present?
26+
end
27+
2428
private
2529
def acquire_concurrency_lock
2630
return true unless concurrency_limited?

app/models/solid_queue/job/executable.rb

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ def finished?
9090
end
9191

9292
def discard
93-
destroy unless claimed?
93+
unless claimed?
94+
try_to_discard_while_ready || destroy
95+
end
9496
end
9597

9698
def retry
@@ -106,6 +108,14 @@ def ready
106108
ReadyExecution.create_or_find_by!(job_id: id)
107109
end
108110

111+
def try_to_discard_while_ready
112+
# Prevent the job from being polled while being discarded
113+
ready_execution&.with_lock do
114+
unblock_next_blocked_job
115+
destroy
116+
end
117+
end
118+
109119

110120
def preserve_finished_jobs?
111121
SolidQueue.preserve_finished_jobs

test/integration/concurrency_controls_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
168168
end
169169

170170
test "don't block claimed executions that get released" do
171-
SequentialUpdateResultJob.perform_later(@result, name: name, pause: SolidQueue.shutdown_timeout + 3.seconds)
171+
SequentialUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 3.seconds)
172172
job = SolidQueue::Job.last
173173

174174
sleep(0.2)

test/models/solid_queue/job_test.rb

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
116116
NonOverlappingGroupedJob2.new(@result)
117117
]
118118

119-
assert_multi(ready: 5, scheduled: 2, blocked: 2) do
119+
assert_job_counts(ready: 5, scheduled: 2, blocked: 2) do
120120
ActiveJob.perform_all_later(active_jobs)
121121
end
122122

@@ -138,6 +138,73 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
138138
assert blocked_execution.expires_at <= SolidQueue.default_concurrency_control_period.from_now
139139
end
140140

141+
test "discard ready job" do
142+
AddToBufferJob.perform_later(1)
143+
job = SolidQueue::Job.last
144+
145+
assert_job_counts ready: -1 do
146+
job.discard
147+
end
148+
end
149+
150+
test "discard blocked job" do
151+
NonOverlappingJob.perform_later(@result, name: "ready")
152+
NonOverlappingJob.perform_later(@result, name: "blocked")
153+
ready_job, blocked_job = SolidQueue::Job.last(2)
154+
semaphore = SolidQueue::Semaphore.last
155+
156+
travel_to 10.minutes.from_now
157+
158+
assert_no_changes -> { semaphore.value }, -> { semaphore.expires_at } do
159+
assert_job_counts blocked: -1 do
160+
blocked_job.discard
161+
end
162+
end
163+
end
164+
165+
test "try to discard claimed job" do
166+
StoreResultJob.perform_later(42, pause: 2.seconds)
167+
job = SolidQueue::Job.last
168+
169+
worker = SolidQueue::Worker.new(queues: "background").tap(&:start)
170+
sleep(0.2)
171+
172+
assert_no_difference -> { SolidQueue::Job.count }, -> { SolidQueue::ClaimedExecution.count } do
173+
job.discard
174+
end
175+
176+
worker.stop
177+
end
178+
179+
test "discard scheduled job" do
180+
AddToBufferJob.set(wait: 5.minutes).perform_later
181+
job = SolidQueue::Job.last
182+
183+
assert_job_counts scheduled: -1 do
184+
job.discard
185+
end
186+
end
187+
188+
test "release blocked locks when discarding a ready job" do
189+
NonOverlappingJob.perform_later(@result, name: "ready")
190+
NonOverlappingJob.perform_later(@result, name: "blocked")
191+
ready_job, blocked_job = SolidQueue::Job.last(2)
192+
semaphore = SolidQueue::Semaphore.last
193+
194+
assert ready_job.ready?
195+
assert blocked_job.blocked?
196+
197+
travel_to 10.minutes.from_now
198+
199+
assert_changes -> { semaphore.reload.expires_at } do
200+
assert_job_counts blocked: -1 do
201+
ready_job.discard
202+
end
203+
end
204+
205+
assert blocked_job.reload.ready?
206+
end
207+
141208
if ENV["SEPARATE_CONNECTION"] && ENV["TARGET_DB"] != "sqlite"
142209
test "uses a different connection and transaction than the one in use when connects_to is specified" do
143210
assert_difference -> { SolidQueue::Job.count } do
@@ -157,22 +224,18 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
157224

158225
private
159226
def assert_ready(&block)
160-
assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ReadyExecution.count } => +1, &block
227+
assert_job_counts(ready: 1, &block)
161228
end
162229

163230
def assert_scheduled(&block)
164-
assert_no_difference -> { SolidQueue::ReadyExecution.count } do
165-
assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ScheduledExecution.count } => +1, &block
166-
end
231+
assert_job_counts(scheduled: 1, &block)
167232
end
168233

169234
def assert_blocked(&block)
170-
assert_no_difference -> { SolidQueue::ReadyExecution.count } do
171-
assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::BlockedExecution.count } => +1, &block
172-
end
235+
assert_job_counts(blocked: 1, &block)
173236
end
174237

175-
def assert_multi(ready: 0, scheduled: 0, blocked: 0, &block)
238+
def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block)
176239
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do
177240
assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do
178241
assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do

0 commit comments

Comments
 (0)