Skip to content

Commit 74b12c8

Browse files
committed
Make concurrency tests less flaky
Don't try to enforce sequential order for jobs, given that the order is not even guaranteed by Solid Queue. Rename the test job to reflect that. Fix also another test where we waited for a job to finish but not for it to release the semaphore, so it happened most of the times but sometimes it didn't.
1 parent f433a99 commit 74b12c8

File tree

4 files changed

+37
-30
lines changed

4 files changed

+37
-30
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
class SequentialUpdateResultJob < UpdateResultJob
1+
class NonOverlappingUpdateResultJob < UpdateResultJob
22
limits_concurrency key: ->(job_result, **) { job_result }
33
end

test/dummy/app/jobs/update_result_job.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
class UpdateResultJob < ApplicationJob
22
def perform(job_result, name:, pause: nil, exception: nil)
3+
job_result.status += " + " unless job_result.status.blank?
34
job_result.status += "s#{name}"
45

56
sleep(pause) if pause

test/integration/concurrency_controls_test.rb

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
66
self.use_transactional_tests = false
77

88
setup do
9-
@result = JobResult.create!(queue_name: "default", status: "seq: ")
9+
@result = JobResult.create!(queue_name: "default", status: "")
1010

1111
default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 }
1212
dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 }
@@ -20,13 +20,13 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
2020
terminate_process(@pid) if process_exists?(@pid)
2121
end
2222

23-
test "run several conflicting jobs over the same record sequentially" do
23+
test "run several conflicting jobs over the same record without overlapping" do
2424
("A".."F").each do |name|
25-
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds)
25+
NonOverlappingUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds)
2626
end
2727

2828
("G".."K").each do |name|
29-
SequentialUpdateResultJob.perform_later(@result, name: name)
29+
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
3030
end
3131

3232
wait_for_jobs_to_finish_for(5.seconds)
@@ -39,11 +39,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
3939
UpdateResultJob.set(wait: 0.23.seconds).perform_later(@result, name: "000", pause: 0.1.seconds)
4040

4141
("A".."F").each_with_index do |name, i|
42-
SequentialUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds)
42+
NonOverlappingUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds)
4343
end
4444

4545
("G".."K").each_with_index do |name, i|
46-
SequentialUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name)
46+
NonOverlappingUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name)
4747
end
4848

4949
wait_for_jobs_to_finish_for(5.seconds)
@@ -85,11 +85,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
8585
test "run several jobs over the same record sequentially, with some of them failing" do
8686
("A".."F").each_with_index do |name, i|
8787
# A, C, E will fail, for i= 0, 2, 4
88-
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?))
88+
NonOverlappingUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?))
8989
end
9090

9191
("G".."K").each do |name|
92-
SequentialUpdateResultJob.perform_later(@result, name: name)
92+
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
9393
end
9494

9595
wait_for_jobs_to_finish_for(5.seconds)
@@ -100,7 +100,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
100100

101101
test "rely on dispatcher to unblock blocked executions with an available semaphore" do
102102
# Simulate a scenario where we got an available semaphore and some stuck jobs
103-
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
103+
job = NonOverlappingUpdateResultJob.perform_later(@result, name: "A")
104104

105105
wait_for_jobs_to_finish_for(5.seconds)
106106
assert_no_unfinished_jobs
@@ -114,7 +114,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
114114
# Now enqueue more jobs under that same key. They'll be all locked
115115
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
116116
("B".."K").each do |name|
117-
SequentialUpdateResultJob.perform_later(@result, name: name)
117+
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
118118
end
119119
end
120120

@@ -127,14 +127,12 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
127127
wait_for_jobs_to_finish_for(5.seconds)
128128
assert_no_unfinished_jobs
129129

130-
# We can't ensure the order between B and C, because it depends on which worker wins when
131-
# unblocking, as one will try to unblock B and another C
132-
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
130+
assert_stored_sequence @result, ("A".."K").to_a
133131
end
134132

135133
test "rely on dispatcher to unblock blocked executions with an expired semaphore" do
136134
# Simulate a scenario where we got an available semaphore and some stuck jobs
137-
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
135+
job = NonOverlappingUpdateResultJob.perform_later(@result, name: "A")
138136
wait_for_jobs_to_finish_for(5.seconds)
139137
assert_no_unfinished_jobs
140138

@@ -147,7 +145,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
147145
# Now enqueue more jobs under that same key. They'll be all locked
148146
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
149147
("B".."K").each do |name|
150-
SequentialUpdateResultJob.perform_later(@result, name: name)
148+
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
151149
end
152150
end
153151

@@ -159,13 +157,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
159157
wait_for_jobs_to_finish_for(5.seconds)
160158
assert_no_unfinished_jobs
161159

162-
# We can't ensure the order between B and C, because it depends on which worker wins when
163-
# unblocking, as one will try to unblock B and another C
164-
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
160+
assert_stored_sequence @result, ("A".."K").to_a
165161
end
166162

167163
test "don't block claimed executions that get released" do
168-
SequentialUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds)
164+
NonOverlappingUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds)
169165
job = SolidQueue::Job.last
170166

171167
sleep(0.2)
@@ -184,8 +180,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
184180
skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 2
185181

186182
ActiveRecord::Base.transaction do
187-
SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds)
188-
SequentialUpdateResultJob.perform_later(@result, name: "B")
183+
NonOverlappingUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds)
184+
NonOverlappingUpdateResultJob.perform_later(@result, name: "B")
189185

190186
begin
191187
assert_equal 2, SolidQueue::Job.count
@@ -219,7 +215,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
219215
end
220216

221217
test "discard on conflict across different concurrency keys" do
222-
another_result = JobResult.create!(queue_name: "default", status: "seq: ")
218+
another_result = JobResult.create!(queue_name: "default", status: "")
223219
DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2)
224220
DiscardableUpdateResultJob.perform_later(another_result, name: "2", pause: 0.2)
225221
DiscardableUpdateResultJob.perform_later(@result, name: "3") # Should be discarded
@@ -239,6 +235,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
239235
DiscardableUpdateResultJob.perform_later(@result, name: "2")
240236

241237
wait_for_jobs_to_finish_for(5.seconds)
238+
wait_for_semaphores_to_be_released_for(2.seconds)
239+
242240
assert_no_unfinished_jobs
243241

244242
# Enqueue another job that shouldn't be discarded or blocked
@@ -250,10 +248,18 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
250248
end
251249

252250
private
253-
def assert_stored_sequence(result, *sequences)
254-
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join }
251+
def assert_stored_sequence(result, sequence)
252+
expected = sequence.sort.map { |name| "s#{name}c#{name}" }.join
255253
skip_active_record_query_cache do
256-
assert_includes expected, result.reload.status
254+
assert_equal expected, result.reload.status.split(" + ").sort.join
255+
end
256+
end
257+
258+
def wait_for_semaphores_to_be_released_for(timeout)
259+
wait_while_with_timeout(timeout) do
260+
skip_active_record_query_cache do
261+
SolidQueue::Semaphore.available.invert_where.any?
262+
end
257263
end
258264
end
259265
end

test/integration/instrumentation_test.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ class InstrumentationTest < ActiveSupport::TestCase
234234
5.times { AddToBufferJob.perform_later("A") }
235235
# 1 ready + 3 blocked
236236
result = JobResult.create!
237-
4.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
237+
4.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") }
238238

239239
events = subscribed("discard_all.solid_queue") do
240240
SolidQueue::ReadyExecution.discard_all_from_jobs(SolidQueue::Job.all)
@@ -261,7 +261,7 @@ class InstrumentationTest < ActiveSupport::TestCase
261261
test "unblocking job emits release_blocked event" do
262262
result = JobResult.create!
263263
# 1 ready, 2 blocked
264-
3.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
264+
3.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") }
265265

266266
# Simulate expiry of the concurrency locks
267267
travel_to 3.days.from_now
@@ -283,11 +283,11 @@ class InstrumentationTest < ActiveSupport::TestCase
283283
test "unblocking jobs in bulk emits release_many_blocked event" do
284284
result = JobResult.create!
285285
# 1 ready, 3 blocked
286-
4.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
286+
4.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") }
287287

288288
# 1 ready, 2 blocked
289289
result = JobResult.create!
290-
3.times { SequentialUpdateResultJob.perform_later(result, name: "B") }
290+
3.times { NonOverlappingUpdateResultJob.perform_later(result, name: "B") }
291291

292292
# Simulate expiry of the concurrency locks
293293
travel_to 3.days.from_now

0 commit comments

Comments
 (0)