Skip to content

Commit 702bdea

Browse files
committed
Enqueue all jobs in perform_all_later within the same transaction
This allows even further DRYing and simplification, as we can unify the dispatching done when moving a scheduled job batch to ready, having to deal with concurrency-limited jobs, and the same thing when enqueuing multiple jobs.
1 parent 9c8984a commit 702bdea

File tree

3 files changed

+38
-48
lines changed

3 files changed

+38
-48
lines changed

app/models/solid_queue/job.rb

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,16 @@ class Job < Record
1212

1313
class << self
1414
def enqueue_all(active_jobs)
15-
scheduled_jobs, immediate_jobs = active_jobs.partition(&:scheduled_at)
16-
with_concurrency_limits, without_concurrency_limits = immediate_jobs.partition(&:concurrency_limited?)
17-
18-
schedule_all_at_once(scheduled_jobs)
19-
enqueue_all_at_once(without_concurrency_limits)
20-
enqueue_one_by_one(with_concurrency_limits)
21-
end
22-
23-
def schedule_all_at_once(active_jobs)
24-
transaction do
25-
inserted_jobs = create_all_from_active_jobs(active_jobs)
26-
schedule_all(inserted_jobs)
27-
end
28-
end
29-
30-
def enqueue_all_at_once(active_jobs)
3115
transaction do
32-
inserted_jobs = create_all_from_active_jobs(active_jobs)
33-
dispatch_all_at_once(inserted_jobs)
16+
jobs = create_all_from_active_jobs(active_jobs)
17+
prepare_all_for_execution(jobs)
3418
end
3519
end
3620

37-
def enqueue_one_by_one(active_jobs)
38-
active_jobs.each { |active_job| enqueue(active_job) }
39-
end
40-
4121
def enqueue(active_job, scheduled_at: Time.current)
42-
create!(**attributes_from_active_job(active_job).reverse_merge(scheduled_at: scheduled_at)).tap do |job|
22+
active_job.scheduled_at = scheduled_at
23+
24+
create_from_active_job(active_job).tap do |job|
4325
active_job.provider_job_id = job.id
4426
end
4527
end
@@ -48,6 +30,10 @@ def enqueue(active_job, scheduled_at: Time.current)
4830
DEFAULT_PRIORITY = 0
4931
DEFAULT_QUEUE_NAME = "default"
5032

33+
def create_from_active_job(active_job)
34+
create!(**attributes_from_active_job(active_job))
35+
end
36+
5137
def create_all_from_active_jobs(active_jobs)
5238
job_rows = active_jobs.map { |job| attributes_from_active_job(job) }
5339
insert_all(job_rows)
@@ -56,18 +42,14 @@ def create_all_from_active_jobs(active_jobs)
5642

5743
def attributes_from_active_job(active_job)
5844
{
59-
queue_name: active_job.queue_name,
45+
queue_name: active_job.queue_name || DEFAULT_QUEUE_NAME,
6046
active_job_id: active_job.job_id,
61-
priority: active_job.priority,
47+
priority: active_job.priority || DEFAULT_PRIORITY,
6248
scheduled_at: active_job.scheduled_at,
6349
class_name: active_job.class.name,
6450
arguments: active_job.serialize,
6551
concurrency_key: active_job.concurrency_key
66-
}.compact.with_defaults(defaults)
67-
end
68-
69-
def defaults
70-
{ queue_name: DEFAULT_QUEUE_NAME, priority: DEFAULT_PRIORITY }
52+
}
7153
end
7254
end
7355
end

app/models/solid_queue/job/executable.rb

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,32 @@ module Executable
2121
end
2222

2323
class_methods do
24-
def dispatch_all_at_once(jobs)
25-
ReadyExecution.create_all_from_jobs(jobs)
24+
def prepare_all_for_execution(jobs)
25+
due, not_yet_due = jobs.partition(&:due?)
26+
27+
dispatch_all(due)
28+
schedule_all(not_yet_due)
29+
end
30+
31+
def dispatch_all(jobs)
32+
with_concurrency_limits, without_concurrency_limits = jobs.partition(&:concurrency_limited?)
33+
34+
dispatch_all_at_once(without_concurrency_limits)
35+
dispatch_all_one_by_one(with_concurrency_limits)
2636
end
2737

2838
def schedule_all(jobs)
2939
ScheduledExecution.create_all_from_jobs(jobs)
3040
end
41+
42+
private
43+
def dispatch_all_at_once(jobs)
44+
ReadyExecution.create_all_from_jobs jobs
45+
end
46+
47+
def dispatch_all_one_by_one(jobs)
48+
jobs.each(&:dispatch)
49+
end
3150
end
3251

3352
%w[ ready claimed failed scheduled ].each do |status|
@@ -60,6 +79,10 @@ def finished?
6079
finished_at.present?
6180
end
6281

82+
def due?
83+
scheduled_at.nil? || scheduled_at <= Time.current
84+
end
85+
6386
def discard
6487
destroy unless claimed?
6588
end
@@ -73,10 +96,6 @@ def failed_with(exception)
7396
end
7497

7598
private
76-
def due?
77-
scheduled_at.nil? || scheduled_at <= Time.current
78-
end
79-
8099
def schedule
81100
ScheduledExecution.create_or_find_by!(job_id: id)
82101
end

app/models/solid_queue/scheduled_execution.rb

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,14 @@ def dispatch_next_batch(batch_size)
2222
private
2323
def dispatch_batch(job_ids)
2424
jobs = Job.where(id: job_ids)
25-
with_concurrency_limits, without_concurrency_limits = jobs.partition(&:concurrency_limited?)
26-
27-
dispatch_at_once(without_concurrency_limits)
28-
dispatch_one_by_one(with_concurrency_limits)
25+
Job.dispatch_all(jobs)
2926

3027
successfully_dispatched(job_ids).tap do |dispatched_job_ids|
3128
where(job_id: dispatched_job_ids).delete_all
3229
SolidQueue.logger.info("[SolidQueue] Dispatched scheduled batch with #{dispatched_job_ids.size} jobs")
3330
end
3431
end
3532

36-
def dispatch_at_once(jobs)
37-
ReadyExecution.create_all_from_jobs jobs
38-
end
39-
40-
def dispatch_one_by_one(jobs)
41-
jobs.each(&:dispatch)
42-
end
43-
4433
def successfully_dispatched(job_ids)
4534
dispatched_and_ready(job_ids) + dispatched_and_blocked(job_ids)
4635
end

0 commit comments

Comments
 (0)