Skip to content

Commit 9e894a5

Browse files
committed
Implement basic structure to use Active Support notifications and log subscribers
And instrument a bunch of actions.
1 parent 1540864 commit 9e894a5

File tree

11 files changed

+192
-43
lines changed

11 files changed

+192
-43
lines changed

app/models/solid_queue/blocked_execution.rb

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,18 @@ class BlockedExecution < Execution
1010
scope :expired, -> { where(expires_at: ...Time.current) }
1111

1212
class << self
13-
def unblock(count)
14-
expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys|
15-
release_many releasable(concurrency_keys)
13+
def unblock(limit)
14+
SolidQueue.instrument(:release_many_blocked, limit: limit) do |payload|
15+
expired.distinct.limit(limit).pluck(:concurrency_key).then do |concurrency_keys|
16+
payload[:size] = release_many releasable(concurrency_keys)
17+
end
1618
end
1719
end
1820

1921
def release_many(concurrency_keys)
2022
# We want to release exactly one blocked execution for each concurrency key, and we need to do it
2123
# one by one, locking each record and acquiring the semaphore individually for each of them:
22-
Array(concurrency_keys).each { |concurrency_key| release_one(concurrency_key) }
24+
Array(concurrency_keys).count { |concurrency_key| release_one(concurrency_key) }
2325
end
2426

2527
def release_one(concurrency_key)
@@ -38,12 +40,14 @@ def releasable(concurrency_keys)
3840
end
3941

4042
def release
41-
transaction do
42-
if acquire_concurrency_lock
43-
promote_to_ready
44-
destroy!
43+
SolidQueue.instrument(:release_blocked, job_id: job.id, concurrency_key: concurrency_key, released: false) do |payload|
44+
transaction do
45+
if acquire_concurrency_lock
46+
promote_to_ready
47+
destroy!
4548

46-
SolidQueue.logger.debug("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
49+
payload[:released] = true
50+
end
4751
end
4852
end
4953
end

app/models/solid_queue/claimed_execution.rb

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ def claiming(job_ids, process_id, &block)
2020
end
2121

2222
def release_all
23-
includes(:job).each(&:release)
23+
SolidQueue.instrument(:release_many_claimed) do |payload|
24+
includes(:job).tap do |executions|
25+
payload[:size] = executions.size
26+
executions.each(&:release)
27+
end
28+
end
2429
end
2530

2631
def discard_all_in_batches(*)
@@ -45,9 +50,11 @@ def perform
4550
end
4651

4752
def release
48-
transaction do
49-
job.dispatch_bypassing_concurrency_limits
50-
destroy!
53+
SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do
54+
transaction do
55+
job.dispatch_bypassing_concurrency_limits
56+
destroy!
57+
end
5158
end
5259
end
5360

app/models/solid_queue/execution/dispatching.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ def dispatch_jobs(job_ids)
1111

1212
Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
1313
where(job_id: dispatched_job_ids).order(:job_id).delete_all
14-
SolidQueue.logger.info("[SolidQueue] Dispatched #{dispatched_job_ids.size} jobs")
1514
end
1615
end
1716
end

app/models/solid_queue/failed_execution.rb

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,19 @@ class FailedExecution < Execution
1111
attr_accessor :exception
1212

1313
def self.retry_all(jobs)
14-
transaction do
15-
dispatch_jobs lock_all_from_jobs(jobs)
14+
SolidQueue.instrument(:retry_all, jobs_size: jobs.size) do |payload|
15+
transaction do
16+
payload[:size] = dispatch_jobs lock_all_from_jobs(jobs)
17+
end
1618
end
1719
end
1820

1921
def retry
20-
with_lock do
21-
job.prepare_for_execution
22-
destroy!
22+
SolidQueue.instrument(:retry, job_id: job.id) do
23+
with_lock do
24+
job.prepare_for_execution
25+
destroy!
26+
end
2327
end
2428
end
2529

app/models/solid_queue/process.rb

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,24 @@ class SolidQueue::Process < SolidQueue::Record
1212
after_destroy -> { claimed_executions.release_all }
1313

1414
def self.register(**attributes)
15-
create!(attributes.merge(last_heartbeat_at: Time.current))
15+
SolidQueue.instrument :register_process, **attributes do
16+
create!(attributes.merge(last_heartbeat_at: Time.current))
17+
end
18+
rescue Exception => error
19+
SolidQueue.instrument :register_process, **attributes.merge(error: error)
20+
raise
1621
end
1722

1823
def heartbeat
1924
touch(:last_heartbeat_at)
2025
end
2126

22-
def deregister
23-
destroy!
24-
rescue Exception
25-
SolidQueue.logger.error("[SolidQueue] Error deregistering process #{id} - #{metadata}")
27+
def deregister(pruned: false)
28+
SolidQueue.instrument :deregister_process, process: self, pruned: pruned do
29+
destroy!
30+
end
31+
rescue Exception => error
32+
SolidQueue.instrument :deregister_process, process: self, error: error
2633
raise
2734
end
2835
end

app/models/solid_queue/process/prunable.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ module SolidQueue::Process::Prunable
99

1010
class_methods do
1111
def prune
12-
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
13-
batch.each do |process|
14-
SolidQueue.logger.info("[SolidQueue] Pruning dead process #{process.id} - #{process.metadata}")
15-
process.deregister
12+
SolidQueue.instrument :prune_processes, size: 0 do |payload|
13+
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
14+
payload[:size] += batch.size
15+
16+
batch.each { |process| process.deregister(pruned: true) }
1617
end
1718
end
1819
end

app/models/solid_queue/scheduled_execution.rb

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ class ScheduledExecution < Execution
1212

1313
class << self
1414
def dispatch_next_batch(batch_size)
15-
transaction do
16-
job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
17-
if job_ids.empty? then []
18-
else
19-
dispatch_jobs(job_ids)
15+
SolidQueue.instrument(:dispatch_scheduled, batch_size: batch_size, count: 0) do |payload|
16+
transaction do
17+
job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
18+
if job_ids.empty? then []
19+
else
20+
payload[:count] = dispatch_jobs(job_ids)
21+
end
2022
end
2123
end
2224
end

lib/solid_queue.rb

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
loader.setup
1919

2020
module SolidQueue
21+
extend self
22+
2123
mattr_accessor :logger, default: ActiveSupport::Logger.new($stdout)
2224
mattr_accessor :app_executor, :on_thread_error, :connects_to
2325

@@ -39,17 +41,19 @@ module SolidQueue
3941
mattr_accessor :clear_finished_jobs_after, default: 1.day
4042
mattr_accessor :default_concurrency_control_period, default: 3.minutes
4143

42-
class << self
43-
def supervisor?
44-
supervisor
45-
end
44+
def supervisor?
45+
supervisor
46+
end
4647

47-
def silence_polling?
48-
silence_polling
49-
end
48+
def silence_polling?
49+
silence_polling
50+
end
51+
52+
def preserve_finished_jobs?
53+
preserve_finished_jobs
54+
end
5055

51-
def preserve_finished_jobs?
52-
preserve_finished_jobs
53-
end
56+
def instrument(channel, **options, &block)
57+
ActiveSupport::Notifications.instrument("#{channel}.solid_queue", **options, &block)
5458
end
5559
end

lib/solid_queue/engine.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class Engine < ::Rails::Engine
2828
ActiveSupport.on_load(:solid_queue) do
2929
self.logger = app.logger
3030
end
31+
32+
SolidQueue::LogSubscriber.attach_to :solid_queue
3133
end
3234

3335
initializer "solid_queue.active_job.extensions" do

lib/solid_queue/log_subscriber.rb

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# frozen_string_literal: true
2+
3+
require "active_support/log_subscriber"
4+
5+
class SolidQueue::LogSubscriber < ActiveSupport::LogSubscriber
6+
def release_many_blocked(event)
7+
debug formatted_event(event, action: "Unblock jobs", **event.payload.slice(:limit, :size))
8+
end
9+
10+
def release_blocked(event)
11+
debug formatted_event(event, action: "Release blocked job", **event.payload.slice(:job_id, :concurrency_key, :released))
12+
end
13+
14+
def release_many_claimed(event)
15+
debug formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
16+
end
17+
18+
def release_claimed(event)
19+
debug formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
20+
end
21+
22+
def dispatch_scheduled(event)
23+
debug formatted_event(event, action: "Dispatch scheduled jobs", **event.payload.slice(:batch_size, :size))
24+
end
25+
26+
def retry_all(event)
27+
debug formatted_event(event, action: "Retry failed jobs", **event.payload.slice(:jobs_size, :size))
28+
end
29+
30+
def retry(event)
31+
debug formatted_event(event, action: "Retry failed job", **event.payload.slice(:job_id))
32+
end
33+
34+
def register_process(event)
35+
attributes = event.payload.slice(:kind, :pid, :hostname)
36+
37+
if error = event.payload[:error]
38+
warn formatted_event(event, action: "Error registering process", **attributes.merge(error: formatted_error(error)))
39+
else
40+
info formatted_event(event, action: "Register process", **attributes)
41+
end
42+
end
43+
44+
def deregister_process(event)
45+
process = event.payload[:process]
46+
47+
attributes = {
48+
process_id: process.id,
49+
pid: process.pid,
50+
kind: process.kind,
51+
hostname: process.hostname,
52+
last_heartbeat_at: process.last_heartbeat_at,
53+
claimed_size: process.claimed_executions.size,
54+
pruned: event.payload
55+
}
56+
57+
if error = event.payload[:error]
58+
warn formatted_event(event, action: "Error deregistering process", **attributes.merge(formatted_error(error)))
59+
else
60+
info formatted_event(event, action: "Deregister process", **attributes)
61+
end
62+
end
63+
64+
def prune_processes(event)
65+
debug formatted_event(event, action: "Prune dead processes", **event.payload.slice(:size))
66+
end
67+
68+
private
69+
def formatted_event(event, action:, **attributes)
70+
"SolidQueue-#{SolidQueue::VERSION} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}"
71+
end
72+
73+
def formatted_attributes(**attributes)
74+
attributes.map { |attr, value| "#{attr}: #{value.inspect}" }.join(", ")
75+
end
76+
77+
def formatted_error(error)
78+
[ error.class, error.message ].compact.join(" ")
79+
end
80+
end

0 commit comments

Comments
 (0)