diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/non_overlapping_update_result_job.rb similarity index 54% rename from test/dummy/app/jobs/sequential_update_result_job.rb rename to test/dummy/app/jobs/non_overlapping_update_result_job.rb index a3afa33f..79ab98e5 100644 --- a/test/dummy/app/jobs/sequential_update_result_job.rb +++ b/test/dummy/app/jobs/non_overlapping_update_result_job.rb @@ -1,3 +1,3 @@ -class SequentialUpdateResultJob < UpdateResultJob +class NonOverlappingUpdateResultJob < UpdateResultJob limits_concurrency key: ->(job_result, **) { job_result } end diff --git a/test/dummy/app/jobs/update_result_job.rb b/test/dummy/app/jobs/update_result_job.rb index 04571eb6..fb016de0 100644 --- a/test/dummy/app/jobs/update_result_job.rb +++ b/test/dummy/app/jobs/update_result_job.rb @@ -1,5 +1,6 @@ class UpdateResultJob < ApplicationJob def perform(job_result, name:, pause: nil, exception: nil) + job_result.status += " + " unless job_result.status.blank? job_result.status += "s#{name}" sleep(pause) if pause diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index f0984078..178c796d 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -6,7 +6,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase self.use_transactional_tests = false setup do - @result = JobResult.create!(queue_name: "default", status: "seq: ") + @result = JobResult.create!(queue_name: "default", status: "") default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 } dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 } @@ -20,13 +20,13 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase terminate_process(@pid) if process_exists?(@pid) end - test "run several conflicting jobs over the same record sequentially" do + test "run several conflicting jobs over the same record without overlapping" do ("A".."F").each do |name| - SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds) + NonOverlappingUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds) end ("G".."K").each do |name| - SequentialUpdateResultJob.perform_later(@result, name: name) + NonOverlappingUpdateResultJob.perform_later(@result, name: name) end wait_for_jobs_to_finish_for(5.seconds) @@ -39,11 +39,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase UpdateResultJob.set(wait: 0.23.seconds).perform_later(@result, name: "000", pause: 0.1.seconds) ("A".."F").each_with_index do |name, i| - SequentialUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds) + NonOverlappingUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds) end ("G".."K").each_with_index do |name, i| - SequentialUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name) + NonOverlappingUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name) end wait_for_jobs_to_finish_for(5.seconds) @@ -85,11 +85,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase test "run several jobs over the same record sequentially, with some of them failing" do ("A".."F").each_with_index do |name, i| # A, C, E will fail, for i= 0, 2, 4 - SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?)) + NonOverlappingUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?)) end ("G".."K").each do |name| - SequentialUpdateResultJob.perform_later(@result, name: name) + NonOverlappingUpdateResultJob.perform_later(@result, name: name) end wait_for_jobs_to_finish_for(5.seconds) @@ -100,7 +100,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase test "rely on dispatcher to unblock blocked executions with an available semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs - job = SequentialUpdateResultJob.perform_later(@result, name: "A") + job = NonOverlappingUpdateResultJob.perform_later(@result, name: "A") wait_for_jobs_to_finish_for(5.seconds) assert_no_unfinished_jobs @@ -114,7 +114,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Now enqueue more jobs under that same key. They'll be all locked assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do ("B".."K").each do |name| - SequentialUpdateResultJob.perform_later(@result, name: name) + NonOverlappingUpdateResultJob.perform_later(@result, name: name) end end @@ -127,14 +127,12 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase wait_for_jobs_to_finish_for(5.seconds) assert_no_unfinished_jobs - # We can't ensure the order between B and C, because it depends on which worker wins when - # unblocking, as one will try to unblock B and another C - assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a + assert_stored_sequence @result, ("A".."K").to_a end test "rely on dispatcher to unblock blocked executions with an expired semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs - job = SequentialUpdateResultJob.perform_later(@result, name: "A") + job = NonOverlappingUpdateResultJob.perform_later(@result, name: "A") wait_for_jobs_to_finish_for(5.seconds) assert_no_unfinished_jobs @@ -147,7 +145,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Now enqueue more jobs under that same key. They'll be all locked assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do ("B".."K").each do |name| - SequentialUpdateResultJob.perform_later(@result, name: name) + NonOverlappingUpdateResultJob.perform_later(@result, name: name) end end @@ -159,13 +157,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase wait_for_jobs_to_finish_for(5.seconds) assert_no_unfinished_jobs - # We can't ensure the order between B and C, because it depends on which worker wins when - # unblocking, as one will try to unblock B and another C - assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a + assert_stored_sequence @result, ("A".."K").to_a end test "don't block claimed executions that get released" do - SequentialUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds) + NonOverlappingUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds) job = SolidQueue::Job.last sleep(0.2) @@ -184,8 +180,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 2 ActiveRecord::Base.transaction do - SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds) - SequentialUpdateResultJob.perform_later(@result, name: "B") + NonOverlappingUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds) + NonOverlappingUpdateResultJob.perform_later(@result, name: "B") begin assert_equal 2, SolidQueue::Job.count @@ -219,7 +215,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "discard on conflict across different concurrency keys" do - another_result = JobResult.create!(queue_name: "default", status: "seq: ") + another_result = JobResult.create!(queue_name: "default", status: "") DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2) DiscardableUpdateResultJob.perform_later(another_result, name: "2", pause: 0.2) DiscardableUpdateResultJob.perform_later(@result, name: "3") # Should be discarded @@ -239,6 +235,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase DiscardableUpdateResultJob.perform_later(@result, name: "2") wait_for_jobs_to_finish_for(5.seconds) + wait_for_semaphores_to_be_released_for(2.seconds) + assert_no_unfinished_jobs # Enqueue another job that shouldn't be discarded or blocked @@ -250,10 +248,18 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end private - def assert_stored_sequence(result, *sequences) - expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join } + def assert_stored_sequence(result, sequence) + expected = sequence.sort.map { |name| "s#{name}c#{name}" }.join skip_active_record_query_cache do - assert_includes expected, result.reload.status + assert_equal expected, result.reload.status.split(" + ").sort.join + end + end + + def wait_for_semaphores_to_be_released_for(timeout) + wait_while_with_timeout(timeout) do + skip_active_record_query_cache do + SolidQueue::Semaphore.available.invert_where.any? + end end end end diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 95dadb19..d6a039dd 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -234,7 +234,7 @@ class InstrumentationTest < ActiveSupport::TestCase 5.times { AddToBufferJob.perform_later("A") } # 1 ready + 3 blocked result = JobResult.create! - 4.times { SequentialUpdateResultJob.perform_later(result, name: "A") } + 4.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") } events = subscribed("discard_all.solid_queue") do SolidQueue::ReadyExecution.discard_all_from_jobs(SolidQueue::Job.all) @@ -261,7 +261,7 @@ class InstrumentationTest < ActiveSupport::TestCase test "unblocking job emits release_blocked event" do result = JobResult.create! # 1 ready, 2 blocked - 3.times { SequentialUpdateResultJob.perform_later(result, name: "A") } + 3.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") } # Simulate expiry of the concurrency locks travel_to 3.days.from_now @@ -283,11 +283,11 @@ class InstrumentationTest < ActiveSupport::TestCase test "unblocking jobs in bulk emits release_many_blocked event" do result = JobResult.create! # 1 ready, 3 blocked - 4.times { SequentialUpdateResultJob.perform_later(result, name: "A") } + 4.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") } # 1 ready, 2 blocked result = JobResult.create! - 3.times { SequentialUpdateResultJob.perform_later(result, name: "B") } + 3.times { NonOverlappingUpdateResultJob.perform_later(result, name: "B") } # Simulate expiry of the concurrency locks travel_to 3.days.from_now