Commit a2e2e43

joelzwarringtonrosa
authored and committed
Add capability to discard duplicate jobs with concurrency configuration
Remove 'duplicate' verbiage and use concurrency limits instead, simplify control flow
Fix race condition vulnerability by changing logic to enqueue
Add assertions when bulk enqueuing jobs with concurrency controls
Dispatch jobs in the order they were enqueued
Set ActiveJob successfully_enqueued for both enqueued/blocked and discarded jobs
Change concurrency 'at_limit' -> 'on_conflict'
Update discard logic to trigger an ActiveRecord rollback when attempting dispatch to prevent discarded job creation
Change default on_conflict concurrency option to old behaviour (blocking execution)
Add concurrent on_conflict documentation to README
Add test for discarding grouped concurrent jobs
Fix tests which expect raising enqueue errors
Add test to confirm scheduled jobs are also discarded
1 parent 37ae2ce commit a2e2e43

6 files changed

+187
-19
lines changed

README.md

Lines changed: 34 additions & 2 deletions
@@ -428,18 +428,21 @@ In the case of recurring tasks, if such error is raised when enqueuing the job c
428428

429429
## Concurrency controls
430430

431-
Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs are never discarded or lost, only blocked.
431+
Solid Queue extends Active Job with concurrency controls that allow you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs above the limit are blocked from running by default, and they'll stay blocked until another job finishes and unblocks them, or until the set expiry time (the concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked.
432432

433433
```ruby
434434
class MyJob < ApplicationJob
435-
limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group
435+
limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: conflict_behaviour
436436
437437
# ...
438438
```
439439
- `key` is the only required parameter, and it can be a symbol, a string or a proc that receives the job arguments as parameters and will be used to identify the jobs that need to be limited together. If the proc returns an Active Record record, the key will be built from its class name and `id`.
440440
- `to` is `1` by default.
441441
- `duration` is set to `SolidQueue.default_concurrency_control_period` by default, which itself defaults to `3 minutes`, but that you can configure as well.
442442
- `group` is used to control the concurrency of different job classes together. It defaults to the job class name.
443+
- `on_conflict` controls what happens when a job is enqueued while the maximum number of concurrent executions for your configuration has already been reached:
444+
- (default) `:block`: the job is blocked and is not dispatched until another job completes and unblocks it.
445+
- `:discard`: the job is discarded.
443446

444447
When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping).
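
The difference between the two `on_conflict` modes can be sketched with a toy, in-memory model. This is only an illustration in plain Ruby: `ToyLimiter` and its methods are hypothetical names, and the real implementation relies on database-backed semaphores and blocked executions rather than hashes.

```ruby
# Toy model of a keyed concurrency limit. Hypothetical; not Solid Queue's code.
class ToyLimiter
  def initialize(limit:, on_conflict: :block)
    @limit = limit
    @on_conflict = on_conflict
    @running = Hash.new(0)                    # key => jobs currently holding a slot
    @blocked = Hash.new { |h, k| h[k] = [] }  # key => jobs waiting for a slot
  end

  # Returns :ready, :blocked or :discarded for the given job.
  def enqueue(key, job)
    if @running[key] < @limit
      @running[key] += 1
      :ready
    elsif @on_conflict == :discard
      :discarded
    else
      @blocked[key] << job
      :blocked
    end
  end

  # Frees a slot and hands it to the next blocked job, if there is one.
  def finish(key)
    @running[key] -= 1
    job = @blocked[key].shift
    @running[key] += 1 if job
    job
  end
end

blocking = ToyLimiter.new(limit: 1)
blocking_results = [blocking.enqueue("user/1", "A"), blocking.enqueue("user/1", "B")]
unblocked = blocking.finish("user/1")

discarding = ToyLimiter.new(limit: 1, on_conflict: :discard)
discarding_results = [discarding.enqueue("user/1", "A"), discarding.enqueue("user/1", "B")]
```

With `:block`, job "B" waits and is handed the slot once "A" finishes; with `:discard`, "B" is dropped at enqueue time and nothing is waiting when "A" finishes.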
445448

@@ -482,6 +485,31 @@ Jobs are unblocked in order of priority but queue order is not taken into accoun
482485

483486
Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for getting an open semaphore, and whenever they get it, they'll be run. It doesn't matter if they had already gotten an open semaphore in the past.
484487

488+
### Discarding conflicting jobs
489+
490+
When `on_conflict` is set to `:discard`, jobs enqueued above the concurrent execution limit are discarded and fail to be enqueued: `perform_later` returns `false` and the job's `successfully_enqueued?` is `false`.
491+
492+
```ruby
493+
class ConcurrentJob < ApplicationJob
494+
limits_concurrency key: ->(record) { record }, on_conflict: :discard
495+
496+
def perform(record); end
497+
end
498+
499+
enqueued_job = ConcurrentJob.perform_later(record)
500+
# => instance of ConcurrentJob
501+
enqueued_job.successfully_enqueued?
502+
# => true
503+
504+
second_enqueued_job = ConcurrentJob.perform_later(record) do |job|
505+
job.successfully_enqueued?
506+
# => false
507+
end
508+
509+
second_enqueued_job
510+
# => false
511+
```
512+
485513
### Performance considerations
486514

487515
Concurrency controls introduce significant overhead (blocked executions need to be created and promoted to ready, semaphores need to be created and updated) so you should consider carefully whether you need them. For throttling purposes, where you plan to have `limit` significantly larger than 1, I'd encourage relying on a limited number of workers per queue instead. For example:
@@ -505,6 +533,10 @@ production:
505533

506534
Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues).
507535

508540
## Failed jobs and retries
509541

510542
Solid Queue doesn't include any automatic retry mechanism, it [relies on Active Job for this](https://edgeguides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs). Jobs that fail will be kept in the system, and a _failed execution_ (a record in the `solid_queue_failed_executions` table) will be created for these. The job will stay there until manually discarded or re-enqueued. You can do this in a console as:

app/models/solid_queue/job.rb

Lines changed: 14 additions & 9 deletions
@@ -2,27 +2,32 @@
22

33
module SolidQueue
44
class Job < Record
5-
class EnqueueError < StandardError; end
5+
class EnqueueError < ActiveJob::EnqueueError; end
66

77
include Executable, Clearable, Recurrable
88

99
serialize :arguments, coder: JSON
1010

1111
class << self
1212
def enqueue_all(active_jobs)
13-
active_jobs_by_job_id = active_jobs.index_by(&:job_id)
13+
enqueued_jobs_count = 0
1414

1515
transaction do
1616
jobs = create_all_from_active_jobs(active_jobs)
17-
prepare_all_for_execution(jobs).tap do |enqueued_jobs|
18-
enqueued_jobs.each do |enqueued_job|
19-
active_jobs_by_job_id[enqueued_job.active_job_id].provider_job_id = enqueued_job.id
20-
active_jobs_by_job_id[enqueued_job.active_job_id].successfully_enqueued = true
21-
end
17+
prepare_all_for_execution(jobs)
18+
jobs_by_active_job_id = jobs.index_by(&:active_job_id)
19+
20+
active_jobs.each do |active_job|
21+
job = jobs_by_active_job_id[active_job.job_id]
22+
23+
active_job.provider_job_id = job&.id
24+
active_job.enqueue_error = job&.enqueue_error
25+
active_job.successfully_enqueued = job.present? && job.enqueue_error.nil?
26+
enqueued_jobs_count += 1 if active_job.successfully_enqueued?
2227
end
2328
end
2429

25-
active_jobs.count(&:successfully_enqueued?)
30+
enqueued_jobs_count
2631
end
2732

2833
def enqueue(active_job, scheduled_at: Time.current)
@@ -49,7 +54,7 @@ def create_from_active_job(active_job)
4954
def create_all_from_active_jobs(active_jobs)
5055
job_rows = active_jobs.map { |job| attributes_from_active_job(job) }
5156
insert_all(job_rows)
52-
where(active_job_id: active_jobs.map(&:job_id))
57+
where(active_job_id: active_jobs.map(&:job_id)).order(id: :asc)
5358
end
5459

5560
def attributes_from_active_job(active_job)
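
The per-job bookkeeping that `enqueue_all` now performs can be illustrated in isolation. The following is a minimal sketch in plain Ruby, assuming hypothetical `ActiveJobStub` and `JobRow` structs in place of real Active Job instances and `SolidQueue::Job` records; `record_enqueue_results` is an illustrative helper, not a method from this commit.

```ruby
# Hypothetical stand-ins for Active Job instances and persisted job rows.
ActiveJobStub = Struct.new(:job_id, :provider_job_id, :enqueue_error, :successfully_enqueued)
JobRow = Struct.new(:id, :active_job_id, :enqueue_error)

# Copies the outcome of each persisted row back onto its active job and
# returns how many active jobs were successfully enqueued.
def record_enqueue_results(active_jobs, rows)
  rows_by_active_job_id = rows.to_h { |row| [row.active_job_id, row] }

  active_jobs.count do |active_job|
    row = rows_by_active_job_id[active_job.job_id]

    active_job.provider_job_id = row&.id
    active_job.enqueue_error = row&.enqueue_error
    # The assignment's value (true or false) is what `count` tallies.
    active_job.successfully_enqueued = !row.nil? && row.enqueue_error.nil?
  end
end

active_jobs = [ActiveJobStub.new("a"), ActiveJobStub.new("b"), ActiveJobStub.new("c")]
rows = [
  JobRow.new(1, "a", nil),                           # enqueued normally
  JobRow.new(2, "b", RuntimeError.new("discarded"))  # dispatch recorded an error
  # no row for "c": it was never persisted
]
enqueued_count = record_enqueue_results(active_jobs, rows)
```

Iterating over the active jobs rather than over the persisted rows means that jobs without a matching row still get `successfully_enqueued` set to `false` explicitly.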

app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 5 additions & 1 deletion
@@ -8,7 +8,7 @@ module ConcurrencyControls
88
included do
99
has_one :blocked_execution
1010

11-
delegate :concurrency_limit, :concurrency_duration, to: :job_class
11+
delegate :concurrency_limit, :concurrency_on_conflict, :concurrency_duration, to: :job_class
1212

1313
before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
1414
end
@@ -34,6 +34,10 @@ def blocked?
3434
end
3535

3636
private
37+
def discard_concurrent?
38+
concurrency_on_conflict == :discard
39+
end
40+
3741
def acquire_concurrency_lock
3842
return true unless concurrency_limited?
3943

app/models/solid_queue/job/executable.rb

Lines changed: 12 additions & 1 deletion
@@ -13,6 +13,8 @@ module Executable
1313

1414
after_create :prepare_for_execution
1515

16+
attr_accessor :enqueue_error
17+
1618
scope :finished, -> { where.not(finished_at: nil) }
1719
end
1820

@@ -37,7 +39,13 @@ def dispatch_all_at_once(jobs)
3739
end
3840

3941
def dispatch_all_one_by_one(jobs)
40-
jobs.each(&:dispatch)
42+
jobs.each do |job|
43+
begin
44+
job.dispatch
45+
rescue EnqueueError => e
46+
job.enqueue_error = e
47+
end
48+
end
4149
end
4250

4351
def successfully_dispatched(jobs)
@@ -66,6 +74,9 @@ def prepare_for_execution
6674

6775
def dispatch
6876
if acquire_concurrency_lock then ready
77+
elsif discard_concurrent?
78+
discard
79+
raise EnqueueError.new("Dispatched job discarded due to concurrent configuration.")
6980
else
7081
block
7182
end
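
The interaction between `dispatch` raising on a discard and `dispatch_all_one_by_one` rescuing per job can be sketched without a database. This is a simplified model under stated assumptions: `ToyJob`, `ToyEnqueueError` and the `lock_available:` flag are hypothetical, and the flag stands in for `acquire_concurrency_lock`.

```ruby
# Hypothetical error class standing in for SolidQueue::Job::EnqueueError.
class ToyEnqueueError < StandardError; end

class ToyJob
  attr_reader :name, :state
  attr_accessor :enqueue_error

  def initialize(name, lock_available:, on_conflict: :block)
    @name = name
    @lock_available = lock_available  # stands in for acquire_concurrency_lock
    @on_conflict = on_conflict
    @state = :pending
  end

  # Mirrors the three branches of the changed #dispatch: ready, discard, block.
  def dispatch
    if @lock_available
      @state = :ready
    elsif @on_conflict == :discard
      @state = :discarded
      raise ToyEnqueueError, "Dispatched job discarded due to concurrent configuration."
    else
      @state = :blocked
    end
  end
end

# One discarded job records its error without aborting the rest of the batch.
def dispatch_all_one_by_one(jobs)
  jobs.each do |job|
    begin
      job.dispatch
    rescue ToyEnqueueError => e
      job.enqueue_error = e
    end
  end
end

jobs = [
  ToyJob.new("A", lock_available: true, on_conflict: :discard),
  ToyJob.new("B", lock_available: false, on_conflict: :discard),
  ToyJob.new("C", lock_available: false, on_conflict: :block)
]
dispatch_all_one_by_one(jobs)
states = jobs.map(&:state)
```

Rescuing inside the loop is what lets job "C" still be processed after "B" is discarded; the error is kept on the job so the caller can report it later.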

lib/active_job/concurrency_controls.rb

Lines changed: 3 additions & 1 deletion
@@ -11,15 +11,17 @@ module ConcurrencyControls
1111
class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false
1212

1313
class_attribute :concurrency_limit
14+
class_attribute :concurrency_on_conflict
1415
class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
1516
end
1617

1718
class_methods do
18-
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period)
19+
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block)
1920
self.concurrency_key = key
2021
self.concurrency_limit = to
2122
self.concurrency_group = group
2223
self.concurrency_duration = duration
24+
self.concurrency_on_conflict = on_conflict
2325
end
2426
end
2527

test/models/solid_queue/job_test.rb

Lines changed: 119 additions & 5 deletions
@@ -10,6 +10,14 @@ def perform(job_result)
1010
end
1111
end
1212

13+
class DiscardedNonOverlappingJob < NonOverlappingJob
14+
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard
15+
end
16+
17+
class DiscardedOverlappingJob < NonOverlappingJob
18+
limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard
19+
end
20+
1321
class NonOverlappingGroupedJob1 < NonOverlappingJob
1422
limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup"
1523
end
@@ -18,8 +26,19 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
1826
limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup"
1927
end
2028

29+
class DiscardedNonOverlappingGroupedJob1 < NonOverlappingJob
30+
limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard
31+
end
32+
33+
class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob
34+
limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard
35+
end
36+
2137
setup do
2238
@result = JobResult.create!(queue_name: "default")
39+
@discarded_concurrent_error = SolidQueue::Job::EnqueueError.new(
40+
"Dispatched job discarded due to concurrent configuration."
41+
)
2342
end
2443

2544
test "enqueue active job to be executed right away" do
@@ -98,6 +117,78 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
98117
assert_equal active_job.concurrency_key, job.concurrency_key
99118
end
100119

120+
test "enqueue jobs with discarding concurrency controls" do
121+
assert_ready do
122+
active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A")
123+
assert active_job.successfully_enqueued?
124+
125+
assert_not(DiscardedNonOverlappingJob.perform_later(@result, name: "B") do |overlapping_active_job|
126+
assert_not overlapping_active_job.successfully_enqueued?
127+
assert_equal @discarded_concurrent_error.message, overlapping_active_job.enqueue_error.message
128+
end) # parentheses bind the block to perform_later rather than to assert_not
129+
end
130+
end
131+
132+
test "enqueue scheduled job with discarding concurrency controls" do
133+
assert_ready do
134+
active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A")
135+
assert active_job.successfully_enqueued?
136+
end
137+
138+
scheduled_job_id = nil
139+
140+
assert_scheduled do
141+
scheduled_active_job = DiscardedNonOverlappingJob.set(wait: 0.5.seconds).perform_later(@result, name: "B")
142+
assert scheduled_active_job.successfully_enqueued?
143+
assert_nil scheduled_active_job.enqueue_error
144+
145+
scheduled_job_id = scheduled_active_job.provider_job_id
146+
end
147+
148+
scheduled_job = SolidQueue::Job.find(scheduled_job_id)
149+
wait_for { scheduled_job.due? }
150+
151+
dispatched = SolidQueue::ScheduledExecution.dispatch_next_batch(10)
152+
assert_equal 0, dispatched
153+
assert_raises(ActiveRecord::RecordNotFound) { scheduled_job.reload }
154+
end
155+
156+
test "enqueues jobs in bulk with discarding concurrency controls" do
157+
jobs = [
158+
job_1 = DiscardedNonOverlappingJob.new(@result, name: "A"),
159+
job_2 = DiscardedNonOverlappingJob.new(@result, name: "B")
160+
]
161+
162+
assert_job_counts(ready: 1, discarded: 1) do
163+
enqueued_jobs_count = SolidQueue::Job.enqueue_all(jobs)
164+
assert_equal 1, enqueued_jobs_count
165+
end
166+
167+
assert job_1.successfully_enqueued?
168+
assert_not job_2.successfully_enqueued?
169+
assert_equal SolidQueue::Job::EnqueueError, job_2.enqueue_error.class
170+
assert_equal @discarded_concurrent_error.message, job_2.enqueue_error.message
171+
end
172+
173+
test "enqueue jobs with discarding concurrency controls when below limit" do
174+
assert_job_counts(ready: 2) do
175+
assert_ready do
176+
active_job = DiscardedOverlappingJob.perform_later(@result, name: "A")
177+
assert active_job.successfully_enqueued?
178+
end
179+
180+
assert_ready do
181+
active_job = DiscardedOverlappingJob.perform_later(@result, name: "B")
182+
assert active_job.successfully_enqueued?
183+
end
184+
185+
assert_not(DiscardedOverlappingJob.perform_later(@result, name: "C") do |overlapping_active_job|
186+
assert_not overlapping_active_job.successfully_enqueued?
187+
assert_equal @discarded_concurrent_error.message, overlapping_active_job.enqueue_error.message
188+
end) # parentheses bind the block to perform_later rather than to assert_not
189+
end
190+
end
191+
101192
test "enqueue jobs with concurrency controls in the same concurrency group" do
102193
assert_ready do
103194
active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A")
@@ -112,6 +203,23 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
112203
end
113204
end
114205

206+
test "enqueue jobs with discarding concurrency controls in the same concurrency group" do
207+
assert_job_counts(ready: 1) do
208+
assert_ready do
209+
active_job = DiscardedNonOverlappingGroupedJob1.perform_later(@result, name: "A")
210+
assert active_job.successfully_enqueued?
211+
assert_equal 1, active_job.concurrency_limit
212+
assert_equal "DiscardingGroup/JobResult/#{@result.id}", active_job.concurrency_key
213+
end
214+
215+
assert_not(DiscardedNonOverlappingGroupedJob2.perform_later(@result, name: "B") do |discarded_active_job|
216+
assert_not discarded_active_job.successfully_enqueued?
217+
assert_equal 1, discarded_active_job.concurrency_limit
218+
assert_equal "DiscardingGroup/JobResult/#{@result.id}", discarded_active_job.concurrency_key
219+
end) # parentheses bind the block to perform_later rather than to assert_not
220+
end
221+
end
222+
115223
test "enqueue multiple jobs" do
116224
active_jobs = [
117225
AddToBufferJob.new(2),
@@ -249,13 +357,15 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
249357
test "raise EnqueueError when there's an ActiveRecordError" do
250358
SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked)
251359

252-
active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test")
253360
assert_raises SolidQueue::Job::EnqueueError do
361+
active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test")
254362
SolidQueue::Job.enqueue(active_job)
255363
end
256364

257-
assert_raises SolidQueue::Job::EnqueueError do
258-
AddToBufferJob.perform_later(1)
365+
# #perform_later doesn't raise ActiveJob::EnqueueError, and instead sets successfully_enqueued? to false.
366+
assert_not(AddToBufferJob.perform_later(1) do |active_job|
367+
assert_not active_job.successfully_enqueued?
368+
assert_equal SolidQueue::Job::EnqueueError, active_job.enqueue_error.class
259369
end) # parentheses bind the block to perform_later rather than to assert_not
260370
end
261371

@@ -291,8 +401,12 @@ def assert_blocked(&block)
291401
assert SolidQueue::Job.last.blocked?
292402
end
293403

294-
def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block)
295-
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do
404+
def assert_discarded(&block)
405+
assert_job_counts(discarded: 1, &block)
406+
end
407+
408+
def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, discarded: 0, &block)
409+
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked + discarded) do
296410
assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do
297411
assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do
298412
assert_difference -> { SolidQueue::BlockedExecution.count }, +blocked, &block
