@@ -6,7 +6,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
66  self . use_transactional_tests  =  false 
77
88  setup  do 
9-     @result  =  JobResult . create! ( queue_name : "default" ,  status : "seq:  " ) 
9+     @result  =  JobResult . create! ( queue_name : "default" ,  status : "" ) 
1010
1111    default_worker  =  {  queues : "default" ,  polling_interval : 0.1 ,  processes : 3 ,  threads : 2  } 
1212    dispatcher  =  {  polling_interval : 0.1 ,  batch_size : 200 ,  concurrency_maintenance_interval : 1  } 
@@ -20,13 +20,13 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
2020    terminate_process ( @pid )  if  process_exists? ( @pid ) 
2121  end 
2222
23-   test  "run several conflicting jobs over the same record sequentially "  do 
23+   test  "run several conflicting jobs over the same record without overlapping "  do 
2424    ( "A" .."F" ) . each  do  |name |
25-       SequentialUpdateResultJob . perform_later ( @result ,  name : name ,  pause : 0.2 . seconds ) 
25+       NonOverlappingUpdateResultJob . perform_later ( @result ,  name : name ,  pause : 0.2 . seconds ) 
2626    end 
2727
2828    ( "G" .."K" ) . each  do  |name |
29-       SequentialUpdateResultJob . perform_later ( @result ,  name : name ) 
29+       NonOverlappingUpdateResultJob . perform_later ( @result ,  name : name ) 
3030    end 
3131
3232    wait_for_jobs_to_finish_for ( 5 . seconds ) 
@@ -39,11 +39,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
3939    UpdateResultJob . set ( wait : 0.23 . seconds ) . perform_later ( @result ,  name : "000" ,  pause : 0.1 . seconds ) 
4040
4141    ( "A" .."F" ) . each_with_index  do  |name ,  i |
42-       SequentialUpdateResultJob . set ( wait : ( 0.2  + i  * 0.01 ) . seconds ) . perform_later ( @result ,  name : name ,  pause : 0.3 . seconds ) 
42+       NonOverlappingUpdateResultJob . set ( wait : ( 0.2  + i  * 0.01 ) . seconds ) . perform_later ( @result ,  name : name ,  pause : 0.3 . seconds ) 
4343    end 
4444
4545    ( "G" .."K" ) . each_with_index  do  |name ,  i |
46-       SequentialUpdateResultJob . set ( wait : ( 0.3  + i  * 0.01 ) . seconds ) . perform_later ( @result ,  name : name ) 
46+       NonOverlappingUpdateResultJob . set ( wait : ( 0.3  + i  * 0.01 ) . seconds ) . perform_later ( @result ,  name : name ) 
4747    end 
4848
4949    wait_for_jobs_to_finish_for ( 5 . seconds ) 
@@ -85,11 +85,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
8585  test  "run several jobs over the same record sequentially, with some of them failing"  do 
8686    ( "A" .."F" ) . each_with_index  do  |name ,  i |
8787      # A, C, E will fail, for i= 0, 2, 4 
88-       SequentialUpdateResultJob . perform_later ( @result ,  name : name ,  pause : 0.2 . seconds ,  exception : ( ExpectedTestError  if  i . even? ) ) 
88+       NonOverlappingUpdateResultJob . perform_later ( @result ,  name : name ,  pause : 0.2 . seconds ,  exception : ( ExpectedTestError  if  i . even? ) ) 
8989    end 
9090
9191    ( "G" .."K" ) . each  do  |name |
92-       SequentialUpdateResultJob . perform_later ( @result ,  name : name ) 
92+       NonOverlappingUpdateResultJob . perform_later ( @result ,  name : name ) 
9393    end 
9494
9595    wait_for_jobs_to_finish_for ( 5 . seconds ) 
@@ -100,7 +100,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
100100
101101  test  "rely on dispatcher to unblock blocked executions with an available semaphore"  do 
102102    # Simulate a scenario where we got an available semaphore and some stuck jobs 
103-     job  =  SequentialUpdateResultJob . perform_later ( @result ,  name : "A" ) 
103+     job  =  NonOverlappingUpdateResultJob . perform_later ( @result ,  name : "A" ) 
104104
105105    wait_for_jobs_to_finish_for ( 5 . seconds ) 
106106    assert_no_unfinished_jobs 
@@ -114,7 +114,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
114114    # Now enqueue more jobs under that same key. They'll be all locked 
115115    assert_difference  ->  {  SolidQueue ::BlockedExecution . count  } ,  +10  do 
116116      ( "B" .."K" ) . each  do  |name |
117-         SequentialUpdateResultJob . perform_later ( @result ,  name : name ) 
117+         NonOverlappingUpdateResultJob . perform_later ( @result ,  name : name ) 
118118      end 
119119    end 
120120
@@ -127,14 +127,12 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
127127    wait_for_jobs_to_finish_for ( 5 . seconds ) 
128128    assert_no_unfinished_jobs 
129129
130-     # We can't ensure the order between B and C, because it depends on which worker wins when 
131-     # unblocking, as one will try to unblock B and another C 
132-     assert_stored_sequence  @result ,  ( "A" .."K" ) . to_a ,  [  "A" ,  "C" ,  "B"  ]  + ( "D" .."K" ) . to_a 
130+     assert_stored_sequence  @result ,  ( "A" .."K" ) . to_a 
133131  end 
134132
135133  test  "rely on dispatcher to unblock blocked executions with an expired semaphore"  do 
136134    # Simulate a scenario where we got an available semaphore and some stuck jobs 
137-     job  =  SequentialUpdateResultJob . perform_later ( @result ,  name : "A" ) 
135+     job  =  NonOverlappingUpdateResultJob . perform_later ( @result ,  name : "A" ) 
138136    wait_for_jobs_to_finish_for ( 5 . seconds ) 
139137    assert_no_unfinished_jobs 
140138
@@ -147,7 +145,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
147145    # Now enqueue more jobs under that same key. They'll be all locked 
148146    assert_difference  ->  {  SolidQueue ::BlockedExecution . count  } ,  +10  do 
149147      ( "B" .."K" ) . each  do  |name |
150-         SequentialUpdateResultJob . perform_later ( @result ,  name : name ) 
148+         NonOverlappingUpdateResultJob . perform_later ( @result ,  name : name ) 
151149      end 
152150    end 
153151
@@ -159,13 +157,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
159157    wait_for_jobs_to_finish_for ( 5 . seconds ) 
160158    assert_no_unfinished_jobs 
161159
162-     # We can't ensure the order between B and C, because it depends on which worker wins when 
163-     # unblocking, as one will try to unblock B and another C 
164-     assert_stored_sequence  @result ,  ( "A" .."K" ) . to_a ,  [  "A" ,  "C" ,  "B"  ]  + ( "D" .."K" ) . to_a 
160+     assert_stored_sequence  @result ,  ( "A" .."K" ) . to_a 
165161  end 
166162
167163  test  "don't block claimed executions that get released"  do 
168-     SequentialUpdateResultJob . perform_later ( @result ,  name : "I'll be released to ready" ,  pause : SolidQueue . shutdown_timeout  + 10 . seconds ) 
164+     NonOverlappingUpdateResultJob . perform_later ( @result ,  name : "I'll be released to ready" ,  pause : SolidQueue . shutdown_timeout  + 10 . seconds ) 
169165    job  =  SolidQueue ::Job . last 
170166
171167    sleep ( 0.2 ) 
@@ -184,8 +180,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
184180    skip  if  Rails ::VERSION ::MAJOR  == 7  && Rails ::VERSION ::MINOR  == 2 
185181
186182    ActiveRecord ::Base . transaction  do 
187-       SequentialUpdateResultJob . perform_later ( @result ,  name : "A" ,  pause : 0.2 . seconds ) 
188-       SequentialUpdateResultJob . perform_later ( @result ,  name : "B" ) 
183+       NonOverlappingUpdateResultJob . perform_later ( @result ,  name : "A" ,  pause : 0.2 . seconds ) 
184+       NonOverlappingUpdateResultJob . perform_later ( @result ,  name : "B" ) 
189185
190186      begin 
191187        assert_equal  2 ,  SolidQueue ::Job . count 
@@ -219,7 +215,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
219215  end 
220216
221217  test  "discard on conflict across different concurrency keys"  do 
222-     another_result  =  JobResult . create! ( queue_name : "default" ,  status : "seq:  " ) 
218+     another_result  =  JobResult . create! ( queue_name : "default" ,  status : "" ) 
223219    DiscardableUpdateResultJob . perform_later ( @result ,  name : "1" ,  pause : 0.2 ) 
224220    DiscardableUpdateResultJob . perform_later ( another_result ,  name : "2" ,  pause : 0.2 ) 
225221    DiscardableUpdateResultJob . perform_later ( @result ,  name : "3" )  # Should be discarded 
@@ -239,6 +235,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
239235    DiscardableUpdateResultJob . perform_later ( @result ,  name : "2" ) 
240236
241237    wait_for_jobs_to_finish_for ( 5 . seconds ) 
238+     wait_for_semaphores_to_be_released_for ( 2 . seconds ) 
239+ 
242240    assert_no_unfinished_jobs 
243241
244242    # Enqueue another job that shouldn't be discarded or blocked 
@@ -250,10 +248,18 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
250248  end 
251249
252250  private 
253-     def  assert_stored_sequence ( result ,  * sequences ) 
254-       expected  =  sequences . map   {  | sequence |  "seq: "  +  sequence . map  {  |name | "s#{ name }  c#{ name }  "  } . join   } 
251+     def  assert_stored_sequence ( result ,  sequence ) 
252+       expected  =  sequence . sort . map  {  |name | "s#{ name }  c#{ name }  "  } . join 
255253      skip_active_record_query_cache  do 
256-         assert_includes  expected ,  result . reload . status 
254+         assert_equal  expected ,  result . reload . status . split ( " + " ) . sort . join 
255+       end 
256+     end 
257+ 
258+     def  wait_for_semaphores_to_be_released_for ( timeout ) 
259+       wait_while_with_timeout ( timeout )  do 
260+         skip_active_record_query_cache  do 
261+           SolidQueue ::Semaphore . available . invert_where . any? 
262+         end 
257263      end 
258264    end 
259265end 
0 commit comments