Skip to content

Commit 03eb148

Browse files
committed
Set provider_job_id in active_jobs passed to enqueue_all
Only for those enqueued successfully, that's it.
1 parent 702bdea commit 03eb148

File tree

4 files changed

+39
-21
lines changed

4 files changed

+39
-21
lines changed

app/models/solid_queue/job.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,23 @@ class Job < Record
1212

1313
class << self
1414
def enqueue_all(active_jobs)
15+
active_jobs_by_job_id = active_jobs.index_by(&:job_id)
16+
1517
transaction do
1618
jobs = create_all_from_active_jobs(active_jobs)
17-
prepare_all_for_execution(jobs)
19+
prepare_all_for_execution(jobs).tap do |enqueued_jobs|
20+
enqueued_jobs.each do |enqueued_job|
21+
active_jobs_by_job_id[enqueued_job.active_job_id].provider_job_id = enqueued_job.id
22+
end
23+
end
1824
end
1925
end
2026

2127
def enqueue(active_job, scheduled_at: Time.current)
2228
active_job.scheduled_at = scheduled_at
2329

24-
create_from_active_job(active_job).tap do |job|
25-
active_job.provider_job_id = job.id
30+
create_from_active_job(active_job).tap do |enqueued_job|
31+
active_job.provider_job_id = enqueued_job.id
2632
end
2733
end
2834

app/models/solid_queue/job/executable.rb

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,51 @@ module Executable
2323
class_methods do
2424
def prepare_all_for_execution(jobs)
2525
due, not_yet_due = jobs.partition(&:due?)
26-
27-
dispatch_all(due)
28-
schedule_all(not_yet_due)
26+
dispatch_all(due) + schedule_all(not_yet_due)
2927
end
3028

3129
def dispatch_all(jobs)
3230
with_concurrency_limits, without_concurrency_limits = jobs.partition(&:concurrency_limited?)
3331

3432
dispatch_all_at_once(without_concurrency_limits)
3533
dispatch_all_one_by_one(with_concurrency_limits)
34+
35+
successfully_dispatched(jobs)
3636
end
3737

3838
def schedule_all(jobs)
39-
ScheduledExecution.create_all_from_jobs(jobs)
39+
schedule_all_at_once(jobs)
40+
successfully_scheduled(jobs)
4041
end
4142

4243
private
4344
def dispatch_all_at_once(jobs)
4445
ReadyExecution.create_all_from_jobs jobs
4546
end
4647

48+
def schedule_all_at_once(jobs)
49+
ScheduledExecution.create_all_from_jobs(jobs)
50+
end
51+
4752
def dispatch_all_one_by_one(jobs)
4853
jobs.each(&:dispatch)
4954
end
55+
56+
def successfully_dispatched(jobs)
57+
dispatched_and_ready(jobs) + dispatched_and_blocked(jobs)
58+
end
59+
60+
def dispatched_and_ready(jobs)
61+
where(id: ReadyExecution.where(job_id: jobs.map(&:id)).pluck(:job_id))
62+
end
63+
64+
def dispatched_and_blocked(jobs)
65+
where(id: BlockedExecution.where(job_id: jobs.map(&:id)).pluck(:job_id))
66+
end
67+
68+
def successfully_scheduled(jobs)
69+
where(id: ScheduledExecution.where(job_id: jobs.map(&:id)).pluck(:job_id))
70+
end
5071
end
5172

5273
%w[ ready claimed failed scheduled ].each do |status|

app/models/solid_queue/scheduled_execution.rb

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,12 @@ def dispatch_next_batch(batch_size)
2222
private
2323
def dispatch_batch(job_ids)
2424
jobs = Job.where(id: job_ids)
25-
Job.dispatch_all(jobs)
2625

27-
successfully_dispatched(job_ids).tap do |dispatched_job_ids|
26+
Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
2827
where(job_id: dispatched_job_ids).delete_all
2928
SolidQueue.logger.info("[SolidQueue] Dispatched scheduled batch with #{dispatched_job_ids.size} jobs")
3029
end
3130
end
32-
33-
def successfully_dispatched(job_ids)
34-
dispatched_and_ready(job_ids) + dispatched_and_blocked(job_ids)
35-
end
36-
37-
def dispatched_and_ready(job_ids)
38-
ReadyExecution.where(job_id: job_ids).pluck(:job_id)
39-
end
40-
41-
def dispatched_and_blocked(job_ids)
42-
BlockedExecution.where(job_id: job_ids).pluck(:job_id)
43-
end
4431
end
4532
end
4633
end

test/models/solid_queue/job_test.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
3535
end
3636

3737
solid_queue_job = SolidQueue::Job.last
38+
assert_equal solid_queue_job.id, active_job.provider_job_id
3839
assert_equal 8, solid_queue_job.priority
3940
assert_equal "test", solid_queue_job.queue_name
4041
assert_equal "AddToBufferJob", solid_queue_job.class_name
@@ -118,6 +119,9 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
118119
assert_multi(ready: 5, scheduled: 2, blocked: 2) do
119120
ActiveJob.perform_all_later(active_jobs)
120121
end
122+
123+
jobs = SolidQueue::Job.last(9)
124+
assert_equal active_jobs.map(&:provider_job_id).sort, jobs.pluck(:id).sort
121125
end
122126

123127
test "block jobs when concurrency limits are reached" do

0 commit comments

Comments
 (0)