Skip to content

Commit c072fa4

Browse files
committed
Pass process.name all the way to claimed executions when claiming jobs
1 parent 5d5f7bc commit c072fa4

File tree

10 files changed

+38
-35
lines changed

10 files changed

+38
-35
lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# frozen_string_literal: true
22

33
class SolidQueue::ClaimedExecution < SolidQueue::Execution
4-
belongs_to :process
4+
belongs_to :process, primary_key: :name, foreign_key: :process_name
55

66
scope :orphaned, -> { where.missing(:process) }
77

@@ -12,12 +12,13 @@ def success?
1212
end
1313

1414
class << self
15-
def claiming(job_ids, process_id, &block)
16-
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }
15+
def claiming(job_ids, process, &block)
16+
process_data = { process_id: process.id, process_name: process.name }
17+
job_data = Array(job_ids).collect { |job_id| { job_id: job_id }.merge(process_data) }
1718

18-
SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload|
19+
SolidQueue.instrument(:claim, job_ids: job_ids, **process_data) do |payload|
1920
insert_all!(job_data)
20-
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
21+
where(job_id: job_ids, process_id: process.id).load.tap do |claimed|
2122
block.call(claimed)
2223

2324
payload[:size] = claimed.size
@@ -45,6 +46,7 @@ def fail_all_with(error)
4546
end
4647

4748
payload[:process_ids] = executions.map(&:process_id).uniq
49+
payload[:process_names] = executions.map(&:process_name).uniq
4850
payload[:job_ids] = executions.map(&:job_id).uniq
4951
payload[:size] = executions.size
5052
end
@@ -74,7 +76,7 @@ def perform
7476
end
7577

7678
def release
77-
SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do
79+
SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id, process_name: process_name) do
7880
transaction do
7981
job.dispatch_bypassing_concurrency_limits
8082
destroy!

app/models/solid_queue/process/executor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ module Executor
66
extend ActiveSupport::Concern
77

88
included do
9-
has_many :claimed_executions
9+
has_many :claimed_executions, primary_key: :name, foreign_key: :process_name
1010

1111
after_destroy :release_all_claimed_executions
1212
end

app/models/solid_queue/ready_execution.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ class ReadyExecution < Execution
77
assumes_attributes_from_job
88

99
class << self
10-
def claim(queue_list, limit, process_id)
10+
def claim(queue_list, limit, process)
1111
QueueSelector.new(queue_list, self).scoped_relations.flat_map do |queue_relation|
12-
select_and_lock(queue_relation, process_id, limit).tap do |locked|
12+
select_and_lock(queue_relation, process, limit).tap do |locked|
1313
limit -= locked.size
1414
end
1515
end
@@ -20,23 +20,23 @@ def aggregated_count_across(queue_list)
2020
end
2121

2222
private
23-
def select_and_lock(queue_relation, process_id, limit)
23+
def select_and_lock(queue_relation, process, limit)
2424
return [] if limit <= 0
2525

2626
transaction do
2727
candidates = select_candidates(queue_relation, limit)
28-
lock_candidates(candidates, process_id)
28+
lock_candidates(candidates, process)
2929
end
3030
end
3131

3232
def select_candidates(queue_relation, limit)
3333
queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id)
3434
end
3535

36-
def lock_candidates(executions, process_id)
36+
def lock_candidates(executions, process)
3737
return [] if executions.none?
3838

39-
SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process_id) do |claimed|
39+
SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process) do |claimed|
4040
ids_to_delete = executions.index_by(&:job_id).values_at(*claimed.map(&:job_id)).map(&:id)
4141
where(id: ids_to_delete).delete_all
4242
end

lib/solid_queue/worker.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def poll
3838

3939
def claim_executions
4040
with_polling_volume do
41-
SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process_id)
41+
SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process)
4242
end
4343
end
4444

test/integration/instrumentation_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ class InstrumentationTest < ActiveSupport::TestCase
150150
3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) }
151151
jobs = SolidQueue::Job.last(3)
152152

153-
SolidQueue::ReadyExecution.claim("*", 5, process.id)
153+
SolidQueue::ReadyExecution.claim("*", 5, process)
154154

155155
events = subscribed(/fail.*_claimed\.solid_queue/) do
156156
SolidQueue::Process.prune

test/models/solid_queue/claimed_execution_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def prepare_and_claim_job(active_job, process: @process)
8787

8888
job.prepare_for_execution
8989
assert_difference -> { SolidQueue::ClaimedExecution.count } => +1 do
90-
SolidQueue::ReadyExecution.claim(job.queue_name, 1, process.id)
90+
SolidQueue::ReadyExecution.claim(job.queue_name, 1, process)
9191
end
9292

9393
SolidQueue::ClaimedExecution.last

test/models/solid_queue/process_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase
2020
3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) }
2121
jobs = SolidQueue::Job.last(3)
2222

23-
SolidQueue::ReadyExecution.claim("*", 5, process.id)
23+
SolidQueue::ReadyExecution.claim("*", 5, process)
2424

2525
travel_to 10.minutes.from_now
2626

@@ -40,7 +40,7 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase
4040
3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) }
4141
jobs = SolidQueue::Job.last(3)
4242

43-
SolidQueue::ReadyExecution.claim("*", 5, process.id)
43+
SolidQueue::ReadyExecution.claim("*", 5, process)
4444

4545
travel_to 10.minutes.from_now
4646

test/models/solid_queue/ready_execution_test.rb

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
66
AddToBufferJob.set(queue: "backend", priority: 5 - i).perform_later(i)
77
end
88

9+
@process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123", metadata: { queue: "background" })
910
@jobs = SolidQueue::Job.where(queue_name: "backend").order(:priority)
1011
end
1112

1213
test "claim all jobs for existing queue" do
1314
assert_claimed_jobs(@jobs.count) do
14-
SolidQueue::ReadyExecution.claim("backend", @jobs.count + 1, 42)
15+
SolidQueue::ReadyExecution.claim("backend", @jobs.count + 1, @process)
1516
end
1617

1718
@jobs.each(&:reload)
@@ -21,13 +22,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
2122

2223
test "claim jobs for queue without jobs at the moment" do
2324
assert_no_difference [ -> { SolidQueue::ReadyExecution.count }, -> { SolidQueue::ClaimedExecution.count } ] do
24-
SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10, 42)
25+
SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10, @process)
2526
end
2627
end
2728

2829
test "claim some jobs for existing queue" do
2930
assert_claimed_jobs(2) do
30-
SolidQueue::ReadyExecution.claim("backend", 2, 42)
31+
SolidQueue::ReadyExecution.claim("backend", 2, @process)
3132
end
3233

3334
@jobs.first(2).each do |job|
@@ -45,23 +46,23 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
4546
AddToBufferJob.perform_later("hey")
4647

4748
assert_claimed_jobs(6) do
48-
SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, 42)
49+
SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, @process)
4950
end
5051
end
5152

5253
test "claim jobs using a wildcard" do
5354
AddToBufferJob.perform_later("hey")
5455

5556
assert_claimed_jobs(6) do
56-
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
57+
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, @process)
5758
end
5859
end
5960

6061
test "claim jobs using queue prefixes" do
6162
AddToBufferJob.perform_later("hey")
6263

6364
assert_claimed_jobs(1) do
64-
SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42)
65+
SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, @process)
6566
end
6667

6768
assert @jobs.none?(&:claimed?)
@@ -73,7 +74,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
7374
SolidQueue::Queue.find_by_name("backend").pause
7475

7576
assert_claimed_jobs(1) do
76-
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
77+
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, @process)
7778
end
7879

7980
@jobs.each(&:reload)
@@ -84,15 +85,15 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
8485
AddToBufferJob.perform_later("hey")
8586

8687
assert_claimed_jobs(6) do
87-
SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42)
88+
SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, @process)
8889
end
8990
end
9091

9192
test "claim jobs for queue without jobs at the moment using prefixes" do
9293
AddToBufferJob.perform_later("hey")
9394

9495
assert_claimed_jobs(0) do
95-
SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, 42)
96+
SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, @process)
9697
end
9798
end
9899

@@ -101,7 +102,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
101102
job = SolidQueue::Job.last
102103

103104
assert_claimed_jobs(3) do
104-
SolidQueue::ReadyExecution.claim("*", 3, 42)
105+
SolidQueue::ReadyExecution.claim("*", 3, @process)
105106
end
106107

107108
assert job.reload.claimed?
@@ -117,7 +118,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
117118
assert_equal "background", job.queue_name
118119

119120
assert_claimed_jobs(3) do
120-
SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42)
121+
SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, @process)
121122
end
122123

123124
assert job.reload.claimed?
@@ -136,7 +137,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
136137
claimed_jobs = []
137138
4.times do
138139
assert_claimed_jobs(2) do
139-
SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, 42)
140+
SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, @process)
140141
end
141142

142143
claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job)
@@ -157,7 +158,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
157158
claimed_jobs = []
158159
5.times do
159160
assert_claimed_jobs(2) do
160-
SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, 42)
161+
SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, @process)
161162
end
162163

163164
claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job)

test/unit/process_recovery_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ProcessRecoveryTest < ActiveSupport::TestCase
2828

2929
# Enqueue a job and manually claim it for the worker to avoid timing races
3030
job = enqueue_store_result_job(42)
31-
claimed_execution = SolidQueue::ReadyExecution.claim("*", 5, worker_process.id).first
31+
claimed_execution = SolidQueue::ReadyExecution.claim("*", 5, worker_process).first
3232
assert claimed_execution.present?
3333
assert_equal worker_process.id, claimed_execution.process_id
3434

test/unit/supervisor_test.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ class SupervisorTest < ActiveSupport::TestCase
111111
3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) }
112112
process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123")
113113

114-
SolidQueue::ReadyExecution.claim("*", 5, process.id)
114+
SolidQueue::ReadyExecution.claim("*", 5, process)
115115

116116
assert_equal 3, SolidQueue::ClaimedExecution.count
117117
assert_equal 0, SolidQueue::ReadyExecution.count
@@ -138,7 +138,7 @@ class SupervisorTest < ActiveSupport::TestCase
138138
4.times { |i| ThrottledUpdateResultJob.set(queue: :new_queue).perform_later(result) }
139139
process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123")
140140

141-
SolidQueue::ReadyExecution.claim("*", 5, process.id)
141+
SolidQueue::ReadyExecution.claim("*", 5, process)
142142

143143
assert_equal 3, SolidQueue::ClaimedExecution.count
144144
assert_equal 0, SolidQueue::ReadyExecution.count
@@ -193,7 +193,7 @@ class SupervisorTest < ActiveSupport::TestCase
193193
worker_process = SolidQueue::Process.register(kind: "Worker", pid: 999_999, name: worker_name)
194194

195195
job = StoreResultJob.perform_later(42)
196-
claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process.id).first
196+
claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process).first
197197

198198
terminated_fork = Struct.new(:name).new(worker_name)
199199

0 commit comments

Comments
 (0)