Commit 52b3d7b

Prevent blocked jobs from being released while job is still executing
When a job runs longer than its concurrency_duration, the semaphore expires and gets deleted by the dispatcher's concurrency maintenance. Previously, BlockedExecution.releasable would consider a concurrency key releasable when its semaphore was missing, causing blocked jobs to be released and run concurrently with the still-executing job.

This violated the concurrency guarantee: with limits_concurrency set to 1, two jobs with the same concurrency key could run simultaneously if the first job exceeded its concurrency_duration.

Fix: Check for claimed executions before marking a concurrency key as releasable. A key is only releasable when:
- Its semaphore is missing OR has available slots, AND
- No jobs with that key are currently being executed (claimed)

This ensures concurrency limits are respected even when jobs exceed their configured duration.
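
The release rule described in this message can be sketched with plain Ruby collections. This is an illustration only, not the gem's code: `releasable_keys` is a hypothetical helper, the semaphore table is modeled as a Hash of key to remaining slots, and the set of executing keys is passed in rather than queried from the database.

```ruby
# Hypothetical in-memory model of the release rule (not Solid Queue's code).
# semaphores:     Hash of concurrency key => remaining slots. A missing key
#                 means the semaphore row expired or never existed.
# executing_keys: keys with at least one claimed (still running) job.
def releasable_keys(concurrency_keys, semaphores, executing_keys)
  without_semaphore = concurrency_keys - semaphores.keys
  with_open_slots   = semaphores.select { |_key, value| value > 0 }.keys

  # Before the fix the rule stopped at the union; the fix subtracts
  # every key that still has a job executing.
  (without_semaphore | with_open_slots) - executing_keys
end

keys       = %w[a b c d]
semaphores = { "b" => 0, "c" => 1 } # "a" and "d" have no semaphore row
executing  = %w[a]                  # "a" outlived its semaphore but is still running

p releasable_keys(keys, semaphores, executing) # => ["d", "c"]
```

Passing an empty `executing_keys` reproduces the old behavior in this model: key "a" would come back as releasable even though its job has not finished.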
1 parent 839ba64 commit 52b3d7b

2 files changed: +53 -1 lines changed


app/models/solid_queue/blocked_execution.rb

Lines changed: 10 additions & 1 deletion
@@ -36,8 +36,17 @@ def release_one(concurrency_key)
         def releasable(concurrency_keys)
           semaphores = Semaphore.where(key: concurrency_keys).pluck(:key, :value).to_h
 
+          # Concurrency keys that have jobs currently being executed should not be released,
+          # even if their semaphore expired. This prevents duplicate job execution when
+          # jobs run longer than their concurrency_duration.
+          executing_keys = Job.joins(:claimed_execution)
+            .where(concurrency_key: concurrency_keys)
+            .distinct
+            .pluck(:concurrency_key)
+
           # Concurrency keys without semaphore + concurrency keys with open semaphore
-          (concurrency_keys - semaphores.keys) | semaphores.select { |_key, value| value > 0 }.keys
+          # MINUS keys that have jobs currently executing
+          ((concurrency_keys - semaphores.keys) | semaphores.select { |_key, value| value > 0 }.keys) - executing_keys
         end
     end
 
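
As a rough illustration of what the added `executing_keys` query computes, here is an in-memory equivalent. It is a sketch under assumptions: `SketchJob`, `SketchClaimedExecution`, and `executing_keys_for` are hypothetical names, and the join is assumed to match a claimed execution to its job through a `job_id` reference.

```ruby
# Hypothetical stand-ins for the jobs and claimed executions tables.
SketchJob              = Struct.new(:id, :concurrency_key)
SketchClaimedExecution = Struct.new(:job_id)

# In-memory counterpart of:
#   Job.joins(:claimed_execution).where(concurrency_key: keys).distinct.pluck(:concurrency_key)
def executing_keys_for(jobs, claimed_executions, concurrency_keys)
  claimed_job_ids = claimed_executions.map(&:job_id)

  jobs
    .select { |job| claimed_job_ids.include?(job.id) }               # inner join: only claimed jobs
    .select { |job| concurrency_keys.include?(job.concurrency_key) } # where: only the keys asked about
    .map(&:concurrency_key)                                          # pluck
    .uniq                                                            # distinct
end

jobs = [
  SketchJob.new(1, "report/1"),
  SketchJob.new(2, "report/1"), # same key claimed twice, possible when the limit is above 1
  SketchJob.new(3, "report/2"), # not claimed
  SketchJob.new(4, "report/3")  # claimed, but not among the keys being checked
]
claimed = [1, 2, 4].map { |id| SketchClaimedExecution.new(id) }

p executing_keys_for(jobs, claimed, ["report/1", "report/2"]) # => ["report/1"]
```

The `uniq` step mirrors `.distinct`: without it, a key with several claimed jobs would appear more than once. The subtraction in `releasable` would still work, so deduplication mainly keeps the plucked list small.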

test/integration/concurrency_controls_test.rb

Lines changed: 43 additions & 0 deletions
@@ -162,6 +162,49 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
     assert_stored_sequence @result, ("A".."K").to_a
   end
 
+  test "don't unblock jobs when semaphore expires but job is still executing" do
+    # This tests a race condition where a job runs longer than concurrency_duration,
+    # causing the semaphore to expire while the job is still running. Without the fix,
+    # blocked jobs would be released, causing duplicate concurrent executions.
+
+    # Start a long-running job
+    NonOverlappingUpdateResultJob.perform_later(@result, name: "A", pause: 3.seconds)
+
+    # Wait for it to be claimed
+    wait_while_with_timeout(2.seconds) { SolidQueue::ClaimedExecution.count == 0 }
+    first_job = SolidQueue::Job.last
+    assert first_job.claimed?, "First job should be claimed"
+
+    # Enqueue more jobs with the same concurrency key - they'll be blocked
+    assert_difference -> { SolidQueue::BlockedExecution.count }, +3 do
+      ("B".."D").each do |name|
+        NonOverlappingUpdateResultJob.perform_later(@result, name: name)
+      end
+    end
+
+    # Simulate the semaphore and blocked executions expiring while job A is still running
+    skip_active_record_query_cache do
+      SolidQueue::Semaphore.where(key: first_job.concurrency_key).update_all(expires_at: 15.minutes.ago)
+      SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago)
+    end
+
+    # Give the dispatcher time to run concurrency maintenance
+    sleep(1.5)
+
+    # The blocked jobs should NOT have been released because job A is still executing
+    skip_active_record_query_cache do
+      assert first_job.reload.claimed?, "First job should still be claimed"
+      assert_equal 3, SolidQueue::BlockedExecution.count, "Blocked jobs should not be released while job is executing"
+    end
+
+    # Now wait for everything to complete normally
+    wait_for_jobs_to_finish_for(10.seconds)
+    assert_no_unfinished_jobs
+
+    # All jobs should have executed in sequence
+    assert_stored_sequence @result, ("A".."D").to_a
+  end
+
   test "don't block claimed executions that get released" do
     NonOverlappingUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds)
     job = SolidQueue::Job.last
