Skip to content

Commit e9626a3

Browse files
authored
Active Job Continuations continued (rails#55174)
Follow up to rails#55127 and rails#55151 - Instrument steps in a block so that runtime is recorded (matching perform/perform_started) - Allow job resumption configuration - max_resumptions, resume_options, resume_errors_after_advancing - Add Active Record Railtie to prevent checkpoints in database transactions - Checkpoint before each step except the first, rather than after each step. - Error if order of completed steps changes when re-running
1 parent 4b7fb1d commit e9626a3

File tree

12 files changed

+431
-110
lines changed

12 files changed

+431
-110
lines changed

activejob/lib/active_job/continuable.rb

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,26 @@ module ActiveJob
1111
module Continuable
1212
extend ActiveSupport::Concern
1313

14-
CONTINUATION_KEY = "continuation"
15-
1614
included do
17-
retry_on Continuation::Interrupt, attempts: :unlimited
18-
retry_on Continuation::AfterAdvancingError, attempts: :unlimited
15+
class_attribute :max_resumptions, instance_writer: false
16+
class_attribute :resume_options, instance_writer: false, default: { wait: 5.seconds }
17+
class_attribute :resume_errors_after_advancing, instance_writer: false, default: true
1918

2019
around_perform :continue
20+
21+
def initialize(...)
22+
super(...)
23+
self.resumptions = 0
24+
self.continuation = Continuation.new(self, {})
25+
end
2126
end
2227

28+
# The number of times the job has been resumed.
29+
attr_accessor :resumptions
30+
31+
attr_accessor :continuation # :nodoc:
32+
33+
# Start a new continuation step
2334
def step(step_name, start: nil, &block)
2435
unless block_given?
2536
step_method = method(step_name)
@@ -32,25 +43,58 @@ def step(step_name, start: nil, &block)
3243

3344
block = step_method.arity == 0 ? -> (_) { step_method.call } : step_method
3445
end
46+
checkpoint! if continuation.advanced?
3547
continuation.step(step_name, start: start, &block)
3648
end
3749

38-
def serialize
39-
super.merge(CONTINUATION_KEY => continuation.to_h)
50+
def serialize # :nodoc:
51+
super.merge("continuation" => continuation.to_h, "resumptions" => resumptions)
4052
end
4153

42-
def deserialize(job_data)
54+
def deserialize(job_data) # :nodoc:
4355
super
44-
@continuation = Continuation.new(self, job_data.fetch(CONTINUATION_KEY, {}))
56+
self.continuation = Continuation.new(self, job_data.fetch("continuation", {}))
57+
self.resumptions = job_data.fetch("resumptions", 0)
58+
end
59+
60+
def checkpoint! # :nodoc:
61+
interrupt! if queue_adapter.stopping?
4562
end
4663

4764
private
48-
def continuation
49-
@continuation ||= Continuation.new(self, {})
65+
def continue(&block)
66+
if continuation.started?
67+
self.resumptions += 1
68+
instrument :resume, **continuation.instrumentation
69+
end
70+
71+
block.call
72+
rescue Continuation::Interrupt => e
73+
resume_job(e)
74+
rescue Continuation::Error
75+
raise
76+
rescue StandardError => e
77+
if resume_errors_after_advancing? && continuation.advanced?
78+
resume_job(exception: e)
79+
else
80+
raise
81+
end
5082
end
5183

52-
def continue(&block)
53-
continuation.continue(&block)
84+
def resume_job(exception) # :nodoc:
85+
executions_for(exception)
86+
if max_resumptions.nil? || resumptions < max_resumptions
87+
retry_job(**self.resume_options)
88+
else
89+
raise Continuation::ResumeLimitError, "Job was resumed a maximum of #{max_resumptions} times"
90+
end
91+
end
92+
93+
def interrupt! # :nodoc:
94+
instrument :interrupt, **continuation.instrumentation
95+
raise Continuation::Interrupt, "Interrupted #{continuation.description}"
5496
end
5597
end
98+
99+
ActiveSupport.run_load_hooks(:active_job_continuable, Continuable)
56100
end

activejob/lib/active_job/continuation.rb

Lines changed: 52 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ module ActiveJob
134134
# +queue_adapter.stopping?+. If it returns true, the job will raise an
135135
# ActiveJob::Continuation::Interrupt exception.
136136
#
137-
# There is an automatic checkpoint at the end of each step. Within a step one is
138-
# created when calling +set!+, +advance!+ or +checkpoint!+.
137+
# There is an automatic checkpoint before the start of each step except for the first for
138+
# each job execution. Within a step one is created when calling +set!+, +advance!+ or +checkpoint!+.
139139
#
140140
# Jobs are not automatically interrupted when the queue adapter is marked as stopping - they
141141
# will continue to run either until the next checkpoint, or when the process is stopped.
@@ -158,49 +158,57 @@ module ActiveJob
158158
# To mitigate this, the job will be automatically retried if it raises an error after it has made progress.
159159
# Making progress is defined as having completed a step or advanced the cursor within the current step.
160160
#
161+
# === Configuration
162+
#
163+
# Continuable jobs have several configuration options:
164+
# * :max_resumptions</tt> - The maximum number of times a job can be resumed. Defaults to +nil+ which means
165+
# unlimited resumptions.
166+
# * <tt>:resume_options</tt> - Options to pass to +retry_job+ when resuming the job.
167+
# Defaults to +{ wait: 5.seconds }+.
168+
# See +ActiveJob::Exceptions#retry_job+ for available options.
169+
# * <tt>:resume_errors_after_advancing</tt> - Whether to resume errors after advancing the continuation.
170+
# Defaults to +true+.
161171
class Continuation
162172
extend ActiveSupport::Autoload
163173

164174
autoload :Step
175+
autoload :Validation
165176

166177
# Raised when a job is interrupted, allowing Active Job to requeue it.
167178
# This inherits from +Exception+ rather than +StandardError+, so it's not
168179
# caught by normal exception handling.
169180
class Interrupt < Exception; end
170181

171-
# Base error class for all Continuation errors.
182+
# Base class for all Continuation errors.
172183
class Error < StandardError; end
173184

174185
# Raised when a step is invalid.
175186
class InvalidStepError < Error; end
176187

188+
# Raised when there is an error with a checkpoint, such as open database transactions.
189+
class CheckpointError < Error; end
190+
177191
# Raised when attempting to advance a cursor that doesn't implement `succ`.
178192
class UnadvanceableCursorError < Error; end
179193

180-
# Raised when an error occurs after a job has made progress.
181-
#
182-
# The job will be automatically retried to ensure that the progress is serialized
183-
# in the retried job.
184-
class AfterAdvancingError < Error; end
194+
# Raised when a job has reached its limit of the number of resumes.
195+
# The limit is defined by the +max_resumes+ class attribute.
196+
class ResumeLimitError < Error; end
185197

186-
def initialize(job, serialized_progress)
198+
include Validation
199+
200+
def initialize(job, serialized_progress) # :nodoc:
187201
@job = job
188202
@completed = serialized_progress.fetch("completed", []).map(&:to_sym)
189203
@current = new_step(*serialized_progress["current"], resumed: true) if serialized_progress.key?("current")
190-
@encountered_step_names = []
204+
@encountered = []
191205
@advanced = false
192206
@running_step = false
193207
end
194208

195-
def continue(&block)
196-
wrapping_errors_after_advancing do
197-
instrument_job :resume if started?
198-
block.call
199-
end
200-
end
201-
202-
def step(name, start:, &block)
209+
def step(name, start:, &block) # :nodoc:
203210
validate_step!(name)
211+
encountered << name
204212

205213
if completed?(name)
206214
skip_step(name)
@@ -209,14 +217,14 @@ def step(name, start:, &block)
209217
end
210218
end
211219

212-
def to_h
220+
def to_h # :nodoc:
213221
{
214222
"completed" => completed.map(&:to_s),
215223
"current" => current&.to_a
216224
}.compact
217225
end
218226

219-
def description
227+
def description # :nodoc:
220228
if current
221229
current.description
222230
elsif completed.any?
@@ -226,36 +234,33 @@ def description
226234
end
227235
end
228236

229-
private
230-
attr_reader :job, :encountered_step_names, :completed, :current
237+
def started?
238+
completed.any? || current.present?
239+
end
231240

232-
def advanced?
233-
@advanced
234-
end
241+
def advanced?
242+
@advanced
243+
end
244+
245+
def instrumentation
246+
{ description: description,
247+
completed_steps: completed,
248+
current_step: current }
249+
end
250+
251+
private
252+
attr_reader :job, :encountered, :completed, :current
235253

236254
def running_step?
237255
@running_step
238256
end
239257

240-
def started?
241-
completed.any? || current.present?
242-
end
243-
244258
def completed?(name)
245259
completed.include?(name)
246260
end
247261

248-
def validate_step!(name)
249-
raise InvalidStepError, "Step '#{name}' must be a Symbol, found '#{name.class}'" unless name.is_a?(Symbol)
250-
raise InvalidStepError, "Step '#{name}' has already been encountered" if encountered_step_names.include?(name)
251-
raise InvalidStepError, "Step '#{name}' is nested inside step '#{current.name}'" if running_step?
252-
raise InvalidStepError, "Step '#{name}' found, expected to resume from '#{current.name}'" if current && current.name != name && !completed?(name)
253-
254-
encountered_step_names << name
255-
end
256-
257262
def new_step(*args, **options)
258-
Step.new(*args, **options) { checkpoint! }
263+
Step.new(*args, job: job, **options)
259264
end
260265

261266
def skip_step(name)
@@ -273,49 +278,24 @@ def run_step(name, start:, &block)
273278
@completed << current.name
274279
@current = nil
275280
@advanced = true
276-
277-
checkpoint!
278281
ensure
279282
@running_step = false
280283
@advanced ||= current&.advanced?
281284
end
282285

283-
def interrupt!
284-
instrument_job :interrupt
285-
raise Interrupt, "Interrupted #{description}"
286-
end
287-
288-
def checkpoint!
289-
interrupt! if job.queue_adapter.stopping?
290-
end
286+
def instrumenting_step(step, &block)
287+
instrument :step, step: step, interrupted: false do |payload|
288+
instrument :step_started, step: step
291289

292-
def wrapping_errors_after_advancing(&block)
293-
block.call
294-
rescue StandardError => e
295-
if !e.is_a?(Error) && advanced?
296-
raise AfterAdvancingError, "Advanced job failed with error: #{e.message}"
297-
else
290+
block.call
291+
rescue Interrupt
292+
payload[:interrupted] = true
298293
raise
299294
end
300295
end
301296

302-
def instrumenting_step(step, &block)
303-
instrument (step.resumed? ? :step_resumed : :step_started), step: step
304-
305-
block.call
306-
307-
instrument :step_completed, step: step
308-
rescue Interrupt
309-
instrument :step_interrupted, step: step
310-
raise
311-
end
312-
313-
def instrument_job(event)
314-
instrument event, description: description, completed_steps: completed, current_step: current
315-
end
316-
317-
def instrument(event, payload = {})
318-
job.instrument event, **payload
297+
def instrument(...)
298+
job.instrument(...)
319299
end
320300
end
321301
end

activejob/lib/active_job/continuation/step.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,18 @@ class Step
2222
# The cursor for the step.
2323
attr_reader :cursor
2424

25-
def initialize(name, cursor, resumed:, &checkpoint_callback)
25+
def initialize(name, cursor, job:, resumed:)
2626
@name = name.to_sym
2727
@initial_cursor = cursor
2828
@cursor = cursor
2929
@resumed = resumed
30-
@checkpoint_callback = checkpoint_callback
30+
@job = job
3131
end
3232

3333
# Check if the job should be interrupted, and if so raise an Interrupt exception.
3434
# The job will be requeued for retry.
3535
def checkpoint!
36-
checkpoint_callback.call
36+
job.checkpoint!
3737
end
3838

3939
# Set the cursor and interrupt the job if necessary.
@@ -77,7 +77,7 @@ def description
7777
end
7878

7979
private
80-
attr_reader :checkpoint_callback, :initial_cursor
80+
attr_reader :initial_cursor, :job
8181
end
8282
end
8383
end

activejob/lib/active_job/continuation/test_helper.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ def interrupt_job_during_step(job, step, cursor: nil, &block)
4040

4141
# Interrupt a job after a step.
4242
#
43+
# Note that there's no checkpoint after the final step so it won't be interrupted.
44+
#
4345
# class MyJob < ApplicationJob
4446
# include ActiveJob::Continuable
4547
#

0 commit comments

Comments
 (0)