Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions lib/active_job/jobs_relation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class ActiveJob::JobsRelation
PROPERTIES = %i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id finished_at ]
attr_reader *PROPERTIES, :default_page_size

delegate :last, :[], :reverse, to: :to_a
# delegate :last, :[], :reverse, to: :to_a
# No delegation to :to_a until we define it below.
delegate :logger, to: MissionControl::Jobs

ALL_JOBS_LIMIT = 100_000_000 # When no limit value it defaults to "all jobs"
Expand Down Expand Up @@ -121,10 +122,33 @@ def to_s

alias inspect to_s

# Enumerates over the jobs. If the relation has been materialized previously
# (via +to_a+ or explicit caching) we reuse the cached collection. Otherwise
# we stream the jobs without retaining them, reducing memory consumption.
def each(&block)
loaded_jobs&.each(&block) || load_jobs(&block)
if loaded?
loaded_jobs.each(&block)
else
perform_each(&block)
end
end

# Materializes the relation into an Array **and** caches the result so later
# calls don't need to hit the adapter again. This maintains backwards
# compatibility with existing code that relied on this caching behaviour.
def to_a
return loaded_jobs if loaded?

@loaded_jobs = []
perform_each { |job| @loaded_jobs << job }
@loaded_jobs
end

# With the custom #to_a we can now safely delegate additional Enumerable
# methods that rely on materialization without affecting memory when not
# needed.
delegate :last, :[], :reverse, to: :to_a

# Retry all the jobs in the queue.
#
# This operation is only valid for sets of failed jobs. It will
Expand Down Expand Up @@ -239,10 +263,11 @@ def query_count
end

def load_jobs
# Cache the jobs only when an explicit materialization is requested (e.g. via to_a)
@loaded_jobs = []
perform_each do |job|
@loaded_jobs << job
yield job
yield job if block_given?
end
end

Expand Down
22 changes: 22 additions & 0 deletions test/active_job/jobs_relation_memory_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
require "test_helper"

class ActiveJob::JobsRelationMemoryTest < ActiveSupport::TestCase
setup do
@jobs = ActiveJob::JobsRelation.new
end

test "does not cache jobs when iterating with each" do
# Stub adapter calls
ActiveJob::Base.queue_adapter.expects(:fetch_jobs).twice.returns([ :job_1, :job_2 ], [])
ActiveJob::Base.queue_adapter.expects(:supports_job_filter?).at_least_once.returns(true)

jobs = @jobs.where(queue_name: "my_queue")

collected = []
jobs.each { |job| collected << job }
assert_equal [ :job_1, :job_2 ], collected

# Ensure the internal loaded_jobs cache is still nil to confirm no caching happened
assert_nil jobs.instance_variable_get(:@loaded_jobs)
end
end