Skip to content

Commit 3789fe2

Browse files
committed
Merge branch 'main' of github.com:rails/solid_queue into deprecated-retries-option
2 parents 12b4541 + 0ba1b7e commit 3789fe2

File tree

7 files changed

+51
-70
lines changed

7 files changed

+51
-70
lines changed

test/dummy/app/jobs/infinite_recursion_job.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ class InfiniteRecursionJob < ApplicationJob
33

44
def perform
55
start
6+
rescue SystemStackError => e
7+
raise ExpectedTestError, "stack level too deep", e.backtrace
68
end
79

810
private
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
class SequentialUpdateResultJob < UpdateResultJob
1+
class NonOverlappingUpdateResultJob < UpdateResultJob
22
limits_concurrency key: ->(job_result, **) { job_result }
33
end

test/dummy/app/jobs/update_result_job.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
class UpdateResultJob < ApplicationJob
22
def perform(job_result, name:, pause: nil, exception: nil)
3+
job_result.status += " + " unless job_result.status.blank?
34
job_result.status += "s#{name}"
45

56
sleep(pause) if pause

test/integration/concurrency_controls_test.rb

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
66
self.use_transactional_tests = false
77

88
setup do
9-
@result = JobResult.create!(queue_name: "default", status: "seq: ")
9+
@result = JobResult.create!(queue_name: "default", status: "")
1010

1111
default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 }
1212
dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 }
@@ -20,13 +20,13 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
2020
terminate_process(@pid) if process_exists?(@pid)
2121
end
2222

23-
test "run several conflicting jobs over the same record sequentially" do
23+
test "run several conflicting jobs over the same record without overlapping" do
2424
("A".."F").each do |name|
25-
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds)
25+
NonOverlappingUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds)
2626
end
2727

2828
("G".."K").each do |name|
29-
SequentialUpdateResultJob.perform_later(@result, name: name)
29+
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
3030
end
3131

3232
wait_for_jobs_to_finish_for(5.seconds)
@@ -39,11 +39,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
3939
UpdateResultJob.set(wait: 0.23.seconds).perform_later(@result, name: "000", pause: 0.1.seconds)
4040

4141
("A".."F").each_with_index do |name, i|
42-
SequentialUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds)
42+
NonOverlappingUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds)
4343
end
4444

4545
("G".."K").each_with_index do |name, i|
46-
SequentialUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name)
46+
NonOverlappingUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name)
4747
end
4848

4949
wait_for_jobs_to_finish_for(5.seconds)
@@ -85,11 +85,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
8585
test "run several jobs over the same record sequentially, with some of them failing" do
8686
("A".."F").each_with_index do |name, i|
8787
# A, C, E will fail, for i= 0, 2, 4
88-
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?))
88+
NonOverlappingUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?))
8989
end
9090

9191
("G".."K").each do |name|
92-
SequentialUpdateResultJob.perform_later(@result, name: name)
92+
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
9393
end
9494

9595
wait_for_jobs_to_finish_for(5.seconds)
@@ -100,7 +100,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
100100

101101
test "rely on dispatcher to unblock blocked executions with an available semaphore" do
102102
# Simulate a scenario where we got an available semaphore and some stuck jobs
103-
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
103+
job = NonOverlappingUpdateResultJob.perform_later(@result, name: "A")
104104

105105
wait_for_jobs_to_finish_for(5.seconds)
106106
assert_no_unfinished_jobs
@@ -114,7 +114,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
114114
# Now enqueue more jobs under that same key. They'll be all locked
115115
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
116116
("B".."K").each do |name|
117-
SequentialUpdateResultJob.perform_later(@result, name: name)
117+
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
118118
end
119119
end
120120

@@ -127,14 +127,12 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
127127
wait_for_jobs_to_finish_for(5.seconds)
128128
assert_no_unfinished_jobs
129129

130-
# We can't ensure the order between B and C, because it depends on which worker wins when
131-
# unblocking, as one will try to unblock B and another C
132-
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
130+
assert_stored_sequence @result, ("A".."K").to_a
133131
end
134132

135133
test "rely on dispatcher to unblock blocked executions with an expired semaphore" do
136134
# Simulate a scenario where we got an available semaphore and some stuck jobs
137-
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
135+
job = NonOverlappingUpdateResultJob.perform_later(@result, name: "A")
138136
wait_for_jobs_to_finish_for(5.seconds)
139137
assert_no_unfinished_jobs
140138

@@ -147,7 +145,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
147145
# Now enqueue more jobs under that same key. They'll be all locked
148146
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
149147
("B".."K").each do |name|
150-
SequentialUpdateResultJob.perform_later(@result, name: name)
148+
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
151149
end
152150
end
153151

@@ -159,13 +157,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
159157
wait_for_jobs_to_finish_for(5.seconds)
160158
assert_no_unfinished_jobs
161159

162-
# We can't ensure the order between B and C, because it depends on which worker wins when
163-
# unblocking, as one will try to unblock B and another C
164-
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
160+
assert_stored_sequence @result, ("A".."K").to_a
165161
end
166162

167163
test "don't block claimed executions that get released" do
168-
SequentialUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds)
164+
NonOverlappingUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds)
169165
job = SolidQueue::Job.last
170166

171167
sleep(0.2)
@@ -184,8 +180,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
184180
skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 2
185181

186182
ActiveRecord::Base.transaction do
187-
SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds)
188-
SequentialUpdateResultJob.perform_later(@result, name: "B")
183+
NonOverlappingUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds)
184+
NonOverlappingUpdateResultJob.perform_later(@result, name: "B")
189185

190186
begin
191187
assert_equal 2, SolidQueue::Job.count
@@ -219,7 +215,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
219215
end
220216

221217
test "discard on conflict across different concurrency keys" do
222-
another_result = JobResult.create!(queue_name: "default", status: "seq: ")
218+
another_result = JobResult.create!(queue_name: "default", status: "")
223219
DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2)
224220
DiscardableUpdateResultJob.perform_later(another_result, name: "2", pause: 0.2)
225221
DiscardableUpdateResultJob.perform_later(@result, name: "3") # Should be discarded
@@ -239,6 +235,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
239235
DiscardableUpdateResultJob.perform_later(@result, name: "2")
240236

241237
wait_for_jobs_to_finish_for(5.seconds)
238+
wait_for_semaphores_to_be_released_for(2.seconds)
239+
242240
assert_no_unfinished_jobs
243241

244242
# Enqueue another job that shouldn't be discarded or blocked
@@ -250,10 +248,18 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
250248
end
251249

252250
private
253-
def assert_stored_sequence(result, *sequences)
254-
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join }
251+
def assert_stored_sequence(result, sequence)
252+
expected = sequence.sort.map { |name| "s#{name}c#{name}" }.join
255253
skip_active_record_query_cache do
256-
assert_includes expected, result.reload.status
254+
assert_equal expected, result.reload.status.split(" + ").sort.join
255+
end
256+
end
257+
258+
def wait_for_semaphores_to_be_released_for(timeout)
259+
wait_while_with_timeout(timeout) do
260+
skip_active_record_query_cache do
261+
SolidQueue::Semaphore.available.invert_where.any?
262+
end
257263
end
258264
end
259265
end

test/integration/instrumentation_test.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ class InstrumentationTest < ActiveSupport::TestCase
234234
5.times { AddToBufferJob.perform_later("A") }
235235
# 1 ready + 3 blocked
236236
result = JobResult.create!
237-
4.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
237+
4.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") }
238238

239239
events = subscribed("discard_all.solid_queue") do
240240
SolidQueue::ReadyExecution.discard_all_from_jobs(SolidQueue::Job.all)
@@ -261,7 +261,7 @@ class InstrumentationTest < ActiveSupport::TestCase
261261
test "unblocking job emits release_blocked event" do
262262
result = JobResult.create!
263263
# 1 ready, 2 blocked
264-
3.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
264+
3.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") }
265265

266266
# Simulate expiry of the concurrency locks
267267
travel_to 3.days.from_now
@@ -283,11 +283,11 @@ class InstrumentationTest < ActiveSupport::TestCase
283283
test "unblocking jobs in bulk emits release_many_blocked event" do
284284
result = JobResult.create!
285285
# 1 ready, 3 blocked
286-
4.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
286+
4.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") }
287287

288288
# 1 ready, 2 blocked
289289
result = JobResult.create!
290-
3.times { SequentialUpdateResultJob.perform_later(result, name: "B") }
290+
3.times { NonOverlappingUpdateResultJob.perform_later(result, name: "B") }
291291

292292
# Simulate expiry of the concurrency locks
293293
travel_to 3.days.from_now
@@ -309,7 +309,7 @@ class InstrumentationTest < ActiveSupport::TestCase
309309

310310
events = subscribed("enqueue_recurring_task.solid_queue") do
311311
scheduler.start
312-
sleep 1.01
312+
wait_while_with_timeout(1.1.second) { SolidQueue::RecurringExecution.none? }
313313
scheduler.stop
314314
end
315315

test/models/solid_queue/failed_execution_test.rb

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase
1515
end
1616

1717
test "run job that fails with a SystemStackError (stack level too deep)" do
18-
silence_on_thread_error_for(SystemStackError) do
19-
InfiniteRecursionJob.perform_later
20-
@worker.start
18+
InfiniteRecursionJob.perform_later
19+
@worker.start
2120

22-
assert_equal 1, SolidQueue::FailedExecution.count
23-
assert SolidQueue::Job.last.failed?
24-
end
21+
assert_equal 1, SolidQueue::FailedExecution.count
22+
assert SolidQueue::Job.last.failed?
23+
assert_equal "stack level too deep", SolidQueue::FailedExecution.last.message
2524
end
2625

2726
test "retry failed job" do

test/unit/process_recovery_test.rb

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ class ProcessRecoveryTest < ActiveSupport::TestCase
2626
worker_process = SolidQueue::Process.find_by(kind: "Worker")
2727
assert worker_process
2828

29-
# Enqueue a job and manually claim it for the worker to avoid timing races
30-
job = enqueue_store_result_job(42)
31-
claimed_execution = SolidQueue::ReadyExecution.claim("*", 5, worker_process.id).first
29+
# Enqueue a job and wait for it to be claimed
30+
StoreResultJob.perform_later(42, pause: 10.seconds)
31+
wait_while_with_timeout(3.seconds) { SolidQueue::ClaimedExecution.none? }
32+
33+
claimed_execution = SolidQueue::ClaimedExecution.last
3234
assert claimed_execution.present?
3335
assert_equal worker_process.id, claimed_execution.process_id
3436

@@ -40,9 +42,8 @@ class ProcessRecoveryTest < ActiveSupport::TestCase
4042
worker_pid = worker_process.pid
4143
terminate_process(worker_pid, signal: :KILL)
4244

43-
4445
# Wait for the supervisor to reap the worker and fail the job
45-
wait_for_failed_executions(1, timeout: 5.seconds)
46+
wait_while_with_timeout(3.seconds) { SolidQueue::FailedExecution.none? }
4647

4748
# Assert the execution is failed
4849
failed_execution = SolidQueue::FailedExecution.last
@@ -53,32 +54,4 @@ class ProcessRecoveryTest < ActiveSupport::TestCase
5354
wait_for_registered_processes(2, timeout: 5.seconds)
5455
assert_operator SolidQueue::Process.where(kind: "Worker").count, :>=, 1
5556
end
56-
57-
private
58-
def assert_registered_workers_for(*queues, supervisor_pid: nil)
59-
workers = find_processes_registered_as("Worker")
60-
registered_queues = workers.map { |process| process.metadata["queues"] }.compact
61-
assert_equal queues.map(&:to_s).sort, registered_queues.sort
62-
if supervisor_pid
63-
assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq
64-
end
65-
end
66-
67-
def enqueue_store_result_job(value, queue_name = :default, **options)
68-
StoreResultJob.set(queue: queue_name).perform_later(value, **options)
69-
end
70-
71-
def assert_no_claimed_jobs
72-
skip_active_record_query_cache do
73-
assert_empty SolidQueue::ClaimedExecution.all
74-
end
75-
end
76-
77-
def wait_for_claimed_executions(count, timeout: 1.second)
78-
wait_for(timeout: timeout) { SolidQueue::ClaimedExecution.count == count }
79-
end
80-
81-
def wait_for_failed_executions(count, timeout: 1.second)
82-
wait_for(timeout: timeout) { SolidQueue::FailedExecution.count == count }
83-
end
8457
end

0 commit comments

Comments
 (0)