diff --git a/README.md b/README.md index cc3d4332..4f27bfc5 100644 --- a/README.md +++ b/README.md @@ -229,6 +229,14 @@ ActiveJob.jobs.finished.where(job_class_name: "SomeJob") # For adapters that support filtering by worker: # All jobs in progress being run by a given worker ActiveJob.jobs.in_progress.where(worker_id: 42) + +# Using date filters +# You can filter by: enqueued_at, scheduled_at or finished_at +ActiveJob.jobs.pending.where(enqueued_at: 2.days.ago) +ActiveJob.jobs.pending.where(scheduled_at: Date.today) + +date_range = (Time.parse("2024-11-01")..Time.parse("2024-12-01")) +ActiveJob.jobs.finished.where(finished_at: date_range) ``` Some examples of bulk operations: diff --git a/lib/active_job/jobs_relation.rb b/lib/active_job/jobs_relation.rb index 60cb1abc..42503b9f 100644 --- a/lib/active_job/jobs_relation.rb +++ b/lib/active_job/jobs_relation.rb @@ -23,9 +23,9 @@ class ActiveJob::JobsRelation include Enumerable STATUSES = %i[ pending failed in_progress blocked scheduled finished ] - FILTERS = %i[ queue_name job_class_name ] + FILTERS = %i[ queue_name job_class_name finished_at scheduled_at enqueued_at ] - PROPERTIES = %i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id ] + PROPERTIES = %i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id finished_at scheduled_at enqueued_at ] attr_reader *PROPERTIES, :default_page_size delegate :last, :[], :reverse, to: :to_a @@ -51,12 +51,15 @@ def initialize(queue_adapter: ActiveJob::Base.queue_adapter, default_page_size: # * :queue_name - To only include the jobs in the provided queue. # * :worker_id - To only include the jobs processed by the provided worker. # * :recurring_task_id - To only include the jobs corresponding to runs of a recurring task. - def where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil) + def where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil, finished_at: nil, scheduled_at: nil, enqueued_at: nil) # Remove nil arguments to avoid overriding parameters when concatenating +where+ clauses arguments = { job_class_name: job_class_name, queue_name: queue_name, worker_id: worker_id, - recurring_task_id: recurring_task_id + recurring_task_id: recurring_task_id, + finished_at: finished_at, + scheduled_at: scheduled_at, + enqueued_at: enqueued_at }.compact.collect { |key, value| [ key, value.to_s ] }.to_h clone_with **arguments @@ -264,8 +267,31 @@ def filter(jobs) jobs.filter { |job| satisfy_filter?(job) } end + def satisfy_date_filter?(filter_value, job_value) + return false if job_value.nil? + + # Treat date ranges + if filter_value.include?("..") + start_date, end_date = filter_value.split("..").map { |date| Time.zone.parse(date) } + filter_range = (start_date..end_date) + return filter_range.cover?(job_value) + end + + filter = Time.zone.parse(filter_value) + job_value >= filter + end + def satisfy_filter?(job) - filters.all? { |property| public_send(property) == job.public_send(property) } + filters.all? do |property| + filter_value = public_send(property) + job_value = job.public_send(property) + + is_date_filter?(property) ? satisfy_date_filter?(filter_value, job_value) : filter_value == job_value + end + end + + def is_date_filter?(property) + [ :finished_at, :scheduled_at, :enqueued_at ].include?(property) end def filters