Skip to content

Commit 740c20f

Browse files
committed
Instrument discard and discard_all (in batches and from jobs)
And fix issue when discarding in batches, where the discarded count was always being zero when the foreign key with cascade deletion is present from jobs to ready executions. Discarding all jobs would delete all ready executions in cascade, and then deleting them afterwards, for the case where there aren't any foreign keys, would return 0 jobs discarded.
1 parent 4152fab commit 740c20f

File tree

4 files changed

+84
-18
lines changed

4 files changed

+84
-18
lines changed

app/models/solid_queue/execution.rb

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ class UndiscardableError < StandardError; end
1313
belongs_to :job
1414

1515
class << self
16+
def type
17+
model_name.element.sub("_execution", "").to_sym
18+
end
19+
1620
def create_all_from_jobs(jobs)
1721
insert_all execution_data_from_jobs(jobs)
1822
end
@@ -27,25 +31,32 @@ def discard_all_in_batches(batch_size: 500)
2731
pending = count
2832
discarded = 0
2933

30-
loop do
31-
transaction do
32-
job_ids = limit(batch_size).order(:job_id).lock.pluck(:job_id)
34+
SolidQueue.instrument(:discard_all, batch_size: batch_size, status: type, batches: 0, size: 0) do |payload|
35+
loop do
36+
transaction do
37+
job_ids = limit(batch_size).order(:job_id).lock.pluck(:job_id)
38+
discarded = discard_jobs job_ids
3339

34-
discard_jobs job_ids
35-
discarded = where(job_id: job_ids).delete_all
36-
pending -= discarded
37-
end
40+
where(job_id: job_ids).delete_all
41+
pending -= discarded
42+
43+
payload[:size] += discarded
44+
payload[:batches] += 1
45+
end
3846

39-
break if pending <= 0 || discarded == 0
47+
break if pending <= 0 || discarded == 0
48+
end
4049
end
4150
end
4251

4352
def discard_all_from_jobs(jobs)
44-
transaction do
45-
job_ids = lock_all_from_jobs(jobs)
53+
SolidQueue.instrument(:discard_all, jobs_size: jobs.size, status: type) do |payload|
54+
transaction do
55+
job_ids = lock_all_from_jobs(jobs)
4656

47-
discard_jobs job_ids
48-
where(job_id: job_ids).delete_all
57+
payload[:size] = discard_jobs job_ids
58+
where(job_id: job_ids).delete_all
59+
end
4960
end
5061
end
5162

@@ -59,10 +70,16 @@ def discard_jobs(job_ids)
5970
end
6071
end
6172

73+
def type
74+
self.class.type
75+
end
76+
6277
def discard
63-
with_lock do
64-
job.destroy
65-
destroy
78+
SolidQueue.instrument(:discard, job_id: job_id, status: type) do
79+
with_lock do
80+
job.destroy
81+
destroy
82+
end
6683
end
6784
end
6885
end

app/models/solid_queue/job/executable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def status
9393
if finished?
9494
:finished
9595
elsif execution.present?
96-
execution.model_name.element.sub("_execution", "").to_sym
96+
execution.type
9797
end
9898
end
9999

lib/solid_queue/log_subscriber.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ def retry(event)
2323
debug formatted_event(event, action: "Retry failed job", **event.payload.slice(:job_id))
2424
end
2525

26+
def discard_all(event)
27+
debug formatted_event(event, action: "Discard jobs", **event.payload.slice(:jobs_size, :size, :status))
28+
end
29+
30+
def discard(event)
31+
debug formatted_event(event, action: "Discard job", **event.payload.slice(:job_id, :status))
32+
end
33+
2634
def release_many_blocked(event)
2735
debug formatted_event(event, action: "Unblock jobs", **event.payload.slice(:limit, :size))
2836
end

test/integration/instrumentation_test.rb

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,51 @@ class InstrumentationTest < ActiveSupport::TestCase
115115
assert_event events.second, "retry_all", jobs_size: 4, size: 0
116116
end
117117

118+
test "discarding job emits a discard event" do
119+
AddToBufferJob.perform_later("A")
120+
job = SolidQueue::Job.last
121+
122+
events = subscribed("discard.solid_queue") do
123+
job.discard
124+
end
125+
126+
assert_equal 1, events.size
127+
assert_event events.first, "discard", job_id: job.id, status: :ready
128+
end
129+
130+
test "discarding jobs in bulk emits a discard_all event" do
131+
# 5 ready jobs
132+
5.times { AddToBufferJob.perform_later("A") }
133+
# 1 ready + 3 blocked
134+
result = JobResult.create!
135+
4.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
136+
137+
events = subscribed("discard_all.solid_queue") do
138+
SolidQueue::ReadyExecution.discard_all_from_jobs(SolidQueue::Job.all)
139+
SolidQueue::ReadyExecution.discard_all_from_jobs(SolidQueue::Job.all)
140+
end
141+
142+
assert_equal 2, events.size
143+
assert_event events.first, "discard_all", jobs_size: 9, status: :ready, size: 6
144+
# Only 3 blocked jobs remaining for the second discard_all_from_jobs call
145+
assert_event events.second, "discard_all", jobs_size: 3, status: :ready, size: 0
146+
end
147+
148+
test "discarding jobs in batches emits a discard_all event" do
149+
15.times { AddToBufferJob.perform_later("A") }
150+
151+
events = subscribed("discard_all.solid_queue") do
152+
SolidQueue::ReadyExecution.discard_all_in_batches(batch_size: 6)
153+
end
154+
155+
assert_equal 1, events.size
156+
assert_event events.first, "discard_all", batch_size: 6, status: :ready, batches: 3, size: 15
157+
end
158+
118159
test "unblocking job emits release_blocked event" do
119160
result = JobResult.create!
120161
# 1 ready, 2 blocked
121-
3.times { SequentialUpdateResultJob.perform_later(result, name: name, pause: 0.2.seconds) }
162+
3.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
122163

123164
# Simulate expiry of the concurrency locks
124165
travel_to 3.days.from_now
@@ -140,7 +181,7 @@ class InstrumentationTest < ActiveSupport::TestCase
140181
test "unblocking jobs in bulk emits release_many_blocked event" do
141182
result = JobResult.create!
142183
# 1 ready, 3 blocked
143-
4.times { SequentialUpdateResultJob.perform_later(result, name: name, pause: 0.2.seconds) }
184+
4.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
144185

145186
# Simulate expiry of the concurrency locks
146187
travel_to 3.days.from_now

0 commit comments

Comments
 (0)