Skip to content

Commit 1d41b53

Browse files
authored
Merge pull request #208 from rails/log-and-instrumentation
Rely on `ActiveSupport::Notifications` and `ActiveSupport::LogSubscriber` for logging and instrumentation
2 parents b76f9f4 + b9db22d commit 1d41b53

26 files changed

+707
-124
lines changed

app/models/solid_queue/blocked_execution.rb

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,25 @@ class BlockedExecution < Execution
1010
scope :expired, -> { where(expires_at: ...Time.current) }
1111

1212
class << self
13-
def unblock(count)
14-
expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys|
15-
release_many releasable(concurrency_keys)
13+
def unblock(limit)
14+
SolidQueue.instrument(:release_many_blocked, limit: limit) do |payload|
15+
expired.distinct.limit(limit).pluck(:concurrency_key).then do |concurrency_keys|
16+
payload[:size] = release_many releasable(concurrency_keys)
17+
end
1618
end
1719
end
1820

1921
def release_many(concurrency_keys)
2022
# We want to release exactly one blocked execution for each concurrency key, and we need to do it
2123
# one by one, locking each record and acquiring the semaphore individually for each of them:
22-
Array(concurrency_keys).each { |concurrency_key| release_one(concurrency_key) }
24+
Array(concurrency_keys).count { |concurrency_key| release_one(concurrency_key) }
2325
end
2426

2527
def release_one(concurrency_key)
2628
transaction do
27-
ordered.where(concurrency_key: concurrency_key).limit(1).non_blocking_lock.each(&:release)
29+
if execution = ordered.where(concurrency_key: concurrency_key).limit(1).non_blocking_lock.first
30+
execution.release
31+
end
2832
end
2933
end
3034

@@ -38,12 +42,14 @@ def releasable(concurrency_keys)
3842
end
3943

4044
def release
41-
transaction do
42-
if acquire_concurrency_lock
43-
promote_to_ready
44-
destroy!
45+
SolidQueue.instrument(:release_blocked, job_id: job.id, concurrency_key: concurrency_key, released: false) do |payload|
46+
transaction do
47+
if acquire_concurrency_lock
48+
promote_to_ready
49+
destroy!
4550

46-
SolidQueue.logger.debug("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
51+
payload[:released] = true
52+
end
4753
end
4854
end
4955
end

app/models/solid_queue/claimed_execution.rb

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ def claiming(job_ids, process_id, &block)
2020
end
2121

2222
def release_all
23-
includes(:job).each(&:release)
23+
SolidQueue.instrument(:release_many_claimed) do |payload|
24+
includes(:job).tap do |executions|
25+
payload[:size] = executions.size
26+
executions.each(&:release)
27+
end
28+
end
2429
end
2530

2631
def discard_all_in_batches(*)
@@ -45,9 +50,11 @@ def perform
4550
end
4651

4752
def release
48-
transaction do
49-
job.dispatch_bypassing_concurrency_limits
50-
destroy!
53+
SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do
54+
transaction do
55+
job.dispatch_bypassing_concurrency_limits
56+
destroy!
57+
end
5158
end
5259
end
5360

app/models/solid_queue/execution.rb

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ class UndiscardableError < StandardError; end
1313
belongs_to :job
1414

1515
class << self
16+
def type
17+
model_name.element.sub("_execution", "").to_sym
18+
end
19+
1620
def create_all_from_jobs(jobs)
1721
insert_all execution_data_from_jobs(jobs)
1822
end
@@ -27,25 +31,32 @@ def discard_all_in_batches(batch_size: 500)
2731
pending = count
2832
discarded = 0
2933

30-
loop do
31-
transaction do
32-
job_ids = limit(batch_size).order(:job_id).lock.pluck(:job_id)
34+
SolidQueue.instrument(:discard_all, batch_size: batch_size, status: type, batches: 0, size: 0) do |payload|
35+
loop do
36+
transaction do
37+
job_ids = limit(batch_size).order(:job_id).lock.pluck(:job_id)
38+
discarded = discard_jobs job_ids
3339

34-
discard_jobs job_ids
35-
discarded = where(job_id: job_ids).delete_all
36-
pending -= discarded
37-
end
40+
where(job_id: job_ids).delete_all
41+
pending -= discarded
42+
43+
payload[:size] += discarded
44+
payload[:batches] += 1
45+
end
3846

39-
break if pending <= 0 || discarded == 0
47+
break if pending <= 0 || discarded == 0
48+
end
4049
end
4150
end
4251

4352
def discard_all_from_jobs(jobs)
44-
transaction do
45-
job_ids = lock_all_from_jobs(jobs)
53+
SolidQueue.instrument(:discard_all, jobs_size: jobs.size, status: type) do |payload|
54+
transaction do
55+
job_ids = lock_all_from_jobs(jobs)
4656

47-
discard_jobs job_ids
48-
where(job_id: job_ids).delete_all
57+
payload[:size] = discard_jobs job_ids
58+
where(job_id: job_ids).delete_all
59+
end
4960
end
5061
end
5162

@@ -59,10 +70,16 @@ def discard_jobs(job_ids)
5970
end
6071
end
6172

73+
def type
74+
self.class.type
75+
end
76+
6277
def discard
63-
with_lock do
64-
job.destroy
65-
destroy
78+
SolidQueue.instrument(:discard, job_id: job_id, status: type) do
79+
with_lock do
80+
job.destroy
81+
destroy
82+
end
6683
end
6784
end
6885
end

app/models/solid_queue/execution/dispatching.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ module Dispatching
99
def dispatch_jobs(job_ids)
1010
jobs = Job.where(id: job_ids)
1111

12-
Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
13-
where(job_id: dispatched_job_ids).order(:job_id).delete_all
14-
SolidQueue.logger.info("[SolidQueue] Dispatched #{dispatched_job_ids.size} jobs")
12+
Job.dispatch_all(jobs).map(&:id).then do |dispatched_job_ids|
13+
if dispatched_job_ids.none? then 0
14+
else
15+
where(job_id: dispatched_job_ids).order(:job_id).delete_all
16+
end
1517
end
1618
end
1719
end

app/models/solid_queue/failed_execution.rb

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,19 @@ class FailedExecution < Execution
1111
attr_accessor :exception
1212

1313
def self.retry_all(jobs)
14-
transaction do
15-
dispatch_jobs lock_all_from_jobs(jobs)
14+
SolidQueue.instrument(:retry_all, jobs_size: jobs.size) do |payload|
15+
transaction do
16+
payload[:size] = dispatch_jobs lock_all_from_jobs(jobs)
17+
end
1618
end
1719
end
1820

1921
def retry
20-
with_lock do
21-
job.prepare_for_execution
22-
destroy!
22+
SolidQueue.instrument(:retry, job_id: job.id) do
23+
with_lock do
24+
job.prepare_for_execution
25+
destroy!
26+
end
2327
end
2428
end
2529

app/models/solid_queue/job/executable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def status
9393
if finished?
9494
:finished
9595
elsif execution.present?
96-
execution.model_name.element.sub("_execution", "").to_sym
96+
execution.type
9797
end
9898
end
9999

app/models/solid_queue/process.rb

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,24 @@ class SolidQueue::Process < SolidQueue::Record
1212
after_destroy -> { claimed_executions.release_all }
1313

1414
def self.register(**attributes)
15-
create!(attributes.merge(last_heartbeat_at: Time.current))
15+
SolidQueue.instrument :register_process, **attributes do
16+
create!(attributes.merge(last_heartbeat_at: Time.current))
17+
end
18+
rescue Exception => error
19+
SolidQueue.instrument :register_process, **attributes.merge(error: error)
20+
raise
1621
end
1722

1823
def heartbeat
1924
touch(:last_heartbeat_at)
2025
end
2126

22-
def deregister
23-
destroy!
24-
rescue Exception
25-
SolidQueue.logger.error("[SolidQueue] Error deregistering process #{id} - #{metadata}")
26-
raise
27+
def deregister(pruned: false)
28+
SolidQueue.instrument :deregister_process, process: self, pruned: pruned, claimed_size: claimed_executions.size do |payload|
29+
destroy!
30+
rescue Exception => error
31+
payload[:error] = error
32+
raise
33+
end
2734
end
2835
end

app/models/solid_queue/process/prunable.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ module SolidQueue::Process::Prunable
99

1010
class_methods do
1111
def prune
12-
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
13-
batch.each do |process|
14-
SolidQueue.logger.info("[SolidQueue] Pruning dead process #{process.id} - #{process.metadata}")
15-
process.deregister
12+
SolidQueue.instrument :prune_processes, size: 0 do |payload|
13+
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
14+
payload[:size] += batch.size
15+
16+
batch.each { |process| process.deregister(pruned: true) }
1617
end
1718
end
1819
end

app/models/solid_queue/recurring_execution.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ class RecurringExecution < Execution
77
class << self
88
def record(task_key, run_at, &block)
99
transaction do
10-
if job_id = block.call
11-
create!(job_id: job_id, task_key: task_key, run_at: run_at)
10+
block.call.tap do |active_job|
11+
create!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at)
1212
end
1313
end
1414
rescue ActiveRecord::RecordNotUnique
15-
SolidQueue.logger.info("[SolidQueue] Skipped recurring task #{task_key} at #{run_at}already dispatched")
15+
# Task already dispatched
1616
end
1717

1818
def clear_in_batches(batch_size: 500)

app/models/solid_queue/scheduled_execution.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ def dispatch_next_batch(batch_size)
1616
job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
1717
if job_ids.empty? then []
1818
else
19-
dispatch_jobs(job_ids)
19+
SolidQueue.instrument(:dispatch_scheduled, batch_size: batch_size) do |payload|
20+
payload[:size] = dispatch_jobs(job_ids)
21+
end
2022
end
2123
end
2224
end

0 commit comments

Comments
 (0)