Skip to content

Commit 5f5a277

Browse files
committed
Add Karafka-style instrumentation/events system
Implement a pub/sub notification system inspired by Karafka's instrumentation architecture. This enables better monitoring, observability, and integration with APM tools. Key components: - Shoryuken::Instrumentation::Notifications - thread-safe pub/sub event bus - Shoryuken::Instrumentation::Event - event wrapper with metadata - Shoryuken::Instrumentation::LoggerListener - default logging subscriber - Shoryuken.monitor - global accessor for the notifications instance Supported events: - app.started, app.stopping, app.stopped, app.quiet (lifecycle) - message.processed, message.failed (message processing) - error.occurred (errors) Usage: ```ruby # Subscribe to specific events Shoryuken.monitor.subscribe('message.processed') do |event| StatsD.timing('shoryuken.process_time', event.duration * 1000) end # Subscribe to all events Shoryuken.monitor.subscribe do |event| logger.info("Event: #{event.name}") end ``` Backward compatibility: - Existing fire_event callbacks continue to work - Shoryuken.on(:startup) etc. continue to work - Legacy events are also published to the new monitor
1 parent bc3e913 commit 5f5a277

File tree

14 files changed

+991
-3
lines changed

14 files changed

+991
-3
lines changed

lib/shoryuken.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,32 @@ def self.healthy?
3636
Shoryuken::Runner.instance.healthy?
3737
end
3838

39+
# Returns the global instrumentation monitor.
40+
# Use this to subscribe to Shoryuken lifecycle events.
41+
#
42+
# @return [Shoryuken::Instrumentation::Notifications] the monitor instance
43+
#
44+
# @example Subscribe to message processing events
45+
# Shoryuken.monitor.subscribe('message.processed') do |event|
46+
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
47+
# end
48+
#
49+
# @example Subscribe to all events
50+
# Shoryuken.monitor.subscribe do |event|
51+
# logger.info("Event: #{event.name}")
52+
# end
53+
def self.monitor
54+
@_monitor ||= Instrumentation::Notifications.new
55+
end
56+
57+
# Resets the monitor instance (useful for testing)
58+
#
59+
# @return [void]
60+
# @api private
61+
def self.reset_monitor!
62+
@_monitor = nil
63+
end
64+
3965
def_delegators(
4066
:shoryuken_options,
4167
:active_job?,

lib/shoryuken/instrumentation.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# frozen_string_literal: true
2+
3+
require_relative 'instrumentation/event'
4+
require_relative 'instrumentation/notifications'
5+
require_relative 'instrumentation/logger_listener'
6+
7+
module Shoryuken
8+
# Instrumentation module providing pub/sub event notifications.
9+
# Inspired by Karafka's instrumentation architecture.
10+
#
11+
# @example Subscribing to events
12+
# Shoryuken.monitor.subscribe('message.processed') do |event|
13+
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
14+
# end
15+
#
16+
module Instrumentation
17+
end
18+
end
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# frozen_string_literal: true
2+
3+
module Shoryuken
4+
module Instrumentation
5+
# Represents an instrumentation event with metadata.
6+
# Events are published through the Notifications system and contain
7+
# information about what happened, when, and relevant context.
8+
#
9+
# @example Creating an event
10+
# event = Event.new('message.processed', queue: 'default', duration: 0.5)
11+
# event.name # => 'message.processed'
12+
# event[:queue] # => 'default'
13+
# event.duration # => 0.5
14+
#
15+
class Event
16+
# @return [String] the event name (e.g., 'message.processed')
17+
attr_reader :name
18+
19+
# @return [Hash] the event payload containing contextual data
20+
attr_reader :payload
21+
22+
# @return [Time] when the event was created
23+
attr_reader :time
24+
25+
# Creates a new Event instance
26+
#
27+
# @param name [String] the event name using dot notation (e.g., 'message.processed')
28+
# @param payload [Hash] contextual data for the event
29+
def initialize(name, payload = {})
30+
@name = name
31+
@payload = payload
32+
@time = Time.now
33+
end
34+
35+
# Accesses a value from the payload by key
36+
#
37+
# @param key [Symbol, String] the payload key
38+
# @return [Object, nil] the value or nil if not found
39+
def [](key)
40+
payload[key]
41+
end
42+
43+
# Returns the duration from the payload if present
44+
#
45+
# @return [Float, nil] the duration in seconds or nil
46+
def duration
47+
payload[:duration]
48+
end
49+
50+
# Returns a hash representation of the event
51+
#
52+
# @return [Hash] the event as a hash
53+
def to_h
54+
{
55+
name: name,
56+
payload: payload,
57+
time: time
58+
}
59+
end
60+
end
61+
end
62+
end
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# frozen_string_literal: true
2+
3+
module Shoryuken
4+
module Instrumentation
5+
# Default listener that logs instrumentation events.
6+
# This provides human-readable log output for key Shoryuken events.
7+
#
8+
# @example Subscribing the logger listener
9+
# Shoryuken.monitor.subscribe(&LoggerListener.new.method(:call))
10+
#
11+
class LoggerListener
12+
# Creates a new LoggerListener
13+
#
14+
# @param logger [Logger] the logger to use (defaults to Shoryuken.logger)
15+
def initialize(logger = nil)
16+
@logger = logger
17+
end
18+
19+
# Returns the logger instance
20+
#
21+
# @return [Logger] the logger
22+
def logger
23+
@logger || Shoryuken.logger
24+
end
25+
26+
# Handles an instrumentation event by logging it appropriately
27+
#
28+
# @param event [Event] the event to handle
29+
# @return [void]
30+
def call(event)
31+
case event.name
32+
when 'app.started'
33+
log_app_started(event)
34+
when 'app.stopping'
35+
log_app_stopping(event)
36+
when 'app.stopped'
37+
log_app_stopped(event)
38+
when 'message.processed'
39+
log_message_processed(event)
40+
when 'message.failed'
41+
log_message_failed(event)
42+
when 'error.occurred'
43+
log_error_occurred(event)
44+
when 'queue.polling'
45+
log_queue_polling(event)
46+
end
47+
end
48+
49+
private
50+
51+
def log_app_started(event)
52+
groups = event[:groups] || []
53+
logger.info { "Shoryuken started with #{groups.size} group(s)" }
54+
end
55+
56+
def log_app_stopping(_event)
57+
logger.info { 'Shoryuken shutting down...' }
58+
end
59+
60+
def log_app_stopped(_event)
61+
logger.info { 'Shoryuken stopped' }
62+
end
63+
64+
def log_message_processed(event)
65+
duration_ms = event.duration ? (event.duration * 1000).round(2) : 0
66+
worker = event[:worker] || 'Unknown'
67+
queue = event[:queue] || 'Unknown'
68+
69+
logger.info { "Processed #{worker}/#{queue} in #{duration_ms}ms" }
70+
end
71+
72+
def log_message_failed(event)
73+
worker = event[:worker] || 'Unknown'
74+
queue = event[:queue] || 'Unknown'
75+
error = event[:error]
76+
error_message = error.respond_to?(:message) ? error.message : error.to_s
77+
78+
logger.error { "Failed #{worker}/#{queue}: #{error_message}" }
79+
end
80+
81+
def log_error_occurred(event)
82+
error = event[:error]
83+
error_class = error.respond_to?(:class) ? error.class.name : 'Unknown'
84+
error_message = error.respond_to?(:message) ? error.message : error.to_s
85+
86+
logger.error { "Error occurred: #{error_class} - #{error_message}" }
87+
end
88+
89+
def log_queue_polling(event)
90+
queue = event[:queue] || 'Unknown'
91+
logger.debug { "Polling queue: #{queue}" }
92+
end
93+
end
94+
end
95+
end
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# frozen_string_literal: true
2+
3+
module Shoryuken
4+
module Instrumentation
5+
# A thread-safe pub/sub notification system for instrumentation events.
6+
# Inspired by Karafka's instrumentation architecture, this allows external
7+
# systems (APM, logging, metrics) to subscribe to Shoryuken lifecycle events.
8+
#
9+
# @example Subscribing to specific events
10+
# Shoryuken.monitor.subscribe('message.processed') do |event|
11+
# StatsD.timing('shoryuken.process_time', event.duration * 1000)
12+
# end
13+
#
14+
# @example Subscribing to all events
15+
# Shoryuken.monitor.subscribe do |event|
16+
# logger.info("Event: #{event.name}")
17+
# end
18+
#
19+
# @example Instrumenting a block
20+
# Shoryuken.monitor.instrument('message.processed', queue: 'default') do
21+
# process_message
22+
# end
23+
#
24+
class Notifications
25+
# List of all supported events in the system
26+
EVENTS = %w[
27+
app.started
28+
app.stopping
29+
app.stopped
30+
app.quiet
31+
32+
message.received
33+
message.processed
34+
message.failed
35+
message.deleted
36+
37+
worker.started
38+
worker.completed
39+
worker.failed
40+
41+
queue.polling
42+
queue.empty
43+
44+
error.occurred
45+
].freeze
46+
47+
# Creates a new Notifications instance
48+
def initialize
49+
@subscribers = Hash.new { |h, k| h[k] = [] }
50+
@mutex = Mutex.new
51+
end
52+
53+
# Subscribes to events
54+
#
55+
# @param event_name [String, nil] the event name to subscribe to, or nil for all events
56+
# @yield [Event] block called when matching events are published
57+
# @return [void]
58+
#
59+
# @example Subscribe to specific event
60+
# subscribe('message.processed') { |event| puts event.name }
61+
#
62+
# @example Subscribe to all events
63+
# subscribe { |event| puts event.name }
64+
def subscribe(event_name = nil, &block)
65+
@mutex.synchronize do
66+
if event_name
67+
@subscribers[event_name] << block
68+
else
69+
@subscribers[:all] << block
70+
end
71+
end
72+
end
73+
74+
# Unsubscribes a block from events
75+
#
76+
# @param event_name [String, nil] the event name to unsubscribe from, or nil for all events
77+
# @param block [Proc] the block to unsubscribe
78+
# @return [void]
79+
def unsubscribe(event_name = nil, &block)
80+
@mutex.synchronize do
81+
key = event_name || :all
82+
@subscribers[key].delete(block)
83+
end
84+
end
85+
86+
# Instruments a block of code, measuring its duration and publishing an event
87+
#
88+
# @param event_name [String] the event name to publish
89+
# @param payload [Hash] additional data to include in the event
90+
# @yield the code block to instrument
91+
# @return [Object] the result of the block
92+
#
93+
# @example
94+
# monitor.instrument('message.processed', queue: 'default') do
95+
# worker.perform(message)
96+
# end
97+
def instrument(event_name, payload = {})
98+
started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
99+
result = yield if block_given?
100+
duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
101+
102+
event = Event.new(event_name, payload.merge(duration: duration))
103+
publish(event)
104+
result
105+
end
106+
107+
# Publishes an event to all matching subscribers
108+
#
109+
# @param event_or_name [Event, String] an Event instance or event name
110+
# @param payload [Hash] payload hash (only used if first arg is a String)
111+
# @return [void]
112+
#
113+
# @example With Event instance
114+
# publish(Event.new('message.processed', queue: 'default'))
115+
#
116+
# @example With name and payload
117+
# publish('message.processed', queue: 'default')
118+
def publish(event_or_name, payload = {})
119+
event = event_or_name.is_a?(Event) ? event_or_name : Event.new(event_or_name, payload)
120+
121+
subscribers_for_event = @mutex.synchronize do
122+
@subscribers[event.name] + @subscribers[:all]
123+
end
124+
125+
subscribers_for_event.each do |subscriber|
126+
subscriber.call(event)
127+
rescue StandardError => e
128+
# Log but don't raise - subscribers should not break the main flow
129+
Shoryuken.logger.error { "Instrumentation subscriber error: #{e.message}" }
130+
Shoryuken.logger.error { e.backtrace.join("\n") } if e.backtrace
131+
end
132+
end
133+
134+
# Clears all subscribers (useful for testing)
135+
#
136+
# @return [void]
137+
def clear
138+
@mutex.synchronize do
139+
@subscribers.clear
140+
end
141+
end
142+
143+
# Returns the number of subscribers for an event
144+
#
145+
# @param event_name [String, nil] the event name, or nil for global subscribers
146+
# @return [Integer] the subscriber count
147+
def subscriber_count(event_name = nil)
148+
@mutex.synchronize do
149+
key = event_name || :all
150+
@subscribers[key].size
151+
end
152+
end
153+
end
154+
end
155+
end

0 commit comments

Comments
 (0)