6
6
require "active_support/core_ext/object/with"
7
7
require "support/test_logger"
8
8
require "support/do_not_perform_enqueued_jobs"
9
- require "jobs/continuable_array_cursor_job"
10
- require "jobs/continuable_iterating_job"
11
- require "jobs/continuable_linear_job"
12
- require "jobs/continuable_deleting_job"
13
- require "jobs/continuable_duplicate_step_job"
14
- require "jobs/continuable_nested_steps_job"
15
- require "jobs/continuable_string_step_name_job"
16
- require "jobs/continuable_resume_wrong_step_job"
17
- require "jobs/continuable_nested_cursor_job"
18
9
19
10
return unless adapter_is? ( :test )
20
11
21
- class ContinuableJob < ActiveJob ::Base
22
- include ActiveJob ::Continuable
23
- end
24
-
25
12
class ActiveJob ::TestContinuation < ActiveSupport ::TestCase
26
13
include ActiveJob ::Continuation ::TestHelper
27
14
include ActiveSupport ::Testing ::Stream
28
15
include DoNotPerformEnqueuedJobs
29
16
include TestLoggerHelper
30
17
18
+ class ContinuableJob < ActiveJob ::Base
19
+ include ActiveJob ::Continuable
20
+ end
21
+
22
+ ContinuableIteratingRecord = Struct . new ( :id , :name ) do
23
+ cattr_accessor :records
24
+
25
+ def self . find_each ( start : nil )
26
+ records . sort_by ( &:id ) . each do |record |
27
+ next if start && record . id < start
28
+
29
+ yield record
30
+ end
31
+ end
32
+ end
33
+
34
+ class ContinuableIteratingJob < ContinuableJob
35
+ def perform ( raise_when_cursor : nil )
36
+ step :rename do |step |
37
+ ContinuableIteratingRecord . find_each ( start : step . cursor ) do |record |
38
+ raise StandardError , "Cursor error" if raise_when_cursor && step . cursor == raise_when_cursor
39
+ record . name = "new_#{ record . name } "
40
+ step . advance! from : record . id
41
+ end
42
+ end
43
+ end
44
+ end
45
+
31
46
test "iterates" do
32
47
ContinuableIteratingRecord . records = [ 123 , 432 , 6565 , 3243 , 234 , 13 , 22 ] . map { |i | ContinuableIteratingRecord . new ( i , "item_#{ i } " ) }
33
48
@@ -60,6 +75,34 @@ class ActiveJob::TestContinuation < ActiveSupport::TestCase
60
75
assert_equal %w[ new_item_123 new_item_432 new_item_6565 new_item_3243 new_item_234 new_item_13 new_item_22 ] , ContinuableIteratingRecord . records . map ( &:name )
61
76
end
62
77
78
+ class ContinuableLinearJob < ContinuableJob
79
+ cattr_accessor :items
80
+
81
+ def perform
82
+ step :step_one
83
+ step :step_two
84
+ step :step_three
85
+ step :step_four
86
+ end
87
+
88
+ private
89
+ def step_one
90
+ items << "item1"
91
+ end
92
+
93
+ def step_two
94
+ items << "item2"
95
+ end
96
+
97
+ def step_three
98
+ items << "item3"
99
+ end
100
+
101
+ def step_four
102
+ items << "item4"
103
+ end
104
+ end
105
+
63
106
test "linear steps" do
64
107
ContinuableLinearJob . items = [ ]
65
108
ContinuableLinearJob . perform_later
@@ -97,6 +140,20 @@ class ActiveJob::TestContinuation < ActiveSupport::TestCase
97
140
assert_equal %w[ item1 item2 item3 item4 ] , ContinuableLinearJob . items
98
141
end
99
142
143
+ class ContinuableDeletingJob < ContinuableJob
144
+ cattr_accessor :items
145
+
146
+ def perform
147
+ step :delete do |step |
148
+ loop do
149
+ break if items . empty?
150
+ items . shift
151
+ step . checkpoint!
152
+ end
153
+ end
154
+ end
155
+ end
156
+
100
157
test "does not retry jobs that error without updating the cursor" do
101
158
ContinuableDeletingJob . items = 10 . times . map { |i | "item_#{ i } " }
102
159
ContinuableDeletingJob . perform_later
@@ -112,6 +169,25 @@ class ActiveJob::TestContinuation < ActiveSupport::TestCase
112
169
assert_equal %w[ item_1 item_2 item_3 item_4 item_5 item_6 item_7 item_8 item_9 ] , ContinuableDeletingJob . items
113
170
end
114
171
172
+ test "interrupts without cursors" do
173
+ ContinuableDeletingJob . items = 10 . times . map { |i | "item_#{ i } " }
174
+ ContinuableDeletingJob . perform_later
175
+
176
+ interrupt_job_during_step ContinuableDeletingJob , :delete do
177
+ assert_enqueued_jobs 1 , only : ContinuableDeletingJob do
178
+ perform_enqueued_jobs
179
+ end
180
+ end
181
+
182
+ assert_equal 9 , ContinuableDeletingJob . items . count
183
+
184
+ assert_enqueued_jobs 0 do
185
+ perform_enqueued_jobs
186
+ end
187
+
188
+ assert_equal 0 , ContinuableDeletingJob . items . count
189
+ end
190
+
115
191
test "saves progress when there is an error" do
116
192
ContinuableIteratingRecord . records = [ 123 , 432 , 6565 , 3243 , 234 , 13 , 22 ] . map { |i | ContinuableIteratingRecord . new ( i , "item_#{ i } " ) }
117
193
@@ -163,13 +239,13 @@ class ActiveJob::TestContinuation < ActiveSupport::TestCase
163
239
assert_no_match "Resuming" , @logger . messages
164
240
assert_match ( /Step 'step_one' started/ , @logger . messages )
165
241
assert_match ( /Step 'step_one' completed/ , @logger . messages )
166
- assert_match ( /Interrupted ContinuableLinearJob \( Job ID: [0-9a-f-]{36}\) after 'step_one'/ , @logger . messages )
242
+ assert_match ( /Interrupted ActiveJob::TestContinuation:: ContinuableLinearJob \( Job ID: [0-9a-f-]{36}\) after 'step_one'/ , @logger . messages )
167
243
end
168
244
169
245
perform_enqueued_jobs
170
246
171
247
assert_match ( /Step 'step_one' skipped/ , @logger . messages )
172
- assert_match ( /Resuming ContinuableLinearJob \( Job ID: [0-9a-f-]{36}\) after 'step_one'/ , @logger . messages )
248
+ assert_match ( /Resuming ActiveJob::TestContinuation:: ContinuableLinearJob \( Job ID: [0-9a-f-]{36}\) after 'step_one'/ , @logger . messages )
173
249
assert_match ( /Step 'step_two' started/ , @logger . messages )
174
250
assert_match ( /Step 'step_two' completed/ , @logger . messages )
175
251
end
@@ -183,32 +259,22 @@ class ActiveJob::TestContinuation < ActiveSupport::TestCase
183
259
assert_no_match "Resuming" , @logger . messages
184
260
assert_match ( /Step 'rename' started/ , @logger . messages )
185
261
assert_match ( /Step 'rename' interrupted at cursor '433'/ , @logger . messages )
186
- assert_match ( /Interrupted ContinuableIteratingJob \( Job ID: [0-9a-f-]{36}\) at 'rename', cursor '433'/ , @logger . messages )
262
+ assert_match ( /Interrupted ActiveJob::TestContinuation:: ContinuableIteratingJob \( Job ID: [0-9a-f-]{36}\) at 'rename', cursor '433'/ , @logger . messages )
187
263
end
188
264
189
265
perform_enqueued_jobs
190
- assert_match ( /Resuming ContinuableIteratingJob \( Job ID: [0-9a-f-]{36}\) at 'rename', cursor '433'/ , @logger . messages )
266
+ assert_match ( /Resuming ActiveJob::TestContinuation:: ContinuableIteratingJob \( Job ID: [0-9a-f-]{36}\) at 'rename', cursor '433'/ , @logger . messages )
191
267
assert_match ( /Step 'rename' resumed from cursor '433'/ , @logger . messages )
192
268
assert_match ( /Step 'rename' completed/ , @logger . messages )
193
269
end
194
270
195
- test "interrupts without cursors" do
196
- ContinuableDeletingJob . items = 10 . times . map { |i | "item_#{ i } " }
197
- ContinuableDeletingJob . perform_later
198
-
199
- interrupt_job_during_step ContinuableDeletingJob , :delete do
200
- assert_enqueued_jobs 1 , only : ContinuableDeletingJob do
201
- perform_enqueued_jobs
271
+ class ContinuableDuplicateStepJob < ContinuableJob
272
+ def perform
273
+ step :duplicate do |step |
274
+ end
275
+ step :duplicate do |step |
202
276
end
203
277
end
204
-
205
- assert_equal 9 , ContinuableDeletingJob . items . count
206
-
207
- assert_enqueued_jobs 0 do
208
- perform_enqueued_jobs
209
- end
210
-
211
- assert_equal 0 , ContinuableDeletingJob . items . count
212
278
end
213
279
214
280
test "duplicate steps raise an error" do
@@ -221,6 +287,19 @@ class ActiveJob::TestContinuation < ActiveSupport::TestCase
221
287
assert_equal "Step 'duplicate' has already been encountered" , exception . message
222
288
end
223
289
290
+ class ContinuableNestedStepsJob < ContinuableJob
291
+ def perform
292
+ step :outer_step do
293
+ # Not allowed!
294
+ step :inner_step do
295
+ end
296
+ end
297
+ end
298
+
299
+ private
300
+ def inner_step ; end
301
+ end
302
+
224
303
test "nested steps raise an error" do
225
304
ContinuableNestedStepsJob . perform_later
226
305
@@ -231,6 +310,13 @@ class ActiveJob::TestContinuation < ActiveSupport::TestCase
231
310
assert_equal "Step 'inner_step' is nested inside step 'outer_step'" , exception . message
232
311
end
233
312
313
+ class ContinuableStringStepNameJob < ContinuableJob
314
+ def perform
315
+ step "string_step_name" do
316
+ end
317
+ end
318
+ end
319
+
234
320
test "string named steps raise an error" do
235
321
ContinuableStringStepNameJob . perform_later
236
322
@@ -241,6 +327,21 @@ class ActiveJob::TestContinuation < ActiveSupport::TestCase
241
327
assert_equal "Step 'string_step_name' must be a Symbol, found 'String'" , exception . message
242
328
end
243
329
330
+ class ContinuableResumeWrongStepJob < ContinuableJob
331
+ def perform
332
+ if continuation . send ( :started? )
333
+ step :unexpected do |step |
334
+ end
335
+ else
336
+ step :iterating , start : 0 do |step |
337
+ ( ( step . cursor || 1 ) ..4 ) . each do |i |
338
+ step . advance!
339
+ end
340
+ end
341
+ end
342
+ end
343
+ end
344
+
244
345
test "unexpected step on resumption raises an error" do
245
346
ContinuableResumeWrongStepJob . perform_later
246
347
@@ -300,6 +401,24 @@ def perform(start_from, advance_from = nil)
300
401
assert_equal 0 , ContinuableDeletingJob . items . count
301
402
end
302
403
404
+ class ContinuableNestedCursorJob < ContinuableJob
405
+ cattr_accessor :items
406
+
407
+ def perform
408
+ step :updating_sub_items , start : [ 0 , 0 ] do |step |
409
+ items [ step . cursor [ 0 ] ..] . each do |inner_items |
410
+ inner_items [ step . cursor [ 1 ] ..] . each do |item |
411
+ items [ step . cursor [ 0 ] ] [ step . cursor [ 1 ] ] = "new_#{ item } "
412
+
413
+ step . set! [ step . cursor [ 0 ] , step . cursor [ 1 ] + 1 ]
414
+ end
415
+
416
+ step . set! [ step . cursor [ 0 ] + 1 , 0 ]
417
+ end
418
+ end
419
+ end
420
+ end
421
+
303
422
test "nested cursor" do
304
423
ContinuableNestedCursorJob . items = [
305
424
3 . times . map { |i | "subitem_0_#{ i } " } ,
@@ -339,6 +458,19 @@ def perform(start_from, advance_from = nil)
339
458
assert_equal [ %w[ new_subitem_0_0 new_subitem_0_1 new_subitem_0_2 ] , %w[ new_subitem_1_0 ] , %w[ new_subitem_2_0 new_subitem_2_1 ] ] , ContinuableNestedCursorJob . items
340
459
end
341
460
461
+ class ContinuableArrayCursorJob < ContinuableJob
462
+ cattr_accessor :items , default : [ ]
463
+
464
+ def perform ( objects )
465
+ step :iterate_objects , start : 0 do |step |
466
+ objects [ step . cursor ..] . each do |object |
467
+ items << object
468
+ step . advance!
469
+ end
470
+ end
471
+ end
472
+ end
473
+
342
474
test "iterates over array cursor" do
343
475
ContinuableArrayCursorJob . items = [ ]
344
476
0 commit comments