Skip to content

Commit 32403b7

Browse files
committed
Emit claim events when claiming jobs
1 parent f6d980f commit 32403b7

File tree

3 files changed

+33
-5
lines changed

3 files changed

+33
-5
lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,14 @@ class << self
1515
def claiming(job_ids, process_id, &block)
1616
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }
1717

18-
insert_all!(job_data)
19-
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
20-
block.call(claimed)
18+
SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload|
19+
insert_all!(job_data)
20+
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
21+
block.call(claimed)
22+
23+
payload[:size] = claimed.size
24+
payload[:claimed_job_ids] = claimed.map(&:job_id)
25+
end
2126
end
2227
end
2328

lib/solid_queue/log_subscriber.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ def dispatch_scheduled(event)
77
debug formatted_event(event, action: "Dispatch scheduled jobs", **event.payload.slice(:batch_size, :size))
88
end
99

10+
def claim(event)
11+
debug formatted_event(event, action: "Claim jobs", **event.payload.slice(:process_id, :job_ids, :claimed_job_ids, :size))
12+
end
13+
1014
def release_many_claimed(event)
1115
debug formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
1216
end
@@ -45,15 +49,15 @@ def enqueue_recurring_task(event)
4549

4650
if event.payload[:other_adapter]
4751
action = attributes[:active_job_id].present? ? "Enqueued recurring task outside Solid Queue" : "Error enqueuing recurring task"
48-
info formatted_event(event, action: action, **attributes)
52+
debug formatted_event(event, action: action, **attributes)
4953
else
5054
action = case
5155
when event.payload[:skipped].present? then "Skipped recurring task – already dispatched"
5256
when attributes[:active_job_id].nil? then "Error enqueuing recurring task"
5357
else "Enqueued recurring task"
5458
end
5559

56-
info formatted_event(event, action: action, **attributes)
60+
debug formatted_event(event, action: action, **attributes)
5761
end
5862
end
5963

test/integration/instrumentation_test.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,25 @@ class InstrumentationTest < ActiveSupport::TestCase
1818
assert_event events.first, "dispatch_scheduled", batch_size: 10, size: 8
1919
end
2020

21+
test "claiming jobs emits claim events" do
22+
3.times { StoreResultJob.perform_later(42) }
23+
process = nil
24+
jobs = SolidQueue::Job.last(3)
25+
26+
events = subscribed("claim.solid_queue") do
27+
worker = SolidQueue::Worker.new.tap(&:start)
28+
29+
wait_while_with_timeout!(3.seconds) { SolidQueue::ReadyExecution.any? }
30+
process = SolidQueue::Process.last
31+
32+
worker.stop
33+
wait_for_registered_processes(0, timeout: 3.second)
34+
end
35+
36+
assert_equal 1, events.size
37+
assert_event events.first, "claim", process_id: process.id, job_ids: jobs.map(&:id), claimed_job_ids: jobs.map(&:id), size: 3
38+
end
39+
2140
test "stopping a worker with claimed executions emits release_claimed events" do
2241
StoreResultJob.perform_later(42, pause: SolidQueue.shutdown_timeout + 100.second)
2342
process = nil

0 commit comments

Comments
 (0)