Skip to content

Commit e855a33

Browse files
committed
Write tests for instrumentation and fix a few bugs found along the way
Fix bug found for async execution mode when worker didn't correctly finish shutting down. Also, always instrument retry_all and release_many_blocked even if none is retried/released, so we have that information as well, with the number of jobs that actually were acted on.
1 parent 9e894a5 commit e855a33

File tree

6 files changed

+195
-22
lines changed

6 files changed

+195
-22
lines changed

app/models/solid_queue/execution/dispatching.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ module Dispatching
99
def dispatch_jobs(job_ids)
1010
jobs = Job.where(id: job_ids)
1111

12-
Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
13-
where(job_id: dispatched_job_ids).order(:job_id).delete_all
12+
Job.dispatch_all(jobs).map(&:id).then do |dispatched_job_ids|
13+
if dispatched_job_ids.none? then 0
14+
else
15+
where(job_id: dispatched_job_ids).order(:job_id).delete_all
16+
end
1417
end
1518
end
1619
end

app/models/solid_queue/scheduled_execution.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ class ScheduledExecution < Execution
1212

1313
class << self
1414
def dispatch_next_batch(batch_size)
15-
SolidQueue.instrument(:dispatch_scheduled, batch_size: batch_size, count: 0) do |payload|
16-
transaction do
17-
job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
18-
if job_ids.empty? then []
19-
else
20-
payload[:count] = dispatch_jobs(job_ids)
15+
transaction do
16+
job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
17+
if job_ids.empty? then []
18+
else
19+
SolidQueue.instrument(:dispatch_scheduled, batch_size: batch_size) do |payload|
20+
payload[:size] = dispatch_jobs(job_ids)
2121
end
2222
end
2323
end

lib/solid_queue/log_subscriber.rb

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,8 @@
33
require "active_support/log_subscriber"
44

55
class SolidQueue::LogSubscriber < ActiveSupport::LogSubscriber
6-
def release_many_blocked(event)
7-
debug formatted_event(event, action: "Unblock jobs", **event.payload.slice(:limit, :size))
8-
end
9-
10-
def release_blocked(event)
11-
debug formatted_event(event, action: "Release blocked job", **event.payload.slice(:job_id, :concurrency_key, :released))
6+
def dispatch_scheduled(event)
7+
debug formatted_event(event, action: "Dispatch scheduled jobs", **event.payload.slice(:batch_size, :size))
128
end
139

1410
def release_many_claimed(event)
@@ -19,10 +15,6 @@ def release_claimed(event)
1915
debug formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
2016
end
2117

22-
def dispatch_scheduled(event)
23-
debug formatted_event(event, action: "Dispatch scheduled jobs", **event.payload.slice(:batch_size, :size))
24-
end
25-
2618
def retry_all(event)
2719
debug formatted_event(event, action: "Retry failed jobs", **event.payload.slice(:jobs_size, :size))
2820
end
@@ -31,6 +23,14 @@ def retry(event)
3123
debug formatted_event(event, action: "Retry failed job", **event.payload.slice(:job_id))
3224
end
3325

26+
def release_many_blocked(event)
27+
debug formatted_event(event, action: "Unblock jobs", **event.payload.slice(:limit, :size))
28+
end
29+
30+
def release_blocked(event)
31+
debug formatted_event(event, action: "Release blocked job", **event.payload.slice(:job_id, :concurrency_key, :released))
32+
end
33+
3434
def register_process(event)
3535
attributes = event.payload.slice(:kind, :pid, :hostname)
3636

lib/solid_queue/worker.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ def claim_executions
3232
end
3333

3434
def shutdown
35-
super
36-
3735
pool.shutdown
3836
pool.wait_for_termination(SolidQueue.shutdown_timeout)
37+
38+
super
3939
end
4040

4141
def all_work_completed?
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
5+
class InstrumentationTest < ActiveSupport::TestCase
6+
test "dispatcher polling emits dispatch_scheduled event" do
7+
8.times { AddToBufferJob.set(wait: 1.day).perform_later("I'm scheduled") }
8+
9+
events = subscribed("dispatch_scheduled.solid_queue") do
10+
travel_to 2.days.from_now
11+
dispatcher = SolidQueue::Dispatcher.new(polling_interval: 0.1, batch_size: 10).tap(&:start)
12+
13+
wait_while_with_timeout(0.5.seconds) { SolidQueue::ScheduledExecution.any? }
14+
dispatcher.stop
15+
end
16+
17+
assert_equal 1, events.size
18+
assert_event events.first, "dispatch_scheduled", batch_size: 10, size: 8
19+
end
20+
21+
test "stopping a worker with claimed executions emits release_claimed events" do
22+
StoreResultJob.perform_later(42, pause: SolidQueue.shutdown_timeout + 10.second)
23+
process = nil
24+
25+
events = subscribed(/release.*_claimed\.solid_queue/) do
26+
worker = SolidQueue::Worker.new.tap(&:start)
27+
28+
wait_while_with_timeout(0.5.seconds) { SolidQueue::ReadyExecution.any? }
29+
process = SolidQueue::Process.last
30+
31+
worker.stop
32+
end
33+
34+
assert_equal 2, events.size
35+
release_one_event, release_many_event = events
36+
assert_event release_one_event, "release_claimed", job_id: SolidQueue::Job.last.id, process_id: process.id
37+
assert_event release_many_event, "release_many_claimed", size: 1
38+
end
39+
40+
test "starting and stopping a worker emits register_process and deregister_process events" do
41+
process = nil
42+
43+
events = subscribed(/(register|deregister)_process\.solid_queue/) do
44+
worker = SolidQueue::Worker.new.tap(&:start)
45+
wait_for_registered_processes(1, timeout: 1.second)
46+
47+
process = SolidQueue::Process.last
48+
49+
worker.stop
50+
wait_for_registered_processes(0, timeout: 1.second)
51+
end
52+
53+
assert_equal 2, events.size
54+
register_event, deregister_event = events
55+
56+
assert_event register_event, "register_process", kind: "Worker", pid: ::Process.pid
57+
assert_event deregister_event, "deregister_process", process: process, pruned: false
58+
end
59+
60+
test "pruning processes emit prune_processes and deregister_process events" do
61+
time = Time.now
62+
processes = 3.times.collect { |i| SolidQueue::Process.create!(kind: "Worker", supervisor_id: 42, pid: 10 + i, hostname: "localhost", last_heartbeat_at: time) }
63+
64+
# Heartbeats will expire
65+
travel_to 3.days.from_now
66+
67+
events = subscribed(/.*process.*\.solid_queue/) do
68+
SolidQueue::Process.prune
69+
end
70+
71+
# 1 prune event + 3 deregister events
72+
assert_equal 4, events.count
73+
deregister_events = events.first(3)
74+
prune_event = events.last
75+
76+
assert_event prune_event, "prune_processes", size: 3
77+
deregister_events.each_with_index do |event, i|
78+
assert_event event, "deregister_process", process: processes[i], pruned: true
79+
end
80+
end
81+
82+
test "retrying failed job emits retry event" do
83+
RaisingJob.perform_later(RuntimeError, "A")
84+
job = SolidQueue::Job.last
85+
86+
worker = SolidQueue::Worker.new.tap(&:start)
87+
wait_for_jobs_to_finish_for(3.seconds)
88+
worker.stop
89+
90+
events = subscribed("retry.solid_queue") do
91+
job.reload.retry
92+
end
93+
94+
assert_equal 1, events.size
95+
assert_event events.first, "retry", job_id: job.id
96+
end
97+
98+
test "retrying failed jobs in bulk emits retry_all" do
99+
3.times { RaisingJob.perform_later(RuntimeError, "A") }
100+
AddToBufferJob.perform_later("A")
101+
102+
jobs = SolidQueue::Job.last(4)
103+
104+
worker = SolidQueue::Worker.new.tap(&:start)
105+
wait_for_jobs_to_finish_for(3.seconds)
106+
worker.stop
107+
108+
events = subscribed("retry_all.solid_queue") do
109+
SolidQueue::FailedExecution.retry_all(jobs)
110+
SolidQueue::FailedExecution.retry_all(jobs)
111+
end
112+
113+
assert_equal 2, events.size
114+
assert_event events.first, "retry_all", jobs_size: 4, size: 3
115+
assert_event events.second, "retry_all", jobs_size: 4, size: 0
116+
end
117+
118+
test "unblocking job emits release_blocked event" do
119+
result = JobResult.create!
120+
# 1 ready, 2 blocked
121+
3.times { SequentialUpdateResultJob.perform_later(result, name: name, pause: 0.2.seconds) }
122+
123+
# Simulate expiry of the concurrency locks
124+
travel_to 3.days.from_now
125+
SolidQueue::Semaphore.expired.delete_all
126+
127+
blocked_jobs = SolidQueue::BlockedExecution.last(2).map(&:job)
128+
concurrency_key = blocked_jobs.first.concurrency_key
129+
130+
events = subscribed("release_blocked.solid_queue") do
131+
SolidQueue::BlockedExecution.release_one(concurrency_key)
132+
SolidQueue::BlockedExecution.release_one(concurrency_key)
133+
end
134+
135+
assert_equal 2, events.size
136+
assert_event events.first, "release_blocked", job_id: blocked_jobs.first.id, concurrency_key: concurrency_key, released: true
137+
assert_event events.second, "release_blocked", job_id: blocked_jobs.second.id, concurrency_key: concurrency_key, released: false
138+
end
139+
140+
test "unblocking jobs in bulk emits release_many_blocked event" do
141+
result = JobResult.create!
142+
# 1 ready, 3 blocked
143+
4.times { SequentialUpdateResultJob.perform_later(result, name: name, pause: 0.2.seconds) }
144+
145+
# Simulate expiry of the concurrency locks
146+
travel_to 3.days.from_now
147+
SolidQueue::Semaphore.expired.delete_all
148+
149+
events = subscribed("release_many_blocked.solid_queue") do
150+
SolidQueue::BlockedExecution.unblock(5)
151+
SolidQueue::BlockedExecution.unblock(5)
152+
end
153+
154+
assert_equal 2, events.size
155+
assert_event events.first, "release_many_blocked", limit: 5, size: 1
156+
assert_event events.second, "release_many_blocked", limit: 5, size: 0
157+
end
158+
159+
private
160+
def subscribed(name, &block)
161+
[].tap do |events|
162+
ActiveSupport::Notifications.subscribed(->(*args) { events << args }, name, &block)
163+
end
164+
end
165+
166+
def assert_event(event, action, **attributes)
167+
assert_equal "#{action}.solid_queue", event.first
168+
assert_equal attributes, event.last.slice(*attributes.keys)
169+
end
170+
end

test/unit/log_subscriber_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ class LogSubscriberTest < ActiveSupport::TestCase
1717

1818
test "unblock many jobs" do
1919
attach_log_subscriber
20-
instrument "unblock_batch.solid_queue", batch_size: 42
20+
instrument "release_many_blocked.solid_queue", limit: 42, size: 10
2121

22-
assert_match_logged :debug, "Unblock jobs", "batch_size: 42"
22+
assert_match_logged :debug, "Unblock jobs", "limit: 42, size: 10"
2323
end
2424

2525
private

0 commit comments

Comments
 (0)