Skip to content

Commit f290af1

Browse files
Add capability to discard duplicate jobs with concurrency configuration
1 parent ca81cc0 commit f290af1

File tree

5 files changed

+74
-5
lines changed

5 files changed

+74
-5
lines changed

app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ module ConcurrencyControls
88
included do
99
has_one :blocked_execution
1010

11-
delegate :concurrency_limit, :concurrency_duration, to: :job_class
11+
delegate :concurrency_limit, :concurrency_on_duplicate, :concurrency_duration, to: :job_class
1212

1313
before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
1414
end
@@ -34,6 +34,14 @@ def blocked?
3434
end
3535

3636
private
37+
def duplicate?
38+
Semaphore.at_limit?(self)
39+
end
40+
41+
def discard_on_duplicate?
42+
concurrency_on_duplicate == :discard && duplicate?
43+
end
44+
3745
def acquire_concurrency_lock
3846
return true unless concurrency_limited?
3947

app/models/solid_queue/job/executable.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ def prepare_for_execution
6565
end
6666

6767
def dispatch
68-
if acquire_concurrency_lock then ready
68+
if discard_on_duplicate? then discard
69+
elsif acquire_concurrency_lock then ready
6970
else
7071
block
7172
end

app/models/solid_queue/semaphore.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ def wait(job)
1010
Proxy.new(job).wait
1111
end
1212

13+
def at_limit?(job)
14+
Proxy.new(job).at_limit?
15+
end
16+
1317
def signal(job)
1418
Proxy.new(job).signal
1519
end
@@ -39,6 +43,14 @@ def initialize(job)
3943
@job = job
4044
end
4145

46+
def at_limit?
47+
if semaphore = Semaphore.find_by(key: key)
48+
semaphore.value.zero?
49+
else
50+
false
51+
end
52+
end
53+
4254
def wait
4355
if semaphore = Semaphore.find_by(key: key)
4456
semaphore.value > 0 && attempt_decrement

lib/active_job/concurrency_controls.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,17 @@ module ConcurrencyControls
1111
class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false
1212

1313
class_attribute :concurrency_limit
14+
class_attribute :concurrency_on_duplicate
1415
class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
1516
end
1617

1718
class_methods do
18-
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period)
19+
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_duplicate: :block)
1920
self.concurrency_key = key
2021
self.concurrency_limit = to
2122
self.concurrency_group = group
2223
self.concurrency_duration = duration
24+
self.concurrency_on_duplicate = on_duplicate
2325
end
2426
end
2527

test/models/solid_queue/job_test.rb

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,14 @@ def perform(job_result)
1010
end
1111
end
1212

13+
class DiscardedNonOverlappingJob < NonOverlappingJob
14+
limits_concurrency key: ->(job_result, **) { job_result }, on_duplicate: :discard
15+
end
16+
17+
class DiscardedOverlappingJob < NonOverlappingJob
18+
limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_duplicate: :discard
19+
end
20+
1321
class NonOverlappingGroupedJob1 < NonOverlappingJob
1422
limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup"
1523
end
@@ -98,6 +106,40 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
98106
assert_equal active_job.concurrency_key, job.concurrency_key
99107
end
100108

109+
test "enqueue jobs with discarding concurrency controls" do
110+
assert_ready do
111+
active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A")
112+
assert_equal 1, active_job.concurrency_limit
113+
assert_equal "SolidQueue::JobTest::DiscardedNonOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key
114+
end
115+
116+
assert_discarded do
117+
active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A")
118+
assert_equal 1, active_job.concurrency_limit
119+
assert_equal "SolidQueue::JobTest::DiscardedNonOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key
120+
end
121+
end
122+
123+
test "enqueue jobs with discarding concurrency controls when below limit" do
124+
assert_ready do
125+
active_job = DiscardedOverlappingJob.perform_later(@result, name: "A")
126+
assert_equal 2, active_job.concurrency_limit
127+
assert_equal "SolidQueue::JobTest::DiscardedOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key
128+
end
129+
130+
assert_ready do
131+
active_job = DiscardedOverlappingJob.perform_later(@result, name: "A")
132+
assert_equal 2, active_job.concurrency_limit
133+
assert_equal "SolidQueue::JobTest::DiscardedOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key
134+
end
135+
136+
assert_discarded do
137+
active_job = DiscardedOverlappingJob.perform_later(@result, name: "A")
138+
assert_equal 2, active_job.concurrency_limit
139+
assert_equal "SolidQueue::JobTest::DiscardedOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key
140+
end
141+
end
142+
101143
test "enqueue jobs with concurrency controls in the same concurrency group" do
102144
assert_ready do
103145
active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A")
@@ -289,8 +331,12 @@ def assert_blocked(&block)
289331
assert SolidQueue::Job.last.blocked?
290332
end
291333

292-
def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block)
293-
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do
334+
def assert_discarded(&block)
335+
assert_job_counts(discarded: 1, &block)
336+
end
337+
338+
def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, discarded: 0, &block)
339+
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked + discarded) do
294340
assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do
295341
assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do
296342
assert_difference -> { SolidQueue::BlockedExecution.count }, +blocked, &block

0 commit comments

Comments
 (0)