Commit 0ac3fba

Active Job Continuations (rails#55127)
Continuations provide a mechanism for interrupting and resuming jobs. This allows long-running jobs to make progress across application restarts.

Jobs should include the `ActiveJob::Continuable` module to enable continuations. Continuable jobs are automatically retried when interrupted.

Use the `step` method to define the steps in your job. Steps can use an optional cursor to track progress in the step.

Steps are executed as soon as they are encountered. If a job is interrupted, previously completed steps will be skipped. If a step is in progress, it will be resumed with the last recorded cursor.

Code that is not part of a step will be executed on each job execution.

You can pass a block or a method name to the `step` method. The block will be called with the step object as an argument. Methods can either take no arguments or a single argument for the step object.

```ruby
class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(import_id)
    # This always runs, even if the job is resumed.
    @import = Import.find(import_id)

    step :validate do
      @import.validate!
    end

    step :process_records do |step|
      @import.records.find_each(start: step.cursor) do |record|
        record.process
        step.advance! from: record.id
      end
    end

    step :reprocess_records
    step :finalize
  end

  def reprocess_records(step)
    @import.records.find_each(start: step.cursor) do |record|
      record.reprocess
      step.advance! from: record.id
    end
  end

  def finalize
    @import.finalize!
  end
end
```

**Cursors**

Cursors are used to track progress within a step. The cursor can be any object that is serializable as an argument to `ActiveJob::Base.serialize`. It defaults to `nil`.

When a step is resumed, the last cursor value is restored. The code in the step is responsible for using the cursor to continue from the right point.

`set!` sets the cursor to a specific value.

```ruby
step :iterate_items do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! (step.cursor || 0) + 1
  end
end
```

A starting value for the cursor can be set when defining the step:

```ruby
step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! step.cursor + 1
  end
end
```

The cursor can be advanced with `advance!`. This calls `succ` on the current cursor value. It raises an `ActiveJob::Continuation::UnadvanceableCursorError` if the cursor does not implement `succ`.

```ruby
step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.advance!
  end
end
```

You can optionally pass a `from` argument to `advance!`. This is useful when iterating over a collection of records where IDs may not be contiguous.

```ruby
step :process_records do |step|
  import.records.find_each(start: step.cursor) do |record|
    record.process
    step.advance! from: record.id
  end
end
```

You can use an array to iterate over nested records:

```ruby
step :process_nested_records, start: [ 0, 0 ] do |step|
  Account.find_each(start: step.cursor[0]) do |account|
    account.records.find_each(start: step.cursor[1]) do |record|
      record.process
      step.set! [ account.id, record.id + 1 ]
    end
    step.set! [ account.id + 1, 0 ]
  end
end
```

Setting or advancing the cursor creates a checkpoint. You can also create a checkpoint manually by calling the `checkpoint!` method on the step. This is useful if you want to allow interruptions, but don't need to update the cursor.

```ruby
step :destroy_records do |step|
  import.records.find_each do |record|
    record.destroy!
    step.checkpoint!
  end
end
```

**Checkpoints**

A checkpoint is where a job can be interrupted. At a checkpoint the job will call `queue_adapter.stopping?`. If it returns true, the job will raise an `ActiveJob::Continuation::Interrupt` exception.

There is an automatic checkpoint at the end of each step. Within a step, one is created when calling `set!`, `advance!` or `checkpoint!`.
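The cursor and checkpoint rules above can be modelled in a few lines of plain Ruby. This is only a sketch for reasoning about the behaviour, not the code added by this commit: `SketchStep`, `SketchInterrupt`, `SketchUnadvanceableCursorError` and `FakeAdapter` are hypothetical names, and the sketch assumes that `advance! from: x` stores `x.succ` as the new cursor.

```ruby
# Illustrative model only. None of these classes exist in Rails.
class SketchInterrupt < StandardError; end
class SketchUnadvanceableCursorError < StandardError; end

# Stands in for a queue adapter that can report an in-progress shutdown.
class FakeAdapter
  attr_accessor :stopping

  def stopping?
    !!stopping
  end
end

class SketchStep
  attr_reader :cursor

  def initialize(adapter, start: nil)
    @adapter = adapter
    @cursor = start
  end

  # Store a new cursor value, then checkpoint.
  def set!(value)
    @cursor = value
    checkpoint!
  end

  # Move to the successor of `from`, or of the current cursor when `from`
  # is omitted. Assumption: `advance! from: x` stores `x.succ`.
  def advance!(from: nil)
    from = cursor if from.nil?
    unless from.respond_to?(:succ)
      raise SketchUnadvanceableCursorError, "#{from.inspect} does not implement succ"
    end
    set!(from.succ)
  end

  # The only place where this model allows an interruption.
  def checkpoint!
    raise SketchInterrupt if @adapter.stopping?
  end
end

adapter = FakeAdapter.new
step = SketchStep.new(adapter, start: 0)

step.advance!           # cursor moves from 0 to 1
step.advance! from: 41  # cursor becomes 42, e.g. after handling record id 41

adapter.stopping = true
begin
  step.set!(100)        # in this model the cursor is stored before the interrupt
rescue SketchInterrupt
  interrupted_at = step.cursor
end
```

In this model an interrupt can only surface from `set!`, `advance!` or `checkpoint!`, which is why a step that never calls any of them cannot be stopped part-way through.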
Jobs are not automatically interrupted when the queue adapter is marked as stopping. They will continue to run until the next checkpoint or until the process is stopped.

This is to allow jobs to be interrupted at a safe point, but it also means that jobs should checkpoint more frequently than the shutdown timeout to ensure a graceful restart.

When interrupted, the job will automatically retry with the progress serialized in the job data under the `continuation` key.

The serialized progress contains:

- a list of the completed steps
- the current step and its cursor value (if one is in progress)

**Errors**

If a job raises an error and is not retried via Active Job, it will be passed back to the queue adapter and any progress in this execution will be lost.

To mitigate this, the job will automatically be retried if it raises an error after it has made progress. Making progress is defined as having completed a step or advanced the cursor within the current step.

**Queue Adapter support**

Active Job Continuations call the `stopping?` method on the queue adapter to check if we are in the shutdown phase. By default this will return false, so the adapters will need to be updated to implement this method.

This commit implements the `stopping?` method in the test and Sidekiq adapters.

It would be possible to add support to Delayed Job via a plugin, but it would probably be better to add a new lifecycle callback to DJ for when it is shutting down.

Resque will also require a new hook before it can be supported.

Solid Queue's adapter is not part of Rails, but support can be added there via the `on_worker_stop` hook.

**Inspiration**

This took a lot of inspiration from Shopify's [job-iteration](https://github.com/Shopify/job-iteration) gem. The main differences are:

- Continuations are Active Job only, so they don't provide the custom enumerators that job-iteration does.
- They allow multi-step flows
- They don't intercept the perform method
- Continuations are a sharp knife: you need to manually checkpoint and update the cursor. But you could build a job-iteration-like API on top of them.

**Future work**

It would be a good exercise to see if the job-iteration gem could be adapted to run on top of Active Job Continuations to highlight any missing features. We'd want to add things like max iteration time, max job runtime and forcing a job to stop.

Another thing to consider is a mechanism for checking whether it is safe to call a checkpoint. Ideally you wouldn't allow them within database transactions as they'll cause a rollback. We can maybe inject checkpoint safety handlers and add a default one that checks whether we are in any active transactions.
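To make the skip-and-resume behaviour described in this commit message concrete, here is a toy model in plain Ruby. It is a sketch under stated assumptions, not the Rails implementation: `MiniContinuation`, `Interrupted` and `run_import` are hypothetical names, the continuation doubles as the step object for brevity, and the `"completed"` / `"current"` keys are an assumed layout for the serialized progress (the message only says what the progress contains, not how it is keyed).

```ruby
# Toy model only. None of these names exist in Rails.
class Interrupted < StandardError; end

class MiniContinuation
  # `data` plays the role of the progress stored under the job's
  # `continuation` key; the block answers "is the worker stopping?".
  def initialize(data = {}, &stopping)
    @completed = data.fetch("completed", []).dup
    @current = data["current"]&.dup # [step_name, cursor] or nil
    @stopping = stopping || -> { false }
  end

  def step(name, start: nil)
    name = name.to_s
    return if @completed.include?(name) # finished in an earlier execution

    # Keep the saved cursor only if this is the step that was in progress.
    @current = [name, start] unless @current && @current[0] == name
    yield self
    @completed << name
    @current = nil
    checkpoint! # automatic checkpoint at the end of each step
  end

  def cursor
    @current[1]
  end

  def set!(value)
    @current[1] = value
    checkpoint!
  end

  def checkpoint!
    raise Interrupted if @stopping.call
  end

  def to_h
    { "completed" => @completed.dup, "current" => @current&.dup }
  end
end

# A two-step "job": one plain step and one cursor-driven step.
def run_import(continuation, items, log)
  continuation.step(:validate) { log << "validate" }
  continuation.step(:process, start: 0) do |step|
    items[step.cursor..].each do |item|
      log << item
      step.set!(step.cursor + 1)
    end
  end
end

log = []
checks = 0
first = MiniContinuation.new { (checks += 1) >= 3 } # "shutdown" at the third checkpoint
begin
  run_import(first, %w[a b c d], log)
rescue Interrupted
  saved = first.to_h # what would be serialized with the job
end

# A later execution skips :validate and resumes :process from the saved cursor.
run_import(MiniContinuation.new(saved), %w[a b c d], log)
```

In this model the first execution stops after processing `a` and `b`, and the second one picks up at `c` without repeating the `validate` step.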
1 parent b5b6900 commit 0ac3fba

29 files changed: +1294 -53 lines changed

activejob/CHANGELOG.md

Lines changed: 39 additions & 0 deletions

@@ -1,3 +1,42 @@
+*   Allow jobs to be interrupted and resumed with Continuations
+
+    A job can use Continuations by including the `ActiveJob::Continuable`
+    concern. Continuations split jobs into steps. When the queuing system
+    is shutting down, jobs can be interrupted and their progress saved.
+
+    ```ruby
+    class ProcessImportJob
+      include ActiveJob::Continuable
+
+      def perform(import_id)
+        @import = Import.find(import_id)
+
+        # block format
+        step :initialize do
+          @import.initialize
+        end
+
+        # step with cursor, the cursor is saved when the job is interrupted
+        step :process do |step|
+          @import.records.find_each(start: step.cursor) do |record|
+            record.process
+            step.advance! from: record.id
+          end
+        end
+
+        # method format
+        step :finalize
+      end
+
+      private
+        def finalize
+          @import.finalize
+        end
+    end
+    ```
+
+    *Donal McBreen*
+
 *   Defer invocation of ActiveJob enqueue callbacks until after commit when
     `enqueue_after_transaction_commit` is enabled.
 

activejob/README.md

Lines changed: 4 additions & 0 deletions

@@ -95,6 +95,10 @@ their gem, or as a stand-alone gem. For discussion about this see the
 following PRs: [23311](https://github.com/rails/rails/issues/23311#issuecomment-176275718),
 [21406](https://github.com/rails/rails/pull/21406#issuecomment-138813484), and [#32285](https://github.com/rails/rails/pull/32285).
 
+## Continuations
+
+Continuations allow jobs to be interrupted and resumed. See more at ActiveJob::Continuation.
+
 
 ## Download and installation
 

activejob/lib/active_job.rb

Lines changed: 1 addition & 0 deletions

@@ -41,6 +41,7 @@ module ActiveJob
   autoload :SerializationError, "active_job/arguments"
   autoload :UnknownJobClassError, "active_job/core"
   autoload :EnqueueAfterTransactionCommit
+  autoload :Continuation
 
   eager_autoload do
     autoload :Serializers
Lines changed: 59 additions & 0 deletions

@@ -0,0 +1,59 @@
+# frozen_string_literal: true
+
+module ActiveJob
+  # = Active Job Continuable
+  #
+  # Mix ActiveJob::Continuable into your job to enable continuations.
+  #
+  # See +ActiveJob::Continuation+ for usage. The Continuable module provides the ability to track the progress of your jobs,
+  # and continue from where they left off if interrupted.
+  #
+  module Continuable
+    extend ActiveSupport::Concern
+
+    CONTINUATION_KEY = "continuation"
+
+    included do
+      retry_on Continuation::Interrupt, attempts: :unlimited
+      retry_on Continuation::AfterAdvancingError, attempts: :unlimited
+
+      around_perform :continue
+    end
+
+    def step(step_name, start: nil, &block)
+      continuation.step(step_name, start: start) do |step|
+        if block_given?
+          block.call(step)
+        else
+          step_method = method(step_name)
+
+          raise ArgumentError, "Step method '#{step_name}' must accept 0 or 1 arguments" if step_method.arity > 1
+
+          if step_method.parameters.any? { |type, name| type == :key || type == :keyreq }
+            raise ArgumentError, "Step method '#{step_name}' must not accept keyword arguments"
+          end
+
+          step_method.arity == 0 ? step_method.call : step_method.call(step)
+        end
+      end
+    end
+
+    def serialize
+      super.merge(CONTINUATION_KEY => continuation.to_h)
+    end
+
+    def deserialize(job_data)
+      super
+      @continuation = Continuation.new(self, job_data.fetch(CONTINUATION_KEY, {}))
+    end
+
+    private
+      def continuation
+        @continuation ||= Continuation.new(self, {})
+      end
+
+      def continue(&block)
+        continuation.continue(&block)
+      end
+  end
+end
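
The method-name branch of `step` in the diff above relies on `Method#arity` and `Method#parameters` to decide how to call the step method. The standalone sketch below copies those checks into a free function so they can be tried outside Rails. `call_step_method` and `Demo` are hypothetical names introduced only for illustration.

```ruby
# Standalone copy of the dispatch checks from the diff above.
# `call_step_method` and `Demo` are illustrative, not part of Rails.
def call_step_method(receiver, step_name, step)
  step_method = receiver.method(step_name)

  # Two or more required positional arguments are rejected.
  raise ArgumentError, "Step method '#{step_name}' must accept 0 or 1 arguments" if step_method.arity > 1

  # Keyword arguments (optional or required) are rejected as well.
  if step_method.parameters.any? { |type, _name| type == :key || type == :keyreq }
    raise ArgumentError, "Step method '#{step_name}' must not accept keyword arguments"
  end

  # Zero-arity methods are called bare; anything else receives the step.
  step_method.arity == 0 ? step_method.call : step_method.call(step)
end

class Demo
  def finalize
    :finalized
  end

  def reprocess(step)
    [:reprocessed, step]
  end

  def too_many(step, extra)
  end

  def with_keyword(step:)
  end
end

demo = Demo.new
call_step_method(demo, :finalize, :step_object)  # called without arguments
call_step_method(demo, :reprocess, :step_object) # called with the step object
```

A method with a required keyword such as `with_keyword(step:)` reports an arity of 1, so it passes the first check and is only caught by the `parameters` check. That may be why both checks are present.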
