Skip to content

Commit 82bfef9

Browse files
committed
Don't go through the general dispatch flow when releasing claimed executions
That's it, don't try to gain the concurrency lock, because claimed executions with concurrency limits that are released would most likely be holding the semaphore themselves, as it's released after completing. This means these claimed executions would go to blocked upon release, leaving the semaphore busy. Just assume that if a job has a claimed execution, it's because it already gained the lock when going to ready.
1 parent 3c70451 commit 82bfef9

File tree

4 files changed

+24
-3
lines changed

4 files changed

+24
-3
lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def perform
3939

4040
def release
4141
transaction do
42-
job.prepare_for_execution
42+
job.dispatch_bypassing_concurrency_limits
4343
destroy!
4444
end
4545
end

app/models/solid_queue/job/executable.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ def dispatch
7373
end
7474
end
7575

76+
def dispatch_bypassing_concurrency_limits
77+
ready
78+
end
79+
7680
def finished!
7781
if preserve_finished_jobs?
7882
touch(:finished_at)

test/dummy/config/environments/test.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,6 @@
5151
logger = ActiveSupport::Logger.new(STDOUT)
5252
config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{exception.backtrace.join("\n")}") }
5353
config.solid_queue.logger = ActiveSupport::Logger.new(nil)
54+
55+
config.solid_queue.shutdown_timeout = 2.seconds
5456
end

test/integration/concurrency_controls_test.rb

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
77
setup do
88
@result = JobResult.create!(queue_name: "default", status: "seq: ")
99

10-
default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 }
11-
dispatcher = { polling_interval: 1, batch_size: 200, concurrency_maintenance_interval: 1 }
10+
default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 }
11+
dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 }
1212

1313
@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], dispatchers: [ dispatcher ] })
1414

@@ -167,6 +167,21 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
167167
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
168168
end
169169

170+
test "don't block claimed executions that get released" do
171+
SequentialUpdateResultJob.perform_later(@result, name: name, pause: SolidQueue.shutdown_timeout + 3.seconds)
172+
job = SolidQueue::Job.last
173+
174+
sleep(0.2)
175+
assert job.claimed?
176+
177+
# This won't leave time to the job to finish
178+
signal_process(@pid, :TERM, wait: 0.1.second)
179+
sleep(SolidQueue.shutdown_timeout + 0.2.seconds)
180+
181+
assert_not job.reload.finished?
182+
assert job.reload.ready?
183+
end
184+
170185
private
171186
def assert_stored_sequence(result, *sequences)
172187
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join }

0 commit comments

Comments
 (0)