Skip to content

Commit 9c8984a

Browse files
committed
Tidy up and DRY SolidQueue::Job.enqueue_all_active_jobs
Taking advantage of the new methods created in previous commits.
1 parent 7bdfdf5 commit 9c8984a

File tree

4 files changed

+71
-57
lines changed

4 files changed

+71
-57
lines changed

app/models/solid_queue/job.rb

Lines changed: 56 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,66 +1,74 @@
11
# frozen_string_literal: true
22

3-
class SolidQueue::Job < SolidQueue::Record
4-
include Executable
3+
module SolidQueue
4+
class Job < Record
5+
include Executable
56

6-
if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1")
7-
serialize :arguments, coder: JSON
8-
else
9-
serialize :arguments, JSON
10-
end
11-
12-
DEFAULT_PRIORITY = 0
13-
DEFAULT_QUEUE_NAME = "default"
7+
if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1")
8+
serialize :arguments, coder: JSON
9+
else
10+
serialize :arguments, JSON
11+
end
1412

15-
class << self
16-
def enqueue_all_active_jobs(active_jobs)
17-
scheduled_jobs, immediate_jobs = active_jobs.partition(&:scheduled_at)
18-
with_concurrency_limits, without_concurrency_limits = immediate_jobs.partition(&:concurrency_limited?)
13+
class << self
14+
def enqueue_all(active_jobs)
15+
scheduled_jobs, immediate_jobs = active_jobs.partition(&:scheduled_at)
16+
with_concurrency_limits, without_concurrency_limits = immediate_jobs.partition(&:concurrency_limited?)
1917

20-
with_concurrency_limits.each do |active_job|
21-
enqueue_active_job(active_job)
18+
schedule_all_at_once(scheduled_jobs)
19+
enqueue_all_at_once(without_concurrency_limits)
20+
enqueue_one_by_one(with_concurrency_limits)
2221
end
2322

24-
transaction do
25-
job_rows = scheduled_jobs.map { |job| attributes_from_active_job(job) }
26-
insert_all(job_rows)
27-
inserted_jobs = where(active_job_id: scheduled_jobs.map(&:job_id))
28-
SolidQueue::ScheduledExecution.create_all_from_jobs(inserted_jobs)
23+
def schedule_all_at_once(active_jobs)
24+
transaction do
25+
inserted_jobs = create_all_from_active_jobs(active_jobs)
26+
schedule_all(inserted_jobs)
27+
end
2928
end
3029

31-
transaction do
32-
job_rows = without_concurrency_limits.map { |job| attributes_from_active_job(job) }
33-
insert_all(job_rows)
34-
inserted_jobs = where(active_job_id: without_concurrency_limits.map(&:job_id))
35-
SolidQueue::ReadyExecution.create_all_from_jobs(inserted_jobs)
30+
def enqueue_all_at_once(active_jobs)
31+
transaction do
32+
inserted_jobs = create_all_from_active_jobs(active_jobs)
33+
dispatch_all_at_once(inserted_jobs)
34+
end
3635
end
37-
end
3836

39-
def enqueue_active_job(active_job, scheduled_at: Time.current)
40-
enqueue **attributes_from_active_job(active_job).reverse_merge(scheduled_at: scheduled_at)
41-
end
37+
def enqueue_one_by_one(active_jobs)
38+
active_jobs.each { |active_job| enqueue(active_job) }
39+
end
4240

43-
private
44-
def enqueue(**kwargs)
45-
create!(**kwargs).tap do
46-
SolidQueue.logger.debug "[SolidQueue] Enqueued job #{kwargs}"
41+
def enqueue(active_job, scheduled_at: Time.current)
42+
create!(**attributes_from_active_job(active_job).reverse_merge(scheduled_at: scheduled_at)).tap do |job|
43+
active_job.provider_job_id = job.id
4744
end
4845
end
4946

50-
def attributes_from_active_job(active_job)
51-
{
52-
queue_name: active_job.queue_name,
53-
active_job_id: active_job.job_id,
54-
priority: active_job.priority,
55-
scheduled_at: active_job.scheduled_at,
56-
class_name: active_job.class.name,
57-
arguments: active_job.serialize,
58-
concurrency_key: active_job.concurrency_key
59-
}.compact.with_defaults(defaults)
60-
end
47+
private
48+
DEFAULT_PRIORITY = 0
49+
DEFAULT_QUEUE_NAME = "default"
6150

62-
def defaults
63-
{ queue_name: DEFAULT_QUEUE_NAME, priority: DEFAULT_PRIORITY }
64-
end
51+
def create_all_from_active_jobs(active_jobs)
52+
job_rows = active_jobs.map { |job| attributes_from_active_job(job) }
53+
insert_all(job_rows)
54+
where(active_job_id: active_jobs.map(&:job_id))
55+
end
56+
57+
def attributes_from_active_job(active_job)
58+
{
59+
queue_name: active_job.queue_name,
60+
active_job_id: active_job.job_id,
61+
priority: active_job.priority,
62+
scheduled_at: active_job.scheduled_at,
63+
class_name: active_job.class.name,
64+
arguments: active_job.serialize,
65+
concurrency_key: active_job.concurrency_key
66+
}.compact.with_defaults(defaults)
67+
end
68+
69+
def defaults
70+
{ queue_name: DEFAULT_QUEUE_NAME, priority: DEFAULT_PRIORITY }
71+
end
72+
end
6573
end
6674
end

app/models/solid_queue/job/executable.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@ module Executable
2020
scope :failed, -> { includes(:failed_execution).where.not(failed_execution: { id: nil }) }
2121
end
2222

23+
class_methods do
24+
def dispatch_all_at_once(jobs)
25+
ReadyExecution.create_all_from_jobs(jobs)
26+
end
27+
28+
def schedule_all(jobs)
29+
ScheduledExecution.create_all_from_jobs(jobs)
30+
end
31+
end
32+
2333
%w[ ready claimed failed scheduled ].each do |status|
2434
define_method("#{status}?") { public_send("#{status}_execution").present? }
2535
end

lib/active_job/queue_adapters/solid_queue_adapter.rb

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,15 @@ module QueueAdapters
99
# Rails.application.config.active_job.queue_adapter = :solid_queue
1010
class SolidQueueAdapter
1111
def enqueue(active_job) # :nodoc:
12-
SolidQueue::Job.enqueue_active_job(active_job).tap do |job|
13-
active_job.provider_job_id = job.id
14-
end
12+
SolidQueue::Job.enqueue(active_job)
1513
end
1614

1715
def enqueue_at(active_job, timestamp) # :nodoc:
18-
SolidQueue::Job.enqueue_active_job(active_job, scheduled_at: Time.at(timestamp)).tap do |job|
19-
active_job.provider_job_id = job.id
20-
end
16+
SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp))
2117
end
2218

2319
def enqueue_all(active_jobs) # :nodoc:
24-
SolidQueue::Job.enqueue_all_active_jobs(active_jobs)
20+
SolidQueue::Job.enqueue_all(active_jobs)
2521
end
2622
end
2723
end

test/models/solid_queue/job_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
3131
active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test")
3232

3333
assert_ready do
34-
SolidQueue::Job.enqueue_active_job(active_job)
34+
SolidQueue::Job.enqueue(active_job)
3535
end
3636

3737
solid_queue_job = SolidQueue::Job.last
@@ -51,7 +51,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
5151
active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test")
5252

5353
assert_scheduled do
54-
SolidQueue::Job.enqueue_active_job(active_job, scheduled_at: 5.minutes.from_now)
54+
SolidQueue::Job.enqueue(active_job, scheduled_at: 5.minutes.from_now)
5555
end
5656

5757
solid_queue_job = SolidQueue::Job.last

0 commit comments

Comments
 (0)