Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion app/models/solid_queue/job/concurrency_controls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module ConcurrencyControls
included do
has_one :blocked_execution

delegate :concurrency_limit, :concurrency_duration, to: :job_class
delegate :concurrency_limit, :concurrency_at_limit, :concurrency_duration, to: :job_class

before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
end
Expand All @@ -34,8 +34,13 @@ def blocked?
end

private
def discard_concurrent?
concurrency_at_limit == :discard
end

def acquire_concurrency_lock
return true unless concurrency_limited?
return false if Semaphore.at_limit?(self) && discard_concurrent?

Semaphore.wait(self)
end
Expand Down
2 changes: 2 additions & 0 deletions app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def prepare_for_execution

def dispatch
if acquire_concurrency_lock then ready
elsif discard_concurrent?
discard
else
block
end
Expand Down
12 changes: 12 additions & 0 deletions app/models/solid_queue/semaphore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ def wait(job)
Proxy.new(job).wait
end

def at_limit?(job)
Proxy.new(job).at_limit?
end

def signal(job)
Proxy.new(job).signal
end
Expand Down Expand Up @@ -39,6 +43,14 @@ def initialize(job)
@job = job
end

def at_limit?
if semaphore = Semaphore.find_by(key: key)
semaphore.value.zero?
else
false
end
end

def wait
if semaphore = Semaphore.find_by(key: key)
semaphore.value > 0 && attempt_decrement
Expand Down
4 changes: 3 additions & 1 deletion lib/active_job/concurrency_controls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ module ConcurrencyControls
class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false

class_attribute :concurrency_limit
class_attribute :concurrency_at_limit
class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
end

class_methods do
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period)
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, at_limit: :block)
self.concurrency_key = key
self.concurrency_limit = to
self.concurrency_group = group
self.concurrency_duration = duration
self.concurrency_at_limit = at_limit
end
end

Expand Down
50 changes: 48 additions & 2 deletions test/models/solid_queue/job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ def perform(job_result)
end
end

class DiscardedNonOverlappingJob < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, at_limit: :discard
end

class DiscardedOverlappingJob < NonOverlappingJob
limits_concurrency to: 2, key: ->(job_result, **) { job_result }, at_limit: :discard
end

class NonOverlappingGroupedJob1 < NonOverlappingJob
limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup"
end
Expand Down Expand Up @@ -98,6 +106,40 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
assert_equal active_job.concurrency_key, job.concurrency_key
end

test "enqueue jobs with discarding concurrency controls" do
assert_ready do
active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A")
assert_equal 1, active_job.concurrency_limit
assert_equal "SolidQueue::JobTest::DiscardedNonOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key
end

assert_discarded do
active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A")
assert_equal 1, active_job.concurrency_limit
assert_equal "SolidQueue::JobTest::DiscardedNonOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key
end
end

test "enqueue jobs with discarding concurrency controls when below limit" do
assert_ready do
active_job = DiscardedOverlappingJob.perform_later(@result, name: "A")
assert_equal 2, active_job.concurrency_limit
assert_equal "SolidQueue::JobTest::DiscardedOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key
end

assert_ready do
active_job = DiscardedOverlappingJob.perform_later(@result, name: "A")
assert_equal 2, active_job.concurrency_limit
assert_equal "SolidQueue::JobTest::DiscardedOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key
end

assert_discarded do
active_job = DiscardedOverlappingJob.perform_later(@result, name: "A")
assert_equal 2, active_job.concurrency_limit
assert_equal "SolidQueue::JobTest::DiscardedOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key
end
end

test "enqueue jobs with concurrency controls in the same concurrency group" do
assert_ready do
active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A")
Expand Down Expand Up @@ -289,8 +331,12 @@ def assert_blocked(&block)
assert SolidQueue::Job.last.blocked?
end

def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block)
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do
def assert_discarded(&block)
assert_job_counts(discarded: 1, &block)
end

def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, discarded: 0, &block)
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked + discarded) do
assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do
assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do
assert_difference -> { SolidQueue::BlockedExecution.count }, +blocked, &block
Expand Down
Loading