Skip to content

Commit bfba37e

Browse files
committed
Implement a first, working but ugly version of perform_all_later
Supporting concurrency-limited, scheduled and immediate jobs.
1 parent 93cccd9 commit bfba37e

File tree

5 files changed

+80
-14
lines changed

5 files changed

+80
-14
lines changed

app/models/solid_queue/job.rb

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,54 @@ class SolidQueue::Job < SolidQueue::Record
1313
DEFAULT_QUEUE_NAME = "default"
1414

1515
class << self
16-
def enqueue_active_job(active_job, scheduled_at: Time.current)
17-
enqueue \
18-
queue_name: active_job.queue_name,
19-
active_job_id: active_job.job_id,
20-
priority: active_job.priority,
21-
scheduled_at: scheduled_at,
22-
class_name: active_job.class.name,
23-
arguments: active_job.serialize,
24-
concurrency_key: active_job.try(:concurrency_key)
25-
end
16+
def enqueue_active_jobs(active_jobs)
17+
scheduled_jobs, immediate_jobs = active_jobs.partition(&:scheduled_at)
18+
with_concurrency_limits, without_concurrency_limits = immediate_jobs.partition(&:concurrency_limited?)
19+
20+
with_concurrency_limits.each do |active_job|
21+
enqueue_active_job(active_job)
22+
end
2623

27-
def enqueue(**kwargs)
28-
create!(**kwargs.compact.with_defaults(defaults)).tap do
29-
SolidQueue.logger.debug "[SolidQueue] Enqueued job #{kwargs}"
24+
transaction do
25+
job_rows = scheduled_jobs.map { |job| attributes_from_active_job(job) }
26+
self.insert_all(job_rows)
27+
inserted_jobs = where(active_job_id: scheduled_jobs.map(&:job_id))
28+
execution_rows = inserted_jobs.map { |job| job.attributes.slice("queue_name", "priority", "scheduled_at").merge(job_id: job.id) }
29+
SolidQueue::ScheduledExecution.insert_all(execution_rows)
30+
end
31+
32+
transaction do
33+
job_rows = without_concurrency_limits.map { |job| attributes_from_active_job(job) }
34+
self.insert_all(job_rows)
35+
inserted_jobs = where(active_job_id: without_concurrency_limits.map(&:job_id))
36+
execution_rows = inserted_jobs.map { |job| job.attributes.slice("queue_name", "priority").merge(job_id: job.id) }
37+
SolidQueue::ReadyExecution.insert_all(execution_rows)
3038
end
3139
end
3240

41+
def enqueue_active_job(active_job, scheduled_at: Time.current)
42+
enqueue **attributes_from_active_job(active_job).reverse_merge(scheduled_at: scheduled_at)
43+
end
44+
3345
private
46+
def enqueue(**kwargs)
47+
create!(**kwargs).tap do
48+
SolidQueue.logger.debug "[SolidQueue] Enqueued job #{kwargs}"
49+
end
50+
end
51+
52+
def attributes_from_active_job(active_job)
53+
{
54+
queue_name: active_job.queue_name,
55+
active_job_id: active_job.job_id,
56+
priority: active_job.priority,
57+
scheduled_at: active_job.scheduled_at,
58+
class_name: active_job.class.name,
59+
arguments: active_job.serialize,
60+
concurrency_key: active_job.concurrency_key
61+
}.compact.with_defaults(defaults)
62+
end
63+
3464
def defaults
3565
{ queue_name: DEFAULT_QUEUE_NAME, priority: DEFAULT_PRIORITY }
3666
end

app/models/solid_queue/job/executable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ module Executable
1717
after_create :prepare_for_execution
1818

1919
scope :finished, -> { where.not(finished_at: nil) }
20-
scope :failed, -> { includes(:failed_execution).where.not(failed_execution: {id: nil}) }
20+
scope :failed, -> { includes(:failed_execution).where.not(failed_execution: { id: nil }) }
2121
end
2222

2323
%w[ ready claimed failed scheduled ].each do |status|

lib/active_job/concurrency_controls.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ def concurrency_key
3636
end
3737
end
3838

39+
def concurrency_limited?
40+
concurrency_key.present?
41+
end
42+
3943
private
4044
def concurrency_group
4145
compute_concurrency_parameter(self.class.concurrency_group)

lib/active_job/queue_adapters/solid_queue_adapter.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ def enqueue_at(active_job, timestamp) # :nodoc:
1919
active_job.provider_job_id = job.id
2020
end
2121
end
22+
23+
def enqueue_all(active_jobs) # :nodoc:
24+
SolidQueue::Job.enqueue_active_jobs(active_jobs)
25+
end
2226
end
2327
end
2428
end

test/models/solid_queue/job_test.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,24 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
102102
end
103103
end
104104

105+
test "enqueue multiple jobs" do
106+
active_jobs = [
107+
AddToBufferJob.new(2),
108+
AddToBufferJob.new(6).set(wait: 2.minutes),
109+
NonOverlappingJob.new(@result),
110+
StoreResultJob.new(42),
111+
AddToBufferJob.new(4),
112+
NonOverlappingGroupedJob1.new(@result),
113+
AddToBufferJob.new(6).set(wait: 3.minutes),
114+
NonOverlappingJob.new(@result),
115+
NonOverlappingGroupedJob2.new(@result)
116+
]
117+
118+
assert_multi(ready: 5, scheduled: 2, blocked: 2) do
119+
ActiveJob.perform_all_later(active_jobs)
120+
end
121+
end
122+
105123
test "block jobs when concurrency limits are reached" do
106124
assert_ready do
107125
NonOverlappingJob.perform_later(@result, name: "A")
@@ -148,4 +166,14 @@ def assert_blocked(&block)
148166
assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::BlockedExecution.count } => +1, &block
149167
end
150168
end
169+
170+
def assert_multi(ready: 0, scheduled: 0, blocked: 0, &block)
171+
assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do
172+
assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do
173+
assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do
174+
assert_difference -> { SolidQueue::BlockedExecution.count }, +blocked, &block
175+
end
176+
end
177+
end
178+
end
151179
end

0 commit comments

Comments
 (0)