From a2e2e4305928b8f79098237292a7ec9b8231a57a Mon Sep 17 00:00:00 2001
From: Joel Warrington
Date: Tue, 25 Feb 2025 01:40:07 -0700
Subject: [PATCH 1/6] Add capability to discard duplicate jobs with concurrency configuration

Remove 'duplicate' verbiage and use concurrency limits instead, simplify control flow
Fix race condition vulnerability by changing logic to enqueue
Add assertions when bulk enqueuing jobs with concurrency controls
Dispatch jobs in the order they were enqueued
Set ActiveJob successfully_enqueued for both enqueued/blocked and discarded jobs
Change concurrency 'at_limit' -> 'on_conflict'
Update discard logic to trigger an ActiveRecord rollback when attempting dispatch to prevent discarded job creation
Change default on_conflict concurrency option to old behaviour (blocking execution)
Add concurrent on_conflict documentation to README
Add test for discarding grouped concurrent jobs
Fix tests which expect raising enqueue errors
Add test to confirm scheduled jobs are also discarded
---
 README.md                                    |  36 ++++-
 app/models/solid_queue/job.rb                |  23 ++--
 .../solid_queue/job/concurrency_controls.rb  |   6 +-
 app/models/solid_queue/job/executable.rb     |  13 +-
 lib/active_job/concurrency_controls.rb       |   4 +-
 test/models/solid_queue/job_test.rb          | 124 +++++++++++++++++-
 6 files changed, 187 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index 5c18516a..9a24bc50 100644
--- a/README.md
+++ b/README.md
@@ -428,11 +428,11 @@ In the case of recurring tasks, if such error is raised when enqueuing the job c
 
 ## Concurrency controls
 
-Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs are never discarded or lost, only blocked.
+Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs can can be configured to either be discarded or blocked.
 
 ```ruby
 class MyJob < ApplicationJob
-  limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group
+  limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: conflict_behaviour
   # ...
 ```
 
@@ -440,6 +440,9 @@ class MyJob < ApplicationJob
 - `to` is `1` by default.
 - `duration` is set to `SolidQueue.default_concurrency_control_period` by default, which itself defaults to `3 minutes`, but that you can configure as well.
 - `group` is used to control the concurrency of different job classes together. It defaults to the job class name.
+- `on_conflict` controls behaviour when enqueuing a job which is above the max concurrent executions for your configuration.
+  - (default) `:block`; the job is blocked and is dispatched until another job completes and unblocks it
+  - `:discard`; the job is discarded
 
 When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping).
 
@@ -482,6 +485,31 @@ Jobs are unblocked in order of priority but queue order is not taken into accoun
 
 Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for getting an open semaphore, and whenever they get it, they'll be run. It doesn't matter if they had already gotten an open semaphore in the past.
 
+### Discarding conflicting jobs
+
+When configuring `on_conflict` with `:discard`, jobs enqueued above the concurrent execution limit are discarded and failed to be enqueued.
+
+```ruby
+class ConcurrentJob < ApplicationJob
+  limits_concurrency key: ->(record) { record }, on_conflict: :discard
+
+  def perform(user); end
+end
+
+enqueued_job = ConcurrentJob.perform_later(record)
+# => instance of ConcurrentJob
+enqueued_job.successfully_enqueued?
+# => true
+
+second_enqueued_job = ConcurrentJob.perform_later(record) do |job|
+  job.successfully_enqueued?
+  # => false
+end
+
+second_enqueued_job
+# => false
+```
+
 ### Performance considerations
 
 Concurrency controls introduce significant overhead (blocked executions need to be created and promoted to ready, semaphores need to be created and updated) so you should consider carefully whether you need them. For throttling purposes, where you plan to have `limit` significantly larger than 1, I'd encourage relying on a limited number of workers per queue instead. For example:
 
@@ -505,6 +533,10 @@ production:
 
 Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues).
 
+### Discarding concurrent jobs
+
+
+
 ## Failed jobs and retries
 
 Solid Queue doesn't include any automatic retry mechanism, it [relies on Active Job for this](https://edgeguides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs). Jobs that fail will be kept in the system, and a _failed execution_ (a record in the `solid_queue_failed_executions` table) will be created for these. The job will stay there until manually discarded or re-enqueued.
You can do this in a console as: diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 8574c1ec..df4ab405 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -2,7 +2,7 @@ module SolidQueue class Job < Record - class EnqueueError < StandardError; end + class EnqueueError < ActiveJob::EnqueueError; end include Executable, Clearable, Recurrable @@ -10,19 +10,24 @@ class EnqueueError < StandardError; end class << self def enqueue_all(active_jobs) - active_jobs_by_job_id = active_jobs.index_by(&:job_id) + enqueued_jobs_count = 0 transaction do jobs = create_all_from_active_jobs(active_jobs) - prepare_all_for_execution(jobs).tap do |enqueued_jobs| - enqueued_jobs.each do |enqueued_job| - active_jobs_by_job_id[enqueued_job.active_job_id].provider_job_id = enqueued_job.id - active_jobs_by_job_id[enqueued_job.active_job_id].successfully_enqueued = true - end + prepare_all_for_execution(jobs) + jobs_by_active_job_id = jobs.index_by(&:active_job_id) + + active_jobs.each do |active_job| + job = jobs_by_active_job_id[active_job.job_id] + + active_job.provider_job_id = job&.id + active_job.enqueue_error = job&.enqueue_error + active_job.successfully_enqueued = job.present? && job.enqueue_error.nil? + enqueued_jobs_count += 1 if active_job.successfully_enqueued? end end - active_jobs.count(&:successfully_enqueued?) + enqueued_jobs_count end def enqueue(active_job, scheduled_at: Time.current) @@ -49,7 +54,7 @@ def create_from_active_job(active_job) def create_all_from_active_jobs(active_jobs) job_rows = active_jobs.map { |job| attributes_from_active_job(job) } insert_all(job_rows) - where(active_job_id: active_jobs.map(&:job_id)) + where(active_job_id: active_jobs.map(&:job_id)).order(id: :asc) end def attributes_from_active_job(active_job) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 6ae12e28..6c37cab4 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -8,7 +8,7 @@ module ConcurrencyControls included do has_one :blocked_execution - delegate :concurrency_limit, :concurrency_duration, to: :job_class + delegate :concurrency_limit, :concurrency_on_conflict, :concurrency_duration, to: :job_class before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? } end @@ -34,6 +34,10 @@ def blocked? end private + def discard_concurrent? + concurrency_on_conflict == :discard + end + def acquire_concurrency_lock return true unless concurrency_limited? diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index e2146a67..28c1ac35 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -13,6 +13,8 @@ module Executable after_create :prepare_for_execution + attr_accessor :enqueue_error + scope :finished, -> { where.not(finished_at: nil) } end @@ -37,7 +39,13 @@ def dispatch_all_at_once(jobs) end def dispatch_all_one_by_one(jobs) - jobs.each(&:dispatch) + jobs.each do |job| + begin + job.dispatch + rescue EnqueueError => e + job.enqueue_error = e + end + end end def successfully_dispatched(jobs) @@ -66,6 +74,9 @@ def prepare_for_execution def dispatch if acquire_concurrency_lock then ready + elsif discard_concurrent? 
+ discard + raise EnqueueError.new("Dispatched job discarded due to concurrent configuration.") else block end diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 0ea290f6..ceb03e7e 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -11,15 +11,17 @@ module ConcurrencyControls class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false class_attribute :concurrency_limit + class_attribute :concurrency_on_conflict class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period end class_methods do - def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period) + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block) self.concurrency_key = key self.concurrency_limit = to self.concurrency_group = group self.concurrency_duration = duration + self.concurrency_on_conflict = on_conflict end end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 486756ab..cb2fb377 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -10,6 +10,14 @@ def perform(job_result) end end + class DiscardedNonOverlappingJob < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard + end + + class DiscardedOverlappingJob < NonOverlappingJob + limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard + end + class NonOverlappingGroupedJob1 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end @@ -18,8 +26,19 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end + class DiscardedNonOverlappingGroupedJob1 < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard + end + + class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard + end + setup do @result = JobResult.create!(queue_name: "default") + @discarded_concurrent_error = SolidQueue::Job::EnqueueError.new( + "Dispatched job discarded due to concurrent configuration." + ) end test "enqueue active job to be executed right away" do @@ -98,6 +117,78 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob assert_equal active_job.concurrency_key, job.concurrency_key end + test "enqueue jobs with discarding concurrency controls" do + assert_ready do + active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A") + assert active_job.successfully_enqueued? + + assert_not DiscardedNonOverlappingJob.perform_later(@result, name: "B") do |overlapping_active_job| + assert_not overlapping_active_job.successfully_enqueued? + assert_equal @discarded_concurrent_error, overlapping_active_job.enqueue_error + end + end + end + + test "enqueue scheduled job with discarding concurrency controls" do + assert_ready do + active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A") + assert active_job.successfully_enqueued? 
+ end + + scheduled_job_id = nil + + assert_scheduled do + scheduled_active_job = DiscardedNonOverlappingJob.set(wait: 0.5.seconds).perform_later(@result, name: "B") + assert scheduled_active_job.successfully_enqueued? + assert_nil scheduled_active_job.enqueue_error + + scheduled_job_id = scheduled_active_job.provider_job_id + end + + scheduled_job = SolidQueue::Job.find(scheduled_job_id) + wait_for { scheduled_job.due? } + + dispatched = SolidQueue::ScheduledExecution.dispatch_next_batch(10) + assert_equal 0, dispatched + assert_raises(ActiveRecord::RecordNotFound) { scheduled_job.reload } + end + + test "enqueues jobs in bulk with discarding concurrency controls" do + jobs = [ + job_1 = DiscardedNonOverlappingJob.new(@result, name: "A"), + job_2 = DiscardedNonOverlappingJob.new(@result, name: "B") + ] + + assert_job_counts(ready: 1, discarded: 1) do + enqueued_jobs_count = SolidQueue::Job.enqueue_all(jobs) + assert_equal enqueued_jobs_count, 1 + end + + assert job_1.successfully_enqueued? + assert_not job_2.successfully_enqueued? + assert_equal SolidQueue::Job::EnqueueError, job_2.enqueue_error.class + assert_equal @discarded_concurrent_error.message, job_2.enqueue_error.message + end + + test "enqueue jobs with discarding concurrency controls when below limit" do + assert_job_counts(ready: 2) do + assert_ready do + active_job = DiscardedOverlappingJob.perform_later(@result, name: "A") + assert active_job.successfully_enqueued? + end + + assert_ready do + active_job = DiscardedOverlappingJob.perform_later(@result, name: "B") + assert active_job.successfully_enqueued? + end + + assert_not DiscardedOverlappingJob.perform_later(@result, name: "C") do |overlapping_active_job| + assert_not overlapping_active_job.successfully_enqueued? + assert_equal @discarded_concurrent_error, overlapping_active_job.enqueue_error + end + end + end + test "enqueue jobs with concurrency controls in the same concurrency group" do assert_ready do active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A") @@ -112,6 +203,23 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob end end + test "enqueue jobs with discarding concurrency controls in the same concurrency group" do + assert_job_counts(ready: 1) do + assert_ready do + active_job = DiscardedNonOverlappingGroupedJob1.perform_later(@result, name: "A") + assert active_job.successfully_enqueued? + assert_equal 1, active_job.concurrency_limit + assert_equal "DiscardingGroup/JobResult/#{@result.id}", active_job.concurrency_key + end + + assert_not DiscardedNonOverlappingGroupedJob2.perform_later(@result, name: "B") do |blocked_active_job| + assert_not blocked_active_job.successfully_enqueued? + assert_equal 1, blocked_active_job.concurrency_limit + assert_equal "DiscardingGroup/JobResult/#{@result.id}", blocked_active_job.concurrency_key + end + end + end + test "enqueue multiple jobs" do active_jobs = [ AddToBufferJob.new(2), @@ -249,13 +357,15 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob test "raise EnqueueError when there's an ActiveRecordError" do SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked) - active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") assert_raises SolidQueue::Job::EnqueueError do + active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") SolidQueue::Job.enqueue(active_job) end - assert_raises SolidQueue::Job::EnqueueError do - AddToBufferJob.perform_later(1) + # #perform_later doesn't raise ActiveJob::EnqueueError, and instead set's successfully_enqueued? 
to false + assert_not AddToBufferJob.perform_later(1) do |active_job| + assert_not active_job.successfully_enqueued? + assert_equal SolidQueue::Job::EnqueueError, active_job.enqueue_error.class end end @@ -291,8 +401,12 @@ def assert_blocked(&block) assert SolidQueue::Job.last.blocked? end - def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block) - assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do + def assert_discarded(&block) + assert_job_counts(discarded: 1, &block) + end + + def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, discarded: 0, &block) + assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked + discarded) do assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do assert_difference -> { SolidQueue::BlockedExecution.count }, +blocked, &block From f0ea9683a7be59914cbc4d7be26e31512df34ef0 Mon Sep 17 00:00:00 2001 From: Joel Warrington Date: Mon, 23 Jun 2025 07:39:44 -0600 Subject: [PATCH 2/6] Fix typo Co-authored-by: Philippe Tring --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9a24bc50..339aea03 100644 --- a/README.md +++ b/README.md @@ -428,7 +428,7 @@ In the case of recurring tasks, if such error is raised when enqueuing the job c ## Concurrency controls -Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs can can be configured to either be discarded or blocked. +Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs can be configured to either be discarded or blocked. 
```ruby class MyJob < ApplicationJob From d8e0bd11027d00f72b0a6e3d955d891595452300 Mon Sep 17 00:00:00 2001 From: mhenrixon Date: Wed, 25 Jun 2025 09:24:54 +0300 Subject: [PATCH 3/6] Allow duplicate jobs to be discarded --- .../solid_queue/job/concurrency_controls.rb | 2 +- app/models/solid_queue/job/executable.rb | 11 +- lib/active_job/concurrency_controls.rb | 1 + .../dummy/app/jobs/discard_on_conflict_job.rb | 7 + test/dummy/app/jobs/limited_discard_job.rb | 8 + test/integration/concurrency_discard_test.rb | 137 ++++++++++++++++ test/models/solid_queue/job_test.rb | 76 +++++++++ test/unit/concurrency_discard_test.rb | 154 ++++++++++++++++++ 8 files changed, 394 insertions(+), 2 deletions(-) create mode 100644 test/dummy/app/jobs/discard_on_conflict_job.rb create mode 100644 test/dummy/app/jobs/limited_discard_job.rb create mode 100644 test/integration/concurrency_discard_test.rb create mode 100644 test/unit/concurrency_discard_test.rb diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 6c37cab4..7295bc3f 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -8,7 +8,7 @@ module ConcurrencyControls included do has_one :blocked_execution - delegate :concurrency_limit, :concurrency_on_conflict, :concurrency_duration, to: :job_class + delegate :concurrency_limit, :concurrency_duration, :concurrency_on_conflict, to: :job_class before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? } end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index 28c1ac35..5869367b 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -78,7 +78,12 @@ def dispatch discard raise EnqueueError.new("Dispatched job discarded due to concurrent configuration.") else - block + case job_class.concurrency_on_conflict + when :discard + discard_on_conflict + else + block + end end end @@ -115,6 +120,10 @@ def ready ReadyExecution.create_or_find_by!(job_id: id) end + def discard_on_conflict + finished! 
+ end + def execution %w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") } end diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index ceb03e7e..4fc609bd 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -13,6 +13,7 @@ module ConcurrencyControls class_attribute :concurrency_limit class_attribute :concurrency_on_conflict class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period + class_attribute :concurrency_on_conflict, default: :block end class_methods do diff --git a/test/dummy/app/jobs/discard_on_conflict_job.rb b/test/dummy/app/jobs/discard_on_conflict_job.rb new file mode 100644 index 00000000..64da39b3 --- /dev/null +++ b/test/dummy/app/jobs/discard_on_conflict_job.rb @@ -0,0 +1,7 @@ +class DiscardOnConflictJob < ApplicationJob + limits_concurrency to: 1, key: ->(value) { value }, on_conflict: :discard + + def perform(value) + Rails.logger.info "Performing DiscardOnConflictJob with value: #{value}" + end +end diff --git a/test/dummy/app/jobs/limited_discard_job.rb b/test/dummy/app/jobs/limited_discard_job.rb new file mode 100644 index 00000000..ee7174c2 --- /dev/null +++ b/test/dummy/app/jobs/limited_discard_job.rb @@ -0,0 +1,8 @@ +class LimitedDiscardJob < ApplicationJob + limits_concurrency to: 2, key: ->(group, id) { group }, on_conflict: :discard + + def perform(group, id) + Rails.logger.info "Performing LimitedDiscardJob with group: #{group}, id: #{id}" + sleep 0.1 + end +end diff --git a/test/integration/concurrency_discard_test.rb b/test/integration/concurrency_discard_test.rb new file mode 100644 index 00000000..1ced7d4d --- /dev/null +++ b/test/integration/concurrency_discard_test.rb @@ -0,0 +1,137 @@ +# frozen_string_literal: true + +require "test_helper" + +class ConcurrencyDiscardTest < ActiveSupport::TestCase + setup do + @job_result = JobResult.create!(queue_name: "default", status: "test") + end + + test "discard jobs when concurrency limit is reached with on_conflict: :discard" do + # Enqueue first job - should be executed + job1 = DiscardOnConflictJob.perform_later(@job_result.id) + + # Enqueue second job - should be discarded due to concurrency limit + job2 = DiscardOnConflictJob.perform_later(@job_result.id) + + # Enqueue third job - should also be discarded + job3 = DiscardOnConflictJob.perform_later(@job_result.id) + + # Check that first job was ready + solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) + assert solid_job1.ready? + assert solid_job1.ready_execution.present? + + # Check that second and third jobs were discarded + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + assert solid_job2.finished? + assert_nil solid_job2.ready_execution + assert_nil solid_job2.blocked_execution + + solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) + assert solid_job3.finished? 
+ assert_nil solid_job3.ready_execution + assert_nil solid_job3.blocked_execution + end + + test "block jobs when concurrency limit is reached without on_conflict option" do + # Using SequentialUpdateResultJob which has default blocking behavior + # Enqueue first job - should be executed + job1 = SequentialUpdateResultJob.perform_later(@job_result, name: "A") + + # Enqueue second job - should be blocked due to concurrency limit + job2 = SequentialUpdateResultJob.perform_later(@job_result, name: "B") + + # Check that second job is blocked + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + assert solid_job2.blocked? + assert solid_job2.blocked_execution.present? + end + + test "respect concurrency limit with discard option" do + # Enqueue jobs with limit of 2 + job1 = LimitedDiscardJob.perform_later("group1", 1) + job2 = LimitedDiscardJob.perform_later("group1", 2) + job3 = LimitedDiscardJob.perform_later("group1", 3) # Should be discarded + job4 = LimitedDiscardJob.perform_later("group1", 4) # Should be discarded + + # Check that first two jobs are ready + solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + assert solid_job1.ready? + assert solid_job2.ready? + + # Check that third and fourth jobs are discarded + solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) + solid_job4 = SolidQueue::Job.find_by(active_job_id: job4.job_id) + assert solid_job3.finished? + assert solid_job4.finished? + assert_nil solid_job3.ready_execution + assert_nil solid_job4.ready_execution + end + + test "discard option works with different concurrency keys" do + # These should not conflict because they have different keys + job1 = DiscardOnConflictJob.perform_later("key1") + job2 = DiscardOnConflictJob.perform_later("key2") + job3 = DiscardOnConflictJob.perform_later("key1") # Should be discarded + + # Check that first two jobs are ready (different keys) + solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + assert solid_job1.ready? + assert solid_job2.ready? + + # Check that third job is discarded (same key as first) + solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) + assert solid_job3.finished? + assert_nil solid_job3.ready_execution + end + + test "discarded jobs do not unblock other jobs" do + # Enqueue a job that will be executed + job1 = DiscardOnConflictJob.perform_later(@job_result.id) + + # Enqueue a job that will be discarded + job2 = DiscardOnConflictJob.perform_later(@job_result.id) + + # The first job should be ready + solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) + assert solid_job1.ready? + + # The second job should be discarded immediately + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + assert solid_job2.finished? + + # Complete the first job and release its lock + solid_job1.unblock_next_blocked_job + solid_job1.finished! + + # Enqueue another job - it should be ready since the lock is released + job3 = DiscardOnConflictJob.perform_later(@job_result.id) + solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) + assert solid_job3.ready? 
+ end + + test "discarded jobs are marked as finished without execution" do + # Enqueue a job that will be ready + job1 = DiscardOnConflictJob.perform_later("test_key") + + # Enqueue a job that will be discarded + job2 = DiscardOnConflictJob.perform_later("test_key") + + solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) + solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) + + # First job should be ready + assert solid_job1.ready? + assert solid_job1.ready_execution.present? + + # Second job should be finished without any execution + assert solid_job2.finished? + assert_nil solid_job2.ready_execution + assert_nil solid_job2.claimed_execution + assert_nil solid_job2.failed_execution + assert_nil solid_job2.blocked_execution + end +end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index cb2fb377..9cdc1b6f 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -64,6 +64,82 @@ class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob assert_equal 8, execution.priority end + test "enqueue jobs with on_conflict discard" do + # First job should be ready + active_job1 = NonOverlappingDiscardJob.new(@result) + assert_ready do + SolidQueue::Job.enqueue(active_job1) + end + job1 = SolidQueue::Job.find_by(active_job_id: active_job1.job_id) + assert job1.ready? + + # Second job should be discarded (finished without execution) + active_job2 = NonOverlappingDiscardJob.new(@result) + assert_no_difference -> { SolidQueue::ReadyExecution.count } do + assert_no_difference -> { SolidQueue::BlockedExecution.count } do + SolidQueue::Job.enqueue(active_job2) + end + end + job2 = SolidQueue::Job.find_by(active_job_id: active_job2.job_id) + + assert job2.finished? + assert_nil job2.ready_execution + assert_nil job2.blocked_execution + assert_nil job2.claimed_execution + assert_nil job2.failed_execution + + # Third job with same key should also be discarded + active_job3 = NonOverlappingDiscardJob.new(@result) + assert_no_difference -> { SolidQueue::ReadyExecution.count } do + SolidQueue::Job.enqueue(active_job3) + end + job3 = SolidQueue::Job.find_by(active_job_id: active_job3.job_id) + + assert job3.finished? + end + + test "compare blocking vs discard behavior" do + # Test default blocking behavior + blocking_job1 = NonOverlappingJob.new(@result) + assert_ready do + SolidQueue::Job.enqueue(blocking_job1) + end + job1 = SolidQueue::Job.find_by(active_job_id: blocking_job1.job_id) + assert job1.ready? + + # Second job should be blocked (not discarded) + blocking_job2 = NonOverlappingJob.new(@result) + assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do + SolidQueue::Job.enqueue(blocking_job2) + end + job2 = SolidQueue::Job.find_by(active_job_id: blocking_job2.job_id) + assert job2.blocked? + assert job2.blocked_execution.present? + assert_not job2.finished? + + # Clean up for discard test + SolidQueue::Job.destroy_all + SolidQueue::Semaphore.destroy_all + + # Test discard behavior + discard_job1 = NonOverlappingDiscardJob.new(@result) + assert_ready do + SolidQueue::Job.enqueue(discard_job1) + end + job3 = SolidQueue::Job.find_by(active_job_id: discard_job1.job_id) + assert job3.ready? 
+ + # Second job should be discarded (not blocked) + discard_job2 = NonOverlappingDiscardJob.new(@result) + assert_no_difference -> { SolidQueue::BlockedExecution.count } do + SolidQueue::Job.enqueue(discard_job2) + end + job4 = SolidQueue::Job.find_by(active_job_id: discard_job2.job_id) + assert job4.finished? + assert_nil job4.blocked_execution + assert_nil job4.ready_execution + end + test "enqueue active job to be scheduled in the future" do active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") diff --git a/test/unit/concurrency_discard_test.rb b/test/unit/concurrency_discard_test.rb new file mode 100644 index 00000000..ae31fa55 --- /dev/null +++ b/test/unit/concurrency_discard_test.rb @@ -0,0 +1,154 @@ +# frozen_string_literal: true + +require "test_helper" + +module SolidQueue + class ConcurrencyDiscardTest < ActiveSupport::TestCase + class DiscardOnConflictJob < ApplicationJob + limits_concurrency to: 1, key: ->(value) { value }, on_conflict: :discard + + def perform(value) + # Job implementation + end + end + + class DefaultBlockingJob < ApplicationJob + limits_concurrency to: 1, key: ->(value) { value } + + def perform(value) + # Job implementation + end + end + + test "job with on_conflict: :discard is finished when concurrency limit is reached" do + # Create first job that will acquire the lock + active_job1 = DiscardOnConflictJob.new("test_key") + active_job1.job_id = "job1" + Job.enqueue(active_job1) + job1 = Job.find_by(active_job_id: active_job1.job_id) + + # First job should be ready + assert job1.ready? + assert job1.ready_execution.present? + + # Create second job that should be discarded + active_job2 = DiscardOnConflictJob.new("test_key") + active_job2.job_id = "job2" + Job.enqueue(active_job2) + job2 = Job.find_by(active_job_id: active_job2.job_id) + + # Second job should be finished without any execution + assert job2.finished? + assert_nil job2.ready_execution + assert_nil job2.blocked_execution + assert_nil job2.claimed_execution + assert_nil job2.failed_execution + end + + test "job without on_conflict option is blocked when concurrency limit is reached" do + # Create first job that will acquire the lock + active_job1 = DefaultBlockingJob.new("test_key") + active_job1.job_id = "job1" + Job.enqueue(active_job1) + job1 = Job.find_by(active_job_id: active_job1.job_id) + + # First job should be ready + assert job1.ready? + assert job1.ready_execution.present? + + # Create second job that should be blocked + active_job2 = DefaultBlockingJob.new("test_key") + active_job2.job_id = "job2" + Job.enqueue(active_job2) + job2 = Job.find_by(active_job_id: active_job2.job_id) + + # Second job should be blocked + assert job2.blocked? + assert job2.blocked_execution.present? + assert_nil job2.ready_execution + assert_not job2.finished? 
+ end + + test "concurrency_on_conflict attribute is properly set" do + assert_equal :discard, DiscardOnConflictJob.concurrency_on_conflict + assert_equal :block, DefaultBlockingJob.concurrency_on_conflict + end + + test "multiple jobs with same key are discarded when using on_conflict: :discard" do + # Create first job + active_job1 = DiscardOnConflictJob.new("shared_key") + active_job1.job_id = "job1" + Job.enqueue(active_job1) + job1 = Job.find_by(active_job_id: active_job1.job_id) + + # Create multiple jobs that should all be discarded + discarded_jobs = [] + 5.times do |i| + active_job = DiscardOnConflictJob.new("shared_key") + active_job.job_id = "job#{i + 2}" + Job.enqueue(active_job) + job = Job.find_by(active_job_id: active_job.job_id) + discarded_jobs << job + end + + # First job should be ready + assert job1.ready? + + # All other jobs should be finished (discarded) + discarded_jobs.each do |job| + assert job.finished? + assert_nil job.ready_execution + assert_nil job.blocked_execution + end + end + + test "jobs with different keys are not affected by discard" do + # Create jobs with different keys - they should all be ready + jobs = [] + 3.times do |i| + active_job = DiscardOnConflictJob.new("key_#{i}") + active_job.job_id = "job#{i}" + Job.enqueue(active_job) + job = Job.find_by(active_job_id: active_job.job_id) + jobs << job + end + + # All jobs should be ready since they have different keys + jobs.each do |job| + assert job.ready? + assert job.ready_execution.present? + assert_not job.finished? + end + end + + test "discarded job does not prevent future jobs after lock is released" do + # Create and finish first job + active_job1 = DiscardOnConflictJob.new("test_key") + active_job1.job_id = "job1" + Job.enqueue(active_job1) + job1 = Job.find_by(active_job_id: active_job1.job_id) + + # Create second job that gets discarded + active_job2 = DiscardOnConflictJob.new("test_key") + active_job2.job_id = "job2" + Job.enqueue(active_job2) + job2 = Job.find_by(active_job_id: active_job2.job_id) + + assert job1.ready? + assert job2.finished? # discarded + + # Release the lock by finishing the first job + job1.unblock_next_blocked_job + job1.finished! + + # Create third job - should be ready now + active_job3 = DiscardOnConflictJob.new("test_key") + active_job3.job_id = "job3" + Job.enqueue(active_job3) + job3 = Job.find_by(active_job_id: active_job3.job_id) + + assert job3.ready? + assert job3.ready_execution.present? + end + end +end From bd2b655a16bebb6751b12fdec47de515a3344841 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 7 Jul 2025 12:02:15 -0400 Subject: [PATCH 4/6] Tidy up after merging two branches implementing the same feature Brought together #523 and #586, and took some parts from each. Clean and enhance the README as well. --- README.md | 40 ++++--------------- app/models/solid_queue/job.rb | 2 +- .../solid_queue/job/concurrency_controls.rb | 14 +++++-- app/models/solid_queue/job/executable.rb | 14 +------ lib/active_job/concurrency_controls.rb | 4 +- 5 files changed, 22 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index 339aea03..665339a1 100644 --- a/README.md +++ b/README.md @@ -428,11 +428,11 @@ In the case of recurring tasks, if such error is raised when enqueuing the job c ## Concurrency controls -Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. 
When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs can be configured to either be discarded or blocked. +Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, by default, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked. This means that if a job with certain arguments has already been enqueued, other jobs with the same characteristics (in the same concurrency _class_) won't be enqueued, they'll silently complete. ```ruby class MyJob < ApplicationJob - limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: conflict_behaviour + limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: on_conflict_behaviour # ... ``` @@ -440,13 +440,13 @@ class MyJob < ApplicationJob - `to` is `1` by default. - `duration` is set to `SolidQueue.default_concurrency_control_period` by default, which itself defaults to `3 minutes`, but that you can configure as well. - `group` is used to control the concurrency of different job classes together. It defaults to the job class name. -- `on_conflict` controls behaviour when enqueuing a job which is above the max concurrent executions for your configuration. - - (default) `:block`; the job is blocked and is dispatched until another job completes and unblocks it - - `:discard`; the job is discarded +- `on_conflict` controls behaviour when enqueuing a job that conflicts with the concurrency limits configured. It can be set to one of the following: + - (default) `:block`: the job is blocked and is dispatched when another job completes and unblocks it, or when the duration expires. + - `:discard`: the job is discarded (silently finishes). When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded up to the interval defined by `duration` has elapsed. When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping). -The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. 
Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than duration are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting. It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated. +The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. If you're using the `discard` behaviour for `on_conflict`, jobs enqueued while the semaphore is closed will be discarded. Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than duration are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting. It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated. In a similar way, when using `discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore. For example: @@ -485,31 +485,6 @@ Jobs are unblocked in order of priority but queue order is not taken into accoun Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for getting an open semaphore, and whenever they get it, they'll be run. It doesn't matter if they had already gotten an open semaphore in the past. -### Discarding conflicting jobs - -When configuring `on_conflict` with `:discard`, jobs enqueued above the concurrent execution limit are discarded and failed to be enqueued. 
- -```ruby -class ConcurrentJob < ApplicationJob - limits_concurrency key: ->(record) { record }, on_conflict: :discard - - def perform(user); end -end - -enqueued_job = ConcurrentJob.perform_later(record) -# => instance of ConcurrentJob -enqueued_job.successfully_enqueued? -# => true - -second_enqueued_job = ConcurrentJob.perform_later(record) do |job| - job.successfully_enqueued? - # => false -end - -second_enqueued_job -# => false -``` - ### Performance considerations Concurrency controls introduce significant overhead (blocked executions need to be created and promoted to ready, semaphores need to be created and updated) so you should consider carefully whether you need them. For throttling purposes, where you plan to have `limit` significantly larger than 1, I'd encourage relying on a limited number of workers per queue instead. For example: @@ -533,9 +508,8 @@ production: Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues). -### Discarding concurrent jobs - +In addition, mixing concurrency controls with bulk enqueuing (Active Job's `perform_all_later`) is not a good idea because concurrency controlled job needs to be enqueued one by one to ensure concurrency limits are respected, so you lose all the benefits of bulk enqueuing. ## Failed jobs and retries diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index df4ab405..7d9b60d9 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -2,7 +2,7 @@ module SolidQueue class Job < Record - class EnqueueError < ActiveJob::EnqueueError; end + class EnqueueError < StandardError; end include Executable, Clearable, Recurrable diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 7295bc3f..0aa3696f 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -8,7 +8,7 @@ module ConcurrencyControls included do has_one :blocked_execution - delegate :concurrency_limit, :concurrency_duration, :concurrency_on_conflict, to: :job_class + delegate :concurrency_limit, :concurrency_duration, to: :job_class before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? } end @@ -34,8 +34,8 @@ def blocked? end private - def discard_concurrent? - concurrency_on_conflict == :discard + def concurrency_on_conflict + job_class.concurrency_on_conflict.to_s.inquiry end def acquire_concurrency_lock @@ -50,6 +50,14 @@ def release_concurrency_lock Semaphore.signal(self) end + def handle_concurrency_conflict + if concurrency_on_conflict.discard? + finished! + else + block + end + end + def block BlockedExecution.create_or_find_by!(job_id: id) end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index 5869367b..661f0f0d 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -74,16 +74,8 @@ def prepare_for_execution def dispatch if acquire_concurrency_lock then ready - elsif discard_concurrent? 
- discard - raise EnqueueError.new("Dispatched job discarded due to concurrent configuration.") else - case job_class.concurrency_on_conflict - when :discard - discard_on_conflict - else - block - end + handle_concurrency_conflict end end @@ -120,10 +112,6 @@ def ready ReadyExecution.create_or_find_by!(job_id: id) end - def discard_on_conflict - finished! - end - def execution %w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") } end diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 4fc609bd..47b0f5ee 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -5,13 +5,13 @@ module ConcurrencyControls extend ActiveSupport::Concern DEFAULT_CONCURRENCY_GROUP = ->(*) { self.class.name } + CONCURRENCY_ON_CONFLICT_BEHAVIOUR = %i[ block discard ] included do class_attribute :concurrency_key, instance_accessor: false class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false class_attribute :concurrency_limit - class_attribute :concurrency_on_conflict class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period class_attribute :concurrency_on_conflict, default: :block end @@ -22,7 +22,7 @@ def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: self.concurrency_limit = to self.concurrency_group = group self.concurrency_duration = duration - self.concurrency_on_conflict = on_conflict + self.concurrency_on_conflict = on_conflict.presence_in(CONCURRENCY_ON_CONFLICT_BEHAVIOUR) || :block end end From ddfe7196da37f0a07922349a150175ad7079f55b Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 7 Jul 2025 13:09:43 -0400 Subject: [PATCH 5/6] Clean up tests after unifying discard behaviour --- README.md | 2 +- .../dummy/app/jobs/discard_on_conflict_job.rb | 7 - .../app/jobs/discardable_update_result_job.rb | 3 + test/dummy/app/jobs/limited_discard_job.rb | 8 - test/integration/concurrency_controls_test.rb | 55 +++++ test/integration/concurrency_discard_test.rb | 137 ----------- test/models/solid_queue/job_test.rb | 230 +++++------------- 7 files changed, 124 insertions(+), 318 deletions(-) delete mode 100644 test/dummy/app/jobs/discard_on_conflict_job.rb create mode 100644 test/dummy/app/jobs/discardable_update_result_job.rb delete mode 100644 test/dummy/app/jobs/limited_discard_job.rb delete mode 100644 test/integration/concurrency_discard_test.rb diff --git a/README.md b/README.md index 665339a1..fd84f119 100644 --- a/README.md +++ b/README.md @@ -442,7 +442,7 @@ class MyJob < ApplicationJob - `group` is used to control the concurrency of different job classes together. It defaults to the job class name. - `on_conflict` controls behaviour when enqueuing a job that conflicts with the concurrency limits configured. It can be set to one of the following: - (default) `:block`: the job is blocked and is dispatched when another job completes and unblocks it, or when the duration expires. - - `:discard`: the job is discarded (silently finishes). When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded up to the interval defined by `duration` has elapsed. + - `:discard`: the job is discarded (silently finishes). 
When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded up to the interval defined by `duration` has elapsed. Additionally, Active Jobs that are discarded will have `successfully_enqueued` set to `false`. When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping). diff --git a/test/dummy/app/jobs/discard_on_conflict_job.rb b/test/dummy/app/jobs/discard_on_conflict_job.rb deleted file mode 100644 index 64da39b3..00000000 --- a/test/dummy/app/jobs/discard_on_conflict_job.rb +++ /dev/null @@ -1,7 +0,0 @@ -class DiscardOnConflictJob < ApplicationJob - limits_concurrency to: 1, key: ->(value) { value }, on_conflict: :discard - - def perform(value) - Rails.logger.info "Performing DiscardOnConflictJob with value: #{value}" - end -end diff --git a/test/dummy/app/jobs/discardable_update_result_job.rb b/test/dummy/app/jobs/discardable_update_result_job.rb new file mode 100644 index 00000000..74f46fbd --- /dev/null +++ b/test/dummy/app/jobs/discardable_update_result_job.rb @@ -0,0 +1,3 @@ +class DiscardableUpdateResultJob < UpdateResultJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard +end diff --git a/test/dummy/app/jobs/limited_discard_job.rb b/test/dummy/app/jobs/limited_discard_job.rb deleted file mode 100644 index ee7174c2..00000000 --- a/test/dummy/app/jobs/limited_discard_job.rb +++ /dev/null @@ -1,8 +0,0 @@ -class LimitedDiscardJob < ApplicationJob - limits_concurrency to: 2, key: ->(group, id) { group }, on_conflict: :discard - - def perform(group, id) - Rails.logger.info "Performing LimitedDiscardJob with group: #{group}, id: #{id}" - sleep 0.1 - end -end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index f04c87fa..a5ac349a 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -196,6 +196,61 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end end + test "discard jobs when concurrency limit is reached with on_conflict: :discard" do + # Enqueue first job - should be executed + job1 = DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2) + # Enqueue second job - should be discarded due to concurrency limit + job2 = DiscardableUpdateResultJob.perform_later(@result, name: "2") + # Enqueue third job - should also be discarded + job3 = DiscardableUpdateResultJob.perform_later(@result, name: "3") + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + + # Only the first job did something + assert_stored_sequence(@result, [ "1" ]) + + # All jobs have finished and have no blocked executions + jobs = SolidQueue::Job.where(active_job_id: [ job1, job2, job3 ].map(&:job_id)) + assert_equal 3, jobs.count + + jobs.each do |job| + assert job.finished? 
+ assert_nil job.blocked_execution + end + end + + test "discard on conflict across different concurrency keys" do + another_result = JobResult.create!(queue_name: "default", status: "seq: ") + DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2) + DiscardableUpdateResultJob.perform_later(another_result, name: "2", pause: 0.2) + DiscardableUpdateResultJob.perform_later(@result, name: "3") # Should be discarded + DiscardableUpdateResultJob.perform_later(another_result, name: "4") # Should be discarded + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + + # Only the first 2 jobs did something + assert_stored_sequence(@result, [ "1" ]) + assert_stored_sequence(another_result, [ "2" ]) + end + + test "discard on conflict and release semaphore" do + DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.1) + # will be discarded + DiscardableUpdateResultJob.perform_later(@result, name: "2") + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + + # Enqueue another job that shouldn't be discarded or blocked + DiscardableUpdateResultJob.perform_later(@result, name: "3") + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + + assert_stored_sequence(@result, [ "1", "3" ]) + end + private def assert_stored_sequence(result, *sequences) expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join } diff --git a/test/integration/concurrency_discard_test.rb b/test/integration/concurrency_discard_test.rb deleted file mode 100644 index 1ced7d4d..00000000 --- a/test/integration/concurrency_discard_test.rb +++ /dev/null @@ -1,137 +0,0 @@ -# frozen_string_literal: true - -require "test_helper" - -class ConcurrencyDiscardTest < ActiveSupport::TestCase - setup do - @job_result = JobResult.create!(queue_name: "default", status: "test") - end - - test "discard jobs when concurrency limit is reached with on_conflict: :discard" do - # Enqueue first job - should be executed - job1 = DiscardOnConflictJob.perform_later(@job_result.id) - - # Enqueue second job - should be discarded due to concurrency limit - job2 = DiscardOnConflictJob.perform_later(@job_result.id) - - # Enqueue third job - should also be discarded - job3 = DiscardOnConflictJob.perform_later(@job_result.id) - - # Check that first job was ready - solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) - assert solid_job1.ready? - assert solid_job1.ready_execution.present? - - # Check that second and third jobs were discarded - solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) - assert solid_job2.finished? - assert_nil solid_job2.ready_execution - assert_nil solid_job2.blocked_execution - - solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) - assert solid_job3.finished? - assert_nil solid_job3.ready_execution - assert_nil solid_job3.blocked_execution - end - - test "block jobs when concurrency limit is reached without on_conflict option" do - # Using SequentialUpdateResultJob which has default blocking behavior - # Enqueue first job - should be executed - job1 = SequentialUpdateResultJob.perform_later(@job_result, name: "A") - - # Enqueue second job - should be blocked due to concurrency limit - job2 = SequentialUpdateResultJob.perform_later(@job_result, name: "B") - - # Check that second job is blocked - solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) - assert solid_job2.blocked? - assert solid_job2.blocked_execution.present? 
- end - - test "respect concurrency limit with discard option" do - # Enqueue jobs with limit of 2 - job1 = LimitedDiscardJob.perform_later("group1", 1) - job2 = LimitedDiscardJob.perform_later("group1", 2) - job3 = LimitedDiscardJob.perform_later("group1", 3) # Should be discarded - job4 = LimitedDiscardJob.perform_later("group1", 4) # Should be discarded - - # Check that first two jobs are ready - solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) - solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) - assert solid_job1.ready? - assert solid_job2.ready? - - # Check that third and fourth jobs are discarded - solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) - solid_job4 = SolidQueue::Job.find_by(active_job_id: job4.job_id) - assert solid_job3.finished? - assert solid_job4.finished? - assert_nil solid_job3.ready_execution - assert_nil solid_job4.ready_execution - end - - test "discard option works with different concurrency keys" do - # These should not conflict because they have different keys - job1 = DiscardOnConflictJob.perform_later("key1") - job2 = DiscardOnConflictJob.perform_later("key2") - job3 = DiscardOnConflictJob.perform_later("key1") # Should be discarded - - # Check that first two jobs are ready (different keys) - solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) - solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) - assert solid_job1.ready? - assert solid_job2.ready? - - # Check that third job is discarded (same key as first) - solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) - assert solid_job3.finished? - assert_nil solid_job3.ready_execution - end - - test "discarded jobs do not unblock other jobs" do - # Enqueue a job that will be executed - job1 = DiscardOnConflictJob.perform_later(@job_result.id) - - # Enqueue a job that will be discarded - job2 = DiscardOnConflictJob.perform_later(@job_result.id) - - # The first job should be ready - solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) - assert solid_job1.ready? - - # The second job should be discarded immediately - solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) - assert solid_job2.finished? - - # Complete the first job and release its lock - solid_job1.unblock_next_blocked_job - solid_job1.finished! - - # Enqueue another job - it should be ready since the lock is released - job3 = DiscardOnConflictJob.perform_later(@job_result.id) - solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id) - assert solid_job3.ready? - end - - test "discarded jobs are marked as finished without execution" do - # Enqueue a job that will be ready - job1 = DiscardOnConflictJob.perform_later("test_key") - - # Enqueue a job that will be discarded - job2 = DiscardOnConflictJob.perform_later("test_key") - - solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id) - solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id) - - # First job should be ready - assert solid_job1.ready? - assert solid_job1.ready_execution.present? - - # Second job should be finished without any execution - assert solid_job2.finished? 
- assert_nil solid_job2.ready_execution - assert_nil solid_job2.claimed_execution - assert_nil solid_job2.failed_execution - assert_nil solid_job2.blocked_execution - end -end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 9cdc1b6f..c667cf88 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -10,11 +10,11 @@ def perform(job_result) end end - class DiscardedNonOverlappingJob < NonOverlappingJob + class DiscardableNonOverlappingJob < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard end - class DiscardedOverlappingJob < NonOverlappingJob + class DiscardableThrottledJob < NonOverlappingJob limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard end @@ -26,11 +26,11 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end - class DiscardedNonOverlappingGroupedJob1 < NonOverlappingJob + class DiscardableNonOverlappingGroupedJob1 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard end - class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob + class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard end @@ -64,82 +64,6 @@ class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob assert_equal 8, execution.priority end - test "enqueue jobs with on_conflict discard" do - # First job should be ready - active_job1 = NonOverlappingDiscardJob.new(@result) - assert_ready do - SolidQueue::Job.enqueue(active_job1) - end - job1 = SolidQueue::Job.find_by(active_job_id: active_job1.job_id) - assert job1.ready? - - # Second job should be discarded (finished without execution) - active_job2 = NonOverlappingDiscardJob.new(@result) - assert_no_difference -> { SolidQueue::ReadyExecution.count } do - assert_no_difference -> { SolidQueue::BlockedExecution.count } do - SolidQueue::Job.enqueue(active_job2) - end - end - job2 = SolidQueue::Job.find_by(active_job_id: active_job2.job_id) - - assert job2.finished? - assert_nil job2.ready_execution - assert_nil job2.blocked_execution - assert_nil job2.claimed_execution - assert_nil job2.failed_execution - - # Third job with same key should also be discarded - active_job3 = NonOverlappingDiscardJob.new(@result) - assert_no_difference -> { SolidQueue::ReadyExecution.count } do - SolidQueue::Job.enqueue(active_job3) - end - job3 = SolidQueue::Job.find_by(active_job_id: active_job3.job_id) - - assert job3.finished? - end - - test "compare blocking vs discard behavior" do - # Test default blocking behavior - blocking_job1 = NonOverlappingJob.new(@result) - assert_ready do - SolidQueue::Job.enqueue(blocking_job1) - end - job1 = SolidQueue::Job.find_by(active_job_id: blocking_job1.job_id) - assert job1.ready? - - # Second job should be blocked (not discarded) - blocking_job2 = NonOverlappingJob.new(@result) - assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do - SolidQueue::Job.enqueue(blocking_job2) - end - job2 = SolidQueue::Job.find_by(active_job_id: blocking_job2.job_id) - assert job2.blocked? - assert job2.blocked_execution.present? - assert_not job2.finished? 
- - # Clean up for discard test - SolidQueue::Job.destroy_all - SolidQueue::Semaphore.destroy_all - - # Test discard behavior - discard_job1 = NonOverlappingDiscardJob.new(@result) - assert_ready do - SolidQueue::Job.enqueue(discard_job1) - end - job3 = SolidQueue::Job.find_by(active_job_id: discard_job1.job_id) - assert job3.ready? - - # Second job should be discarded (not blocked) - discard_job2 = NonOverlappingDiscardJob.new(@result) - assert_no_difference -> { SolidQueue::BlockedExecution.count } do - SolidQueue::Job.enqueue(discard_job2) - end - job4 = SolidQueue::Job.find_by(active_job_id: discard_job2.job_id) - assert job4.finished? - assert_nil job4.blocked_execution - assert_nil job4.ready_execution - end - test "enqueue active job to be scheduled in the future" do active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") @@ -193,79 +117,27 @@ class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob assert_equal active_job.concurrency_key, job.concurrency_key end - test "enqueue jobs with discarding concurrency controls" do - assert_ready do - active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A") - assert active_job.successfully_enqueued? - - assert_not DiscardedNonOverlappingJob.perform_later(@result, name: "B") do |overlapping_active_job| - assert_not overlapping_active_job.successfully_enqueued? - assert_equal @discarded_concurrent_error, overlapping_active_job.enqueue_error - end - end - end - - test "enqueue scheduled job with discarding concurrency controls" do + test "block jobs when concurrency limits are reached" do assert_ready do - active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A") - assert active_job.successfully_enqueued? - end - - scheduled_job_id = nil - - assert_scheduled do - scheduled_active_job = DiscardedNonOverlappingJob.set(wait: 0.5.seconds).perform_later(@result, name: "B") - assert scheduled_active_job.successfully_enqueued? - assert_nil scheduled_active_job.enqueue_error - - scheduled_job_id = scheduled_active_job.provider_job_id + NonOverlappingJob.perform_later(@result, name: "A") end - scheduled_job = SolidQueue::Job.find(scheduled_job_id) - wait_for { scheduled_job.due? } - - dispatched = SolidQueue::ScheduledExecution.dispatch_next_batch(10) - assert_equal 0, dispatched - assert_raises(ActiveRecord::RecordNotFound) { scheduled_job.reload } - end - - test "enqueues jobs in bulk with discarding concurrency controls" do - jobs = [ - job_1 = DiscardedNonOverlappingJob.new(@result, name: "A"), - job_2 = DiscardedNonOverlappingJob.new(@result, name: "B") - ] - - assert_job_counts(ready: 1, discarded: 1) do - enqueued_jobs_count = SolidQueue::Job.enqueue_all(jobs) - assert_equal enqueued_jobs_count, 1 + assert_blocked do + NonOverlappingJob.perform_later(@result, name: "B") end - assert job_1.successfully_enqueued? - assert_not job_2.successfully_enqueued? - assert_equal SolidQueue::Job::EnqueueError, job_2.enqueue_error.class - assert_equal @discarded_concurrent_error.message, job_2.enqueue_error.message + blocked_execution = SolidQueue::BlockedExecution.last + assert blocked_execution.expires_at <= SolidQueue.default_concurrency_control_period.from_now end - test "enqueue jobs with discarding concurrency controls when below limit" do - assert_job_counts(ready: 2) do - assert_ready do - active_job = DiscardedOverlappingJob.perform_later(@result, name: "A") - assert active_job.successfully_enqueued? 
- end - - assert_ready do - active_job = DiscardedOverlappingJob.perform_later(@result, name: "B") - assert active_job.successfully_enqueued? - end - - assert_not DiscardedOverlappingJob.perform_later(@result, name: "C") do |overlapping_active_job| - assert_not overlapping_active_job.successfully_enqueued? - assert_equal @discarded_concurrent_error, overlapping_active_job.enqueue_error - end + test "skips jobs with on_conflict set to discard when concurrency limits are reached" do + assert_job_counts(ready: 1) do + DiscardableNonOverlappingJob.perform_later(@result, name: "A") + DiscardableNonOverlappingJob.perform_later(@result, name: "B") end end - test "enqueue jobs with concurrency controls in the same concurrency group" do + test "block jobs in the same concurrency group when concurrency limits are reached" do assert_ready do active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A") assert_equal 1, active_job.concurrency_limit @@ -279,24 +151,39 @@ class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob end end - test "enqueue jobs with discarding concurrency controls in the same concurrency group" do + test "skips jobs with on_conflict set to discard in the same concurrency group when concurrency limits are reached" do assert_job_counts(ready: 1) do - assert_ready do - active_job = DiscardedNonOverlappingGroupedJob1.perform_later(@result, name: "A") - assert active_job.successfully_enqueued? - assert_equal 1, active_job.concurrency_limit - assert_equal "DiscardingGroup/JobResult/#{@result.id}", active_job.concurrency_key - end + active_job = DiscardableNonOverlappingGroupedJob1.perform_later(@result, name: "A") + assert_equal 1, active_job.concurrency_limit + assert_equal "DiscardingGroup/JobResult/#{@result.id}", active_job.concurrency_key - assert_not DiscardedNonOverlappingGroupedJob2.perform_later(@result, name: "B") do |blocked_active_job| - assert_not blocked_active_job.successfully_enqueued? - assert_equal 1, blocked_active_job.concurrency_limit - assert_equal "DiscardingGroup/JobResult/#{@result.id}", blocked_active_job.concurrency_key - end + active_job = DiscardableNonOverlappingGroupedJob2.perform_later(@result, name: "B") + assert_equal 1, active_job.concurrency_limit + assert_equal "DiscardingGroup/JobResult/#{@result.id}", active_job.concurrency_key end end - test "enqueue multiple jobs" do + test "enqueue scheduled job with concurrency controls and on_conflict set to discard" do + assert_ready do + DiscardableNonOverlappingJob.perform_later(@result, name: "A") + end + + assert_scheduled do + DiscardableNonOverlappingJob.set(wait: 5.minutes).perform_later(@result, name: "B") + end + + scheduled_job = SolidQueue::Job.last + + travel_to 10.minutes.from_now + + count = SolidQueue::ScheduledExecution.dispatch_next_batch(10) + + assert_not scheduled_job.reload.scheduled? + assert_not scheduled_job.ready? + assert scheduled_job.finished? + end + + test "enqueue jobs in bulk" do active_jobs = [ AddToBufferJob.new(2), AddToBufferJob.new(6).set(wait: 2.minutes), @@ -318,17 +205,23 @@ class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob assert active_jobs.all?(&:successfully_enqueued?) 
end - test "block jobs when concurrency limits are reached" do - assert_ready do - NonOverlappingJob.perform_later(@result, name: "A") - end + test "enqueues jobs in bulk with concurrency controls and some set to discard" do + active_jobs = [ + AddToBufferJob.new(2), + DiscardableNonOverlappingJob.new(@result), + NonOverlappingJob.new(@result), + AddToBufferJob.new(6).set(wait: 2.minutes), + NonOverlappingJob.new(@result), + DiscardableNonOverlappingJob.new(@result) + ] - assert_blocked do - NonOverlappingJob.perform_later(@result, name: "B") + assert_job_counts(ready: 3, scheduled: 1, blocked: 1, discarded: 1) do + ActiveJob.perform_all_later(active_jobs) end - blocked_execution = SolidQueue::BlockedExecution.last - assert blocked_execution.expires_at <= SolidQueue.default_concurrency_control_period.from_now + jobs = SolidQueue::Job.last(6) + assert_equal active_jobs.map(&:provider_job_id).sort, jobs.pluck(:id).sort + assert active_jobs.all?(&:successfully_enqueued?) end test "discard ready job" do @@ -438,7 +331,12 @@ class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob SolidQueue::Job.enqueue(active_job) end - # #perform_later doesn't raise ActiveJob::EnqueueError, and instead set's successfully_enqueued? to false + assert_raises SolidQueue::Job::EnqueueError do + AddToBufferJob.perform_later(1) + end + + # #perform_later with a block doesn't raise ActiveJob::EnqueueError, + # and instead set's successfully_enqueued? to false assert_not AddToBufferJob.perform_later(1) do |active_job| assert_not active_job.successfully_enqueued? assert_equal SolidQueue::Job::EnqueueError, active_job.enqueue_error.class @@ -470,6 +368,7 @@ def assert_ready(&block) def assert_scheduled(&block) assert_job_counts(scheduled: 1, &block) + assert SolidQueue::Job.last.scheduled? end def assert_blocked(&block) @@ -479,6 +378,7 @@ def assert_blocked(&block) def assert_discarded(&block) assert_job_counts(discarded: 1, &block) + assert SolidQueue::Job.last.finished? end def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, discarded: 0, &block) From d23be3cf29c3f391c087acbd7ad53f79714479e3 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 9 Jul 2025 19:12:33 -0400 Subject: [PATCH 6/6] Change the approach, destroying the job when it can't be enqueued Otherwise we'd get inconsistent behaviour with other adapters like Sidekiq, where only jobs that are actually enqueued get assigned a provider_job_id. Turns out, issuing a DELETE query for a record that got inserted within a transaction works perfectly, even with bulk INSERT, so let's just avoid creating those jobs altogether. --- README.md | 18 +- app/models/solid_queue/job.rb | 21 +-- .../solid_queue/job/concurrency_controls.rb | 2 +- app/models/solid_queue/job/executable.rb | 10 +- test/integration/concurrency_controls_test.rb | 16 +- test/models/solid_queue/job_test.rb | 39 ++--- test/unit/concurrency_discard_test.rb | 154 ------------------ 7 files changed, 45 insertions(+), 215 deletions(-) delete mode 100644 test/unit/concurrency_discard_test.rb diff --git a/README.md b/README.md index fd84f119..2c386747 100644 --- a/README.md +++ b/README.md @@ -428,7 +428,7 @@ In the case of recurring tasks, if such error is raised when enqueuing the job c ## Concurrency controls -Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. 
When limited in this way, by default, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked. This means that if a job with certain arguments has already been enqueued, other jobs with the same characteristics (in the same concurrency _class_) won't be enqueued, they'll silently complete. +Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, by default, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked. This means that if a job with certain arguments has already been enqueued, other jobs with the same characteristics (in the same concurrency _class_) won't be enqueued. ```ruby class MyJob < ApplicationJob @@ -442,11 +442,17 @@ class MyJob < ApplicationJob - `group` is used to control the concurrency of different job classes together. It defaults to the job class name. - `on_conflict` controls behaviour when enqueuing a job that conflicts with the concurrency limits configured. It can be set to one of the following: - (default) `:block`: the job is blocked and is dispatched when another job completes and unblocks it, or when the duration expires. - - `:discard`: the job is discarded (silently finishes). When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded up to the interval defined by `duration` has elapsed. Additionally, Active Jobs that are discarded will have `successfully_enqueued` set to `false`. + - `:discard`: the job is discarded. When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded up to the interval defined by `duration` has elapsed. When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping). -The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. If you're using the `discard` behaviour for `on_conflict`, jobs enqueued while the semaphore is closed will be discarded. 
Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than duration are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting. It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated. In a similar way, when using `discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore. +The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. If you're using the `discard` behaviour for `on_conflict`, jobs enqueued while the semaphore is closed will be discarded. + +Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than `duration` are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting, or about the jobs that would get discarded while the semaphore is closed. + +It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated. + +When using `discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore. For example: @@ -481,7 +487,7 @@ In this case, if we have a `Box::MovePostingsByContactToDesignatedBoxJob` job en Note that the `duration` setting depends indirectly on the value for `concurrency_maintenance_interval` that you set for your dispatcher(s), as that'd be the frequency with which blocked jobs are checked and unblocked (at which point, only one job per concurrency key, at most, is unblocked). 
In general, you should set `duration` in a way that all your jobs would finish well under that duration and think of the concurrency maintenance task as a failsafe in case something goes wrong.
 
-Jobs are unblocked in order of priority but queue order is not taken into account for unblocking jobs. That means that if you have a group of jobs that share a concurrency group but are in different queues, or jobs of the same class that you enqueue in different queues, the queue order you set for a worker is not taken into account when unblocking blocked ones. The reason is that a job that runs unblocks the next one, and the job itself doesn't know about a particular worker's queue order (you could even have different workers with different queue orders), it can only know about priority. Once blocked jobs are unblocked and available for polling, they'll be picked up by a worker following its queue order.
+Jobs are unblocked in order of priority but **queue order is not taken into account for unblocking jobs**. That means that if you have a group of jobs that share a concurrency group but are in different queues, or jobs of the same class that you enqueue in different queues, the queue order you set for a worker is not taken into account when unblocking blocked ones. The reason is that a job that runs unblocks the next one, and the job itself doesn't know about a particular worker's queue order (you could even have different workers with different queue orders), it can only know about priority. Once blocked jobs are unblocked and available for polling, they'll be picked up by a worker following its queue order.
 
 Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for getting an open semaphore, and whenever they get it, they'll be run. It doesn't matter if they had already gotten an open semaphore in the past.
 
@@ -509,7 +515,9 @@ production:
 Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues).
 
-In addition, mixing concurrency controls with bulk enqueuing (Active Job's `perform_all_later`) is not a good idea because concurrency controlled job needs to be enqueued one by one to ensure concurrency limits are respected, so you lose all the benefits of bulk enqueuing.
+In addition, mixing concurrency controls with **bulk enqueuing** (Active Job's `perform_all_later`) is not a good idea because concurrency-controlled jobs need to be enqueued one by one to ensure concurrency limits are respected, so you lose all the benefits of bulk enqueuing.
+
+When jobs that have concurrency controls and `on_conflict: :discard` are enqueued in bulk, the ones that fail to be enqueued and are discarded will have `successfully_enqueued` set to `false`. The total count of jobs enqueued returned by `perform_all_later` will exclude these jobs as expected.
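+
+As a rough sketch of what this looks like in practice (the `DiscardableJob` class and the `record` variable below are only illustrative names, not part of Solid Queue):
+
+```ruby
+class DiscardableJob < ApplicationJob
+  limits_concurrency key: ->(record) { record }, on_conflict: :discard
+
+  def perform(record); end
+end
+
+# Both jobs share the same concurrency key, so only the first one fits under the limit
+jobs = [ DiscardableJob.new(record), DiscardableJob.new(record) ]
+ActiveJob.perform_all_later(jobs)
+
+jobs.map(&:successfully_enqueued?)
+# => [true, false], the second job conflicted and was discarded
+
+jobs.last.provider_job_id
+# => nil, the discarded job was never persisted
+```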
## Failed jobs and retries diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 7d9b60d9..234e5eb4 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -10,31 +10,26 @@ class EnqueueError < StandardError; end class << self def enqueue_all(active_jobs) - enqueued_jobs_count = 0 + active_jobs_by_job_id = active_jobs.index_by(&:job_id) transaction do jobs = create_all_from_active_jobs(active_jobs) - prepare_all_for_execution(jobs) - jobs_by_active_job_id = jobs.index_by(&:active_job_id) - - active_jobs.each do |active_job| - job = jobs_by_active_job_id[active_job.job_id] - - active_job.provider_job_id = job&.id - active_job.enqueue_error = job&.enqueue_error - active_job.successfully_enqueued = job.present? && job.enqueue_error.nil? - enqueued_jobs_count += 1 if active_job.successfully_enqueued? + prepare_all_for_execution(jobs).tap do |enqueued_jobs| + enqueued_jobs.each do |enqueued_job| + active_jobs_by_job_id[enqueued_job.active_job_id].provider_job_id = enqueued_job.id + active_jobs_by_job_id[enqueued_job.active_job_id].successfully_enqueued = true + end end end - enqueued_jobs_count + active_jobs.count(&:successfully_enqueued?) end def enqueue(active_job, scheduled_at: Time.current) active_job.scheduled_at = scheduled_at create_from_active_job(active_job).tap do |enqueued_job| - active_job.provider_job_id = enqueued_job.id + active_job.provider_job_id = enqueued_job.id if enqueued_job.persisted? end end diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 0aa3696f..b7410b08 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -52,7 +52,7 @@ def release_concurrency_lock def handle_concurrency_conflict if concurrency_on_conflict.discard? - finished! 
+ destroy else block end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index 661f0f0d..b0a4cb93 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -13,8 +13,6 @@ module Executable after_create :prepare_for_execution - attr_accessor :enqueue_error - scope :finished, -> { where.not(finished_at: nil) } end @@ -39,13 +37,7 @@ def dispatch_all_at_once(jobs) end def dispatch_all_one_by_one(jobs) - jobs.each do |job| - begin - job.dispatch - rescue EnqueueError => e - job.enqueue_error = e - end - end + jobs.each(&:dispatch) end def successfully_dispatched(jobs) diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index a5ac349a..f0984078 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -197,11 +197,10 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "discard jobs when concurrency limit is reached with on_conflict: :discard" do - # Enqueue first job - should be executed - job1 = DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2) - # Enqueue second job - should be discarded due to concurrency limit + job1 = DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 3) + # should be discarded due to concurrency limit job2 = DiscardableUpdateResultJob.perform_later(@result, name: "2") - # Enqueue third job - should also be discarded + # should also be discarded job3 = DiscardableUpdateResultJob.perform_later(@result, name: "3") wait_for_jobs_to_finish_for(5.seconds) @@ -212,12 +211,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # All jobs have finished and have no blocked executions jobs = SolidQueue::Job.where(active_job_id: [ job1, job2, job3 ].map(&:job_id)) - assert_equal 3, jobs.count + assert_equal 1, jobs.count - jobs.each do |job| - assert job.finished? - assert_nil job.blocked_execution - end + assert_equal job1.provider_job_id, jobs.first.id + assert_nil job2.provider_job_id + assert_nil job3.provider_job_id end test "discard on conflict across different concurrency keys" do diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index c667cf88..013e8511 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -176,11 +176,10 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob travel_to 10.minutes.from_now - count = SolidQueue::ScheduledExecution.dispatch_next_batch(10) - - assert_not scheduled_job.reload.scheduled? - assert_not scheduled_job.ready? - assert scheduled_job.finished? 
+ # The scheduled job is not enqueued because it conflicts with + # the first one and is discarded + assert_equal 0, SolidQueue::ScheduledExecution.dispatch_next_batch(10) + assert_nil SolidQueue::Job.find_by(id: scheduled_job.id) end test "enqueue jobs in bulk" do @@ -212,16 +211,20 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob NonOverlappingJob.new(@result), AddToBufferJob.new(6).set(wait: 2.minutes), NonOverlappingJob.new(@result), - DiscardableNonOverlappingJob.new(@result) + DiscardableNonOverlappingJob.new(@result) # this one won't be enqueued ] + not_enqueued = active_jobs.last - assert_job_counts(ready: 3, scheduled: 1, blocked: 1, discarded: 1) do + assert_job_counts(ready: 3, scheduled: 1, blocked: 1) do ActiveJob.perform_all_later(active_jobs) end - jobs = SolidQueue::Job.last(6) - assert_equal active_jobs.map(&:provider_job_id).sort, jobs.pluck(:id).sort - assert active_jobs.all?(&:successfully_enqueued?) + jobs = SolidQueue::Job.last(5) + assert_equal active_jobs.without(not_enqueued).map(&:provider_job_id).sort, jobs.pluck(:id).sort + assert active_jobs.without(not_enqueued).all?(&:successfully_enqueued?) + + assert_nil not_enqueued.provider_job_id + assert_not not_enqueued.successfully_enqueued? end test "discard ready job" do @@ -334,13 +337,6 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob assert_raises SolidQueue::Job::EnqueueError do AddToBufferJob.perform_later(1) end - - # #perform_later with a block doesn't raise ActiveJob::EnqueueError, - # and instead set's successfully_enqueued? to false - assert_not AddToBufferJob.perform_later(1) do |active_job| - assert_not active_job.successfully_enqueued? - assert_equal SolidQueue::Job::EnqueueError, active_job.enqueue_error.class - end end test "enqueue successfully inside a rolled-back transaction in the app DB" do @@ -376,13 +372,8 @@ def assert_blocked(&block) assert SolidQueue::Job.last.blocked? end - def assert_discarded(&block) - assert_job_counts(discarded: 1, &block) - assert SolidQueue::Job.last.finished? 
- end - - def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, discarded: 0, &block) - assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked + discarded) do + def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block) + assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do assert_difference -> { SolidQueue::BlockedExecution.count }, +blocked, &block diff --git a/test/unit/concurrency_discard_test.rb b/test/unit/concurrency_discard_test.rb deleted file mode 100644 index ae31fa55..00000000 --- a/test/unit/concurrency_discard_test.rb +++ /dev/null @@ -1,154 +0,0 @@ -# frozen_string_literal: true - -require "test_helper" - -module SolidQueue - class ConcurrencyDiscardTest < ActiveSupport::TestCase - class DiscardOnConflictJob < ApplicationJob - limits_concurrency to: 1, key: ->(value) { value }, on_conflict: :discard - - def perform(value) - # Job implementation - end - end - - class DefaultBlockingJob < ApplicationJob - limits_concurrency to: 1, key: ->(value) { value } - - def perform(value) - # Job implementation - end - end - - test "job with on_conflict: :discard is finished when concurrency limit is reached" do - # Create first job that will acquire the lock - active_job1 = DiscardOnConflictJob.new("test_key") - active_job1.job_id = "job1" - Job.enqueue(active_job1) - job1 = Job.find_by(active_job_id: active_job1.job_id) - - # First job should be ready - assert job1.ready? - assert job1.ready_execution.present? - - # Create second job that should be discarded - active_job2 = DiscardOnConflictJob.new("test_key") - active_job2.job_id = "job2" - Job.enqueue(active_job2) - job2 = Job.find_by(active_job_id: active_job2.job_id) - - # Second job should be finished without any execution - assert job2.finished? - assert_nil job2.ready_execution - assert_nil job2.blocked_execution - assert_nil job2.claimed_execution - assert_nil job2.failed_execution - end - - test "job without on_conflict option is blocked when concurrency limit is reached" do - # Create first job that will acquire the lock - active_job1 = DefaultBlockingJob.new("test_key") - active_job1.job_id = "job1" - Job.enqueue(active_job1) - job1 = Job.find_by(active_job_id: active_job1.job_id) - - # First job should be ready - assert job1.ready? - assert job1.ready_execution.present? - - # Create second job that should be blocked - active_job2 = DefaultBlockingJob.new("test_key") - active_job2.job_id = "job2" - Job.enqueue(active_job2) - job2 = Job.find_by(active_job_id: active_job2.job_id) - - # Second job should be blocked - assert job2.blocked? - assert job2.blocked_execution.present? - assert_nil job2.ready_execution - assert_not job2.finished? 
- end - - test "concurrency_on_conflict attribute is properly set" do - assert_equal :discard, DiscardOnConflictJob.concurrency_on_conflict - assert_equal :block, DefaultBlockingJob.concurrency_on_conflict - end - - test "multiple jobs with same key are discarded when using on_conflict: :discard" do - # Create first job - active_job1 = DiscardOnConflictJob.new("shared_key") - active_job1.job_id = "job1" - Job.enqueue(active_job1) - job1 = Job.find_by(active_job_id: active_job1.job_id) - - # Create multiple jobs that should all be discarded - discarded_jobs = [] - 5.times do |i| - active_job = DiscardOnConflictJob.new("shared_key") - active_job.job_id = "job#{i + 2}" - Job.enqueue(active_job) - job = Job.find_by(active_job_id: active_job.job_id) - discarded_jobs << job - end - - # First job should be ready - assert job1.ready? - - # All other jobs should be finished (discarded) - discarded_jobs.each do |job| - assert job.finished? - assert_nil job.ready_execution - assert_nil job.blocked_execution - end - end - - test "jobs with different keys are not affected by discard" do - # Create jobs with different keys - they should all be ready - jobs = [] - 3.times do |i| - active_job = DiscardOnConflictJob.new("key_#{i}") - active_job.job_id = "job#{i}" - Job.enqueue(active_job) - job = Job.find_by(active_job_id: active_job.job_id) - jobs << job - end - - # All jobs should be ready since they have different keys - jobs.each do |job| - assert job.ready? - assert job.ready_execution.present? - assert_not job.finished? - end - end - - test "discarded job does not prevent future jobs after lock is released" do - # Create and finish first job - active_job1 = DiscardOnConflictJob.new("test_key") - active_job1.job_id = "job1" - Job.enqueue(active_job1) - job1 = Job.find_by(active_job_id: active_job1.job_id) - - # Create second job that gets discarded - active_job2 = DiscardOnConflictJob.new("test_key") - active_job2.job_id = "job2" - Job.enqueue(active_job2) - job2 = Job.find_by(active_job_id: active_job2.job_id) - - assert job1.ready? - assert job2.finished? # discarded - - # Release the lock by finishing the first job - job1.unblock_next_blocked_job - job1.finished! - - # Create third job - should be ready now - active_job3 = DiscardOnConflictJob.new("test_key") - active_job3.job_id = "job3" - Job.enqueue(active_job3) - job3 = Job.find_by(active_job_id: active_job3.job_id) - - assert job3.ready? - assert job3.ready_execution.present? - end - end -end