Skip to content

Commit 1b4484c

Browse files
authored
Active Job Continuation isolated steps (rails#55212)
Add an isolated option to steps. Defaults to false. Isolated steps are always run in their own job execution. We do this by interrupting the job before running a step if both: 1. It is not the first step we've run in this execution 2. Either this step or the previous one was marked as isolated. The first to be run in an execution is always run inline so we don't end up in an infinite loop of interruptions. This allows you to execute a long running step separately which is useful to ensure that progress is saved before it runs.
1 parent 2994a6c commit 1b4484c

File tree

4 files changed

+112
-15
lines changed

4 files changed

+112
-15
lines changed

activejob/lib/active_job/continuable.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def initialize(...)
3333
attr_accessor :continuation # :nodoc:
3434

3535
# Start a new continuation step
36-
def step(step_name, start: nil, &block)
36+
def step(step_name, start: nil, isolated: false, &block)
3737
unless block_given?
3838
step_method = method(step_name)
3939

@@ -46,7 +46,7 @@ def step(step_name, start: nil, &block)
4646
block = step_method.arity == 0 ? -> (_) { step_method.call } : step_method
4747
end
4848
checkpoint! if continuation.advanced?
49-
continuation.step(step_name, start: start, &block)
49+
continuation.step(step_name, start: start, isolated: isolated, &block)
5050
end
5151

5252
def serialize # :nodoc:
@@ -60,7 +60,12 @@ def deserialize(job_data) # :nodoc:
6060
end
6161

6262
def checkpoint! # :nodoc:
63-
interrupt! if queue_adapter.stopping?
63+
interrupt!(reason: :stopping) if queue_adapter.stopping?
64+
end
65+
66+
def interrupt!(reason:) # :nodoc:
67+
instrument :interrupt, reason: reason, **continuation.instrumentation
68+
raise Continuation::Interrupt, "Interrupted #{continuation.description} (#{reason})"
6469
end
6570

6671
private
@@ -91,11 +96,6 @@ def resume_job(exception) # :nodoc:
9196
raise Continuation::ResumeLimitError, "Job was resumed a maximum of #{max_resumptions} times"
9297
end
9398
end
94-
95-
def interrupt! # :nodoc:
96-
instrument :interrupt, **continuation.instrumentation
97-
raise Continuation::Interrupt, "Interrupted #{continuation.description}"
98-
end
9999
end
100100

101101
ActiveSupport.run_load_hooks(:active_job_continuable, Continuable)

activejob/lib/active_job/continuation.rb

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,21 @@ module ActiveJob
150150
# - a list of the completed steps
151151
# - the current step and its cursor value (if one is in progress)
152152
#
153+
# === Isolated Steps
154+
#
155+
# Steps run sequentially in a single job execution, unless the job is interrupted.
156+
#
157+
# You can specify that a step should always run in its own execution by passing the +isolated: true+ option.
158+
#
159+
# This is useful for long-running steps where it may not be possible to checkpoint within
160+
# the job grace period - it ensures that progress is serialized back into the job data before
161+
# the step starts.
162+
#
163+
# step :quick_step1
164+
# step :slow_step, isolated: true
165+
# step :quick_step2
166+
# step :quick_step3
167+
#
153168
# === Errors
154169
#
155170
# If a job raises an error and is not retried via Active Job, it will be passed back to the underlying
@@ -204,23 +219,24 @@ def initialize(job, serialized_progress) # :nodoc:
204219
@encountered = []
205220
@advanced = false
206221
@running_step = false
222+
@isolating = false
207223
end
208224

209-
def step(name, start:, &block) # :nodoc:
225+
def step(name, **options, &block) # :nodoc:
210226
validate_step!(name)
211227
encountered << name
212228

213229
if completed?(name)
214230
skip_step(name)
215231
else
216-
run_step(name, start: start, &block)
232+
run_step(name, **options, &block)
217233
end
218234
end
219235

220236
def to_h # :nodoc:
221237
{
222238
"completed" => completed.map(&:to_s),
223-
"current" => current&.to_a
239+
"current" => current&.to_a,
224240
}.compact
225241
end
226242

@@ -255,6 +271,10 @@ def running_step?
255271
@running_step
256272
end
257273

274+
def isolating?
275+
@isolating
276+
end
277+
258278
def completed?(name)
259279
completed.include?(name)
260280
end
@@ -267,7 +287,17 @@ def skip_step(name)
267287
instrument :step_skipped, step: name
268288
end
269289

270-
def run_step(name, start:, &block)
290+
def run_step(name, start:, isolated:, &block)
291+
@isolating ||= isolated
292+
293+
if isolating? && advanced?
294+
job.interrupt!(reason: :isolating)
295+
else
296+
run_step_inline(name, start: start, &block)
297+
end
298+
end
299+
300+
def run_step_inline(name, start:, **options, &block)
271301
@running_step = true
272302
@current ||= new_step(name, start, resumed: false)
273303

activejob/lib/active_job/log_subscriber.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def discard(event)
141141
def interrupt(event)
142142
job = event.payload[:job]
143143
info do
144-
"Interrupted #{job.class} (Job ID: #{job.job_id}) #{event.payload[:description]}"
144+
"Interrupted #{job.class} (Job ID: #{job.job_id}) #{event.payload[:description]} (#{event.payload[:reason]})"
145145
end
146146
end
147147
subscribe_log_level :interrupt, :info

activejob/test/cases/continuation_test.rb

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ def perform
258258
assert_no_match "Resuming", @logger.messages
259259
assert_match(/Step 'step_one' started/, @logger.messages)
260260
assert_match(/Step 'step_one' completed/, @logger.messages)
261-
assert_match(/Interrupted ActiveJob::TestContinuation::LinearJob \(Job ID: [0-9a-f-]{36}\) after 'step_one'/, @logger.messages)
261+
assert_match(/Interrupted ActiveJob::TestContinuation::LinearJob \(Job ID: [0-9a-f-]{36}\) after 'step_one' \(stopping\)/, @logger.messages)
262262
end
263263

264264
perform_enqueued_jobs
@@ -278,7 +278,7 @@ def perform
278278
assert_no_match "Resuming", @logger.messages
279279
assert_match(/Step 'rename' started/, @logger.messages)
280280
assert_match(/Step 'rename' interrupted at cursor '433'/, @logger.messages)
281-
assert_match(/Interrupted ActiveJob::TestContinuation::IteratingJob \(Job ID: [0-9a-f-]{36}\) at 'rename', cursor '433'/, @logger.messages)
281+
assert_match(/Interrupted ActiveJob::TestContinuation::IteratingJob \(Job ID: [0-9a-f-]{36}\) at 'rename', cursor '433' \(stopping\)/, @logger.messages)
282282
end
283283

284284
perform_enqueued_jobs
@@ -651,6 +651,73 @@ def perform(iterations)
651651
end
652652
end
653653

654+
class IsolatedStepsJob < ContinuableJob
655+
cattr_accessor :items, default: []
656+
657+
def perform(*isolated)
658+
step :step_one, isolated: isolated.include?(:step_one) do |step|
659+
items << "step_one"
660+
end
661+
step :step_two, isolated: isolated.include?(:step_two) do |step|
662+
items << "step_two"
663+
end
664+
step :step_three, isolated: isolated.include?(:step_three) do |step|
665+
items << "step_three"
666+
end
667+
step :step_four, isolated: isolated.include?(:step_four) do |step|
668+
items << "step_four"
669+
end
670+
end
671+
end
672+
673+
test "runs isolated step separately" do
674+
IsolatedStepsJob.items = []
675+
IsolatedStepsJob.perform_later(:step_three)
676+
677+
assert_enqueued_jobs 1, only: IsolatedStepsJob do
678+
perform_enqueued_jobs
679+
end
680+
681+
assert_equal [ "step_one", "step_two" ], IsolatedStepsJob.items
682+
683+
assert_enqueued_jobs 1 do
684+
perform_enqueued_jobs
685+
end
686+
687+
assert_equal [ "step_one", "step_two", "step_three" ], IsolatedStepsJob.items
688+
689+
assert_enqueued_jobs 0 do
690+
perform_enqueued_jobs
691+
end
692+
693+
assert_equal [ "step_one", "step_two", "step_three", "step_four" ], IsolatedStepsJob.items
694+
assert_match(/Interrupted ActiveJob::TestContinuation::IsolatedStepsJob \(Job ID: [0-9a-f-]{36}\) after 'step_two' \(isolating\)/, @logger.messages)
695+
assert_match(/Interrupted ActiveJob::TestContinuation::IsolatedStepsJob \(Job ID: [0-9a-f-]{36}\) after 'step_three' \(isolating\)/, @logger.messages)
696+
end
697+
698+
test "runs initial and final isolated steps separately" do
699+
IsolatedStepsJob.items = []
700+
IsolatedStepsJob.perform_later(:step_one, :step_four)
701+
702+
assert_enqueued_jobs 1, only: IsolatedStepsJob do
703+
perform_enqueued_jobs
704+
end
705+
706+
assert_equal [ "step_one" ], IsolatedStepsJob.items
707+
708+
assert_enqueued_jobs 1 do
709+
perform_enqueued_jobs
710+
end
711+
712+
assert_equal [ "step_one", "step_two", "step_three" ], IsolatedStepsJob.items
713+
714+
assert_enqueued_jobs 0 do
715+
perform_enqueued_jobs
716+
end
717+
718+
assert_equal [ "step_one", "step_two", "step_three", "step_four" ], IsolatedStepsJob.items
719+
end
720+
654721
private
655722
def capture_info_stdout(&block)
656723
ActiveJob::Base.logger.with(level: :info) do

0 commit comments

Comments
 (0)