Skip to content

Commit d903c82

Browse files
authored
Merge pull request #93 from basecamp/multi-enqueue
Implement `perform_all_later` via `enqueue_all`
2 parents 5b1ba0b + 5642d01 commit d903c82

File tree

11 files changed

+209
-91
lines changed

11 files changed

+209
-91
lines changed

app/models/solid_queue/blocked_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
module SolidQueue
44
class BlockedExecution < Execution
5-
assume_attributes_from_job :concurrency_key
5+
assumes_attributes_from_job :concurrency_key
66
before_create :set_expires_at
77

88
has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key

app/models/solid_queue/execution.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,17 @@ class Execution < Record
1111
belongs_to :job
1212

1313
alias_method :discard, :destroy
14+
15+
class << self
16+
def create_all_from_jobs(jobs)
17+
insert_all execution_data_from_jobs(jobs)
18+
end
19+
20+
def execution_data_from_jobs(jobs)
21+
jobs.collect do |job|
22+
attributes_from_job(job).merge(job_id: job.id)
23+
end
24+
end
25+
end
1426
end
1527
end

app/models/solid_queue/execution/job_attributes.rb

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,24 @@ class Execution
55
module JobAttributes
66
extend ActiveSupport::Concern
77

8-
ASSUMIBLE_ATTRIBUTES_FROM_JOB = %i[ queue_name priority ]
8+
included do
9+
class_attribute :assumible_attributes_from_job, instance_accessor: false, default: %i[ queue_name priority ]
10+
end
911

1012
class_methods do
11-
def assume_attributes_from_job(*attributes)
12-
before_create -> { assume_attributes_from_job(ASSUMIBLE_ATTRIBUTES_FROM_JOB | attributes) }
13+
def assumes_attributes_from_job(*attribute_names)
14+
self.assumible_attributes_from_job |= attribute_names
15+
before_create -> { assume_attributes_from_job }
16+
end
17+
18+
def attributes_from_job(job)
19+
job.attributes.symbolize_keys.slice(*assumible_attributes_from_job)
1320
end
1421
end
1522

1623
private
17-
def assume_attributes_from_job(attributes)
18-
attributes.each do |attribute|
24+
def assume_attributes_from_job
25+
self.class.assumible_attributes_from_job.each do |attribute|
1926
send("#{attribute}=", job.send(attribute))
2027
end
2128
end

app/models/solid_queue/job.rb

Lines changed: 52 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,62 @@
11
# frozen_string_literal: true
22

3-
class SolidQueue::Job < SolidQueue::Record
4-
include Executable
3+
module SolidQueue
4+
class Job < Record
5+
include Executable
56

6-
if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1")
7-
serialize :arguments, coder: JSON
8-
else
9-
serialize :arguments, JSON
10-
end
11-
12-
DEFAULT_PRIORITY = 0
13-
DEFAULT_QUEUE_NAME = "default"
14-
15-
class << self
16-
def enqueue_active_job(active_job, scheduled_at: Time.current)
17-
enqueue \
18-
queue_name: active_job.queue_name,
19-
active_job_id: active_job.job_id,
20-
priority: active_job.priority,
21-
scheduled_at: scheduled_at,
22-
class_name: active_job.class.name,
23-
arguments: active_job.serialize,
24-
concurrency_key: active_job.try(:concurrency_key)
7+
if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1")
8+
serialize :arguments, coder: JSON
9+
else
10+
serialize :arguments, JSON
2511
end
2612

27-
def enqueue(**kwargs)
28-
create!(**kwargs.compact.with_defaults(defaults)).tap do
29-
SolidQueue.logger.debug "[SolidQueue] Enqueued job #{kwargs}"
13+
class << self
14+
def enqueue_all(active_jobs)
15+
active_jobs_by_job_id = active_jobs.index_by(&:job_id)
16+
17+
transaction do
18+
jobs = create_all_from_active_jobs(active_jobs)
19+
prepare_all_for_execution(jobs).tap do |enqueued_jobs|
20+
enqueued_jobs.each do |enqueued_job|
21+
active_jobs_by_job_id[enqueued_job.active_job_id].provider_job_id = enqueued_job.id
22+
end
23+
end
24+
end
3025
end
31-
end
3226

33-
private
34-
def defaults
35-
{ queue_name: DEFAULT_QUEUE_NAME, priority: DEFAULT_PRIORITY }
27+
def enqueue(active_job, scheduled_at: Time.current)
28+
active_job.scheduled_at = scheduled_at
29+
30+
create_from_active_job(active_job).tap do |enqueued_job|
31+
active_job.provider_job_id = enqueued_job.id
32+
end
3633
end
34+
35+
private
36+
DEFAULT_PRIORITY = 0
37+
DEFAULT_QUEUE_NAME = "default"
38+
39+
def create_from_active_job(active_job)
40+
create!(**attributes_from_active_job(active_job))
41+
end
42+
43+
def create_all_from_active_jobs(active_jobs)
44+
job_rows = active_jobs.map { |job| attributes_from_active_job(job) }
45+
insert_all(job_rows)
46+
where(active_job_id: active_jobs.map(&:job_id))
47+
end
48+
49+
def attributes_from_active_job(active_job)
50+
{
51+
queue_name: active_job.queue_name || DEFAULT_QUEUE_NAME,
52+
active_job_id: active_job.job_id,
53+
priority: active_job.priority || DEFAULT_PRIORITY,
54+
scheduled_at: active_job.scheduled_at,
55+
class_name: active_job.class.name,
56+
arguments: active_job.serialize,
57+
concurrency_key: active_job.concurrency_key
58+
}
59+
end
60+
end
3761
end
3862
end

app/models/solid_queue/job/executable.rb

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,56 @@ module Executable
66
extend ActiveSupport::Concern
77

88
included do
9-
include Clearable, ConcurrencyControls
9+
include Clearable, ConcurrencyControls, Schedulable
1010

1111
has_one :ready_execution, dependent: :destroy
1212
has_one :claimed_execution, dependent: :destroy
1313
has_one :failed_execution, dependent: :destroy
1414

15-
has_one :scheduled_execution, dependent: :destroy
16-
1715
after_create :prepare_for_execution
1816

1917
scope :finished, -> { where.not(finished_at: nil) }
20-
scope :failed, -> { includes(:failed_execution).where.not(failed_execution: {id: nil}) }
18+
scope :failed, -> { includes(:failed_execution).where.not(failed_execution: { id: nil }) }
2119
end
2220

23-
%w[ ready claimed failed scheduled ].each do |status|
21+
class_methods do
22+
def prepare_all_for_execution(jobs)
23+
due, not_yet_due = jobs.partition(&:due?)
24+
dispatch_all(due) + schedule_all(not_yet_due)
25+
end
26+
27+
def dispatch_all(jobs)
28+
with_concurrency_limits, without_concurrency_limits = jobs.partition(&:concurrency_limited?)
29+
30+
dispatch_all_at_once(without_concurrency_limits)
31+
dispatch_all_one_by_one(with_concurrency_limits)
32+
33+
successfully_dispatched(jobs)
34+
end
35+
36+
private
37+
def dispatch_all_at_once(jobs)
38+
ReadyExecution.create_all_from_jobs jobs
39+
end
40+
41+
def dispatch_all_one_by_one(jobs)
42+
jobs.each(&:dispatch)
43+
end
44+
45+
def successfully_dispatched(jobs)
46+
dispatched_and_ready(jobs) + dispatched_and_blocked(jobs)
47+
end
48+
49+
def dispatched_and_ready(jobs)
50+
where(id: ReadyExecution.where(job_id: jobs.map(&:id)).pluck(:job_id))
51+
end
52+
53+
def dispatched_and_blocked(jobs)
54+
where(id: BlockedExecution.where(job_id: jobs.map(&:id)).pluck(:job_id))
55+
end
56+
end
57+
58+
%w[ ready claimed failed ].each do |status|
2459
define_method("#{status}?") { public_send("#{status}_execution").present? }
2560
end
2661

@@ -50,6 +85,10 @@ def finished?
5085
finished_at.present?
5186
end
5287

88+
def due?
89+
scheduled_at.nil? || scheduled_at <= Time.current
90+
end
91+
5392
def discard
5493
destroy unless claimed?
5594
end
@@ -63,14 +102,6 @@ def failed_with(exception)
63102
end
64103

65104
private
66-
def due?
67-
scheduled_at.nil? || scheduled_at <= Time.current
68-
end
69-
70-
def schedule
71-
ScheduledExecution.create_or_find_by!(job_id: id)
72-
end
73-
74105
def ready
75106
ReadyExecution.create_or_find_by!(job_id: id)
76107
end
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Job
5+
module Schedulable
6+
extend ActiveSupport::Concern
7+
8+
included do
9+
has_one :scheduled_execution, dependent: :destroy
10+
11+
scope :scheduled, -> { where.not(finished_at: nil) }
12+
end
13+
14+
class_methods do
15+
def schedule_all(jobs)
16+
schedule_all_at_once(jobs)
17+
successfully_scheduled(jobs)
18+
end
19+
20+
private
21+
def schedule_all_at_once(jobs)
22+
ScheduledExecution.create_all_from_jobs(jobs)
23+
end
24+
25+
def successfully_scheduled(jobs)
26+
where(id: ScheduledExecution.where(job_id: jobs.map(&:id)).pluck(:job_id))
27+
end
28+
end
29+
30+
def due?
31+
scheduled_at.nil? || scheduled_at <= Time.current
32+
end
33+
34+
private
35+
def schedule
36+
ScheduledExecution.create_or_find_by!(job_id: id)
37+
end
38+
end
39+
end
40+
end

app/models/solid_queue/ready_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ module SolidQueue
44
class ReadyExecution < Execution
55
scope :queued_as, ->(queue_name) { where(queue_name: queue_name) }
66

7-
assume_attributes_from_job
7+
assumes_attributes_from_job
88

99
class << self
1010
def claim(queue_list, limit, process_id)

app/models/solid_queue/scheduled_execution.rb

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class ScheduledExecution < Execution
66
scope :ordered, -> { order(scheduled_at: :asc, priority: :asc) }
77
scope :next_batch, ->(batch_size) { due.ordered.limit(batch_size) }
88

9-
assume_attributes_from_job :scheduled_at
9+
assumes_attributes_from_job :scheduled_at
1010

1111
class << self
1212
def dispatch_next_batch(batch_size)
@@ -22,44 +22,12 @@ def dispatch_next_batch(batch_size)
2222
private
2323
def dispatch_batch(job_ids)
2424
jobs = Job.where(id: job_ids)
25-
with_concurrency_limits, without_concurrency_limits = jobs.partition(&:concurrency_limited?)
2625

27-
dispatch_at_once(without_concurrency_limits)
28-
dispatch_one_by_one(with_concurrency_limits)
29-
30-
successfully_dispatched(job_ids).tap do |dispatched_job_ids|
26+
Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
3127
where(job_id: dispatched_job_ids).delete_all
3228
SolidQueue.logger.info("[SolidQueue] Dispatched scheduled batch with #{dispatched_job_ids.size} jobs")
3329
end
3430
end
35-
36-
def dispatch_at_once(jobs)
37-
ReadyExecution.insert_all ready_rows_from_batch(jobs)
38-
end
39-
40-
def dispatch_one_by_one(jobs)
41-
jobs.each(&:dispatch)
42-
end
43-
44-
def ready_rows_from_batch(jobs)
45-
prepared_at = Time.current
46-
47-
jobs.map do |job|
48-
{ job_id: job.id, queue_name: job.queue_name, priority: job.priority, created_at: prepared_at }
49-
end
50-
end
51-
52-
def successfully_dispatched(job_ids)
53-
dispatched_and_ready(job_ids) + dispatched_and_blocked(job_ids)
54-
end
55-
56-
def dispatched_and_ready(job_ids)
57-
ReadyExecution.where(job_id: job_ids).pluck(:job_id)
58-
end
59-
60-
def dispatched_and_blocked(job_ids)
61-
BlockedExecution.where(job_id: job_ids).pluck(:job_id)
62-
end
6331
end
6432
end
6533
end

lib/active_job/concurrency_controls.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ def concurrency_key
3636
end
3737
end
3838

39+
def concurrency_limited?
40+
concurrency_key.present?
41+
end
42+
3943
private
4044
def concurrency_group
4145
compute_concurrency_parameter(self.class.concurrency_group)

lib/active_job/queue_adapters/solid_queue_adapter.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ module QueueAdapters
99
# Rails.application.config.active_job.queue_adapter = :solid_queue
1010
class SolidQueueAdapter
1111
def enqueue(active_job) # :nodoc:
12-
SolidQueue::Job.enqueue_active_job(active_job).tap do |job|
13-
active_job.provider_job_id = job.id
14-
end
12+
SolidQueue::Job.enqueue(active_job)
1513
end
1614

1715
def enqueue_at(active_job, timestamp) # :nodoc:
18-
SolidQueue::Job.enqueue_active_job(active_job, scheduled_at: Time.at(timestamp)).tap do |job|
19-
active_job.provider_job_id = job.id
20-
end
16+
SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp))
17+
end
18+
19+
def enqueue_all(active_jobs) # :nodoc:
20+
SolidQueue::Job.enqueue_all(active_jobs)
2121
end
2222
end
2323
end

0 commit comments

Comments
 (0)