From 32e4ab8c99ca0b398ea0db425a6f06d4c91ac970 Mon Sep 17 00:00:00 2001 From: Pissardo Date: Fri, 15 Aug 2025 16:11:42 -0300 Subject: [PATCH] feat: improving usage memory --- lib/active_job/jobs_relation.rb | 31 ++++++++++++++++++-- test/active_job/jobs_relation_memory_test.rb | 22 ++++++++++++++ 2 files changed, 50 insertions(+), 3 deletions(-) create mode 100644 test/active_job/jobs_relation_memory_test.rb diff --git a/lib/active_job/jobs_relation.rb b/lib/active_job/jobs_relation.rb index 0d71e5e4..6d51561f 100644 --- a/lib/active_job/jobs_relation.rb +++ b/lib/active_job/jobs_relation.rb @@ -28,7 +28,8 @@ class ActiveJob::JobsRelation PROPERTIES = %i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id finished_at ] attr_reader *PROPERTIES, :default_page_size - delegate :last, :[], :reverse, to: :to_a + # delegate :last, :[], :reverse, to: :to_a + # No delegation to :to_a until we define it below. delegate :logger, to: MissionControl::Jobs ALL_JOBS_LIMIT = 100_000_000 # When no limit value it defaults to "all jobs" @@ -121,10 +122,33 @@ def to_s alias inspect to_s + # Enumerates over the jobs. If the relation has been materialized previously + # (via +to_a+ or explicit caching) we reuse the cached collection. Otherwise + # we stream the jobs without retaining them, reducing memory consumption. def each(&block) - loaded_jobs&.each(&block) || load_jobs(&block) + if loaded? + loaded_jobs.each(&block) + else + perform_each(&block) + end + end + + # Materializes the relation into an Array **and** caches the result so later + # calls don't need to hit the adapter again. This maintains backwards + # compatibility with existing code that relied on this caching behaviour. + def to_a + return loaded_jobs if loaded? + + @loaded_jobs = [] + perform_each { |job| @loaded_jobs << job } + @loaded_jobs end + # With the custom #to_a we can now safely delegate additional Enumerable + # methods that rely on materialization without affecting memory when not + # needed. + delegate :last, :[], :reverse, to: :to_a + # Retry all the jobs in the queue. # # This operation is only valid for sets of failed jobs. It will @@ -239,10 +263,11 @@ def query_count end def load_jobs + # Cache the jobs only when an explicit materialization is requested (e.g. via to_a) @loaded_jobs = [] perform_each do |job| @loaded_jobs << job - yield job + yield job if block_given? end end diff --git a/test/active_job/jobs_relation_memory_test.rb b/test/active_job/jobs_relation_memory_test.rb new file mode 100644 index 00000000..cc07a62b --- /dev/null +++ b/test/active_job/jobs_relation_memory_test.rb @@ -0,0 +1,22 @@ +require "test_helper" + +class ActiveJob::JobsRelationMemoryTest < ActiveSupport::TestCase + setup do + @jobs = ActiveJob::JobsRelation.new + end + + test "does not cache jobs when iterating with each" do + # Stub adapter calls + ActiveJob::Base.queue_adapter.expects(:fetch_jobs).twice.returns([ :job_1, :job_2 ], []) + ActiveJob::Base.queue_adapter.expects(:supports_job_filter?).at_least_once.returns(true) + + jobs = @jobs.where(queue_name: "my_queue") + + collected = [] + jobs.each { |job| collected << job } + assert_equal [ :job_1, :job_2 ], collected + + # Ensure the internal loaded_jobs cache is still nil to confirm no caching happened + assert_nil jobs.instance_variable_get(:@loaded_jobs) + end +end