From 5f5a277136e3a3033cf5c57898f9e932d8383e26 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Thu, 11 Dec 2025 15:49:19 +0100 Subject: [PATCH 1/5] Add Karafka-style instrumentation/events system Implement a pub/sub notification system inspired by Karafka's instrumentation architecture. This enables better monitoring, observability, and integration with APM tools. Key components: - Shoryuken::Instrumentation::Notifications - thread-safe pub/sub event bus - Shoryuken::Instrumentation::Event - event wrapper with metadata - Shoryuken::Instrumentation::LoggerListener - default logging subscriber - Shoryuken.monitor - global accessor for the notifications instance Supported events: - app.started, app.stopping, app.stopped, app.quiet (lifecycle) - message.processed, message.failed (message processing) - error.occurred (errors) Usage: ```ruby # Subscribe to specific events Shoryuken.monitor.subscribe('message.processed') do |event| StatsD.timing('shoryuken.process_time', event.duration * 1000) end # Subscribe to all events Shoryuken.monitor.subscribe do |event| logger.info("Event: #{event.name}") end ``` Backward compatibility: - Existing fire_event callbacks continue to work - Shoryuken.on(:startup) etc. continue to work - Legacy events are also published to the new monitor --- lib/shoryuken.rb | 26 +++ lib/shoryuken/instrumentation.rb | 18 ++ lib/shoryuken/instrumentation/event.rb | 62 +++++ .../instrumentation/logger_listener.rb | 95 ++++++++ .../instrumentation/notifications.rb | 155 +++++++++++++ lib/shoryuken/processor.rb | 18 +- lib/shoryuken/util.rb | 17 +- .../instrumentation/instrumentation_spec.rb | 59 +++++ .../shoryuken/instrumentation/event_spec.rb | 69 ++++++ .../instrumentation/logger_listener_spec.rb | 130 +++++++++++ .../instrumentation/notifications_spec.rb | 216 ++++++++++++++++++ spec/lib/shoryuken/monitor_spec.rb | 49 ++++ spec/lib/shoryuken/processor_spec.rb | 40 ++++ spec/lib/shoryuken/util_spec.rb | 40 ++++ 14 files changed, 991 insertions(+), 3 deletions(-) create mode 100644 lib/shoryuken/instrumentation.rb create mode 100644 lib/shoryuken/instrumentation/event.rb create mode 100644 lib/shoryuken/instrumentation/logger_listener.rb create mode 100644 lib/shoryuken/instrumentation/notifications.rb create mode 100644 spec/integration/instrumentation/instrumentation_spec.rb create mode 100644 spec/lib/shoryuken/instrumentation/event_spec.rb create mode 100644 spec/lib/shoryuken/instrumentation/logger_listener_spec.rb create mode 100644 spec/lib/shoryuken/instrumentation/notifications_spec.rb create mode 100644 spec/lib/shoryuken/monitor_spec.rb diff --git a/lib/shoryuken.rb b/lib/shoryuken.rb index 02516a9e..43881a50 100644 --- a/lib/shoryuken.rb +++ b/lib/shoryuken.rb @@ -36,6 +36,32 @@ def self.healthy? Shoryuken::Runner.instance.healthy? end + # Returns the global instrumentation monitor. + # Use this to subscribe to Shoryuken lifecycle events. + # + # @return [Shoryuken::Instrumentation::Notifications] the monitor instance + # + # @example Subscribe to message processing events + # Shoryuken.monitor.subscribe('message.processed') do |event| + # StatsD.timing('shoryuken.process_time', event.duration * 1000) + # end + # + # @example Subscribe to all events + # Shoryuken.monitor.subscribe do |event| + # logger.info("Event: #{event.name}") + # end + def self.monitor + @_monitor ||= Instrumentation::Notifications.new + end + + # Resets the monitor instance (useful for testing) + # + # @return [void] + # @api private + def self.reset_monitor! + @_monitor = nil + end + def_delegators( :shoryuken_options, :active_job?, diff --git a/lib/shoryuken/instrumentation.rb b/lib/shoryuken/instrumentation.rb new file mode 100644 index 00000000..8450e347 --- /dev/null +++ b/lib/shoryuken/instrumentation.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +require_relative 'instrumentation/event' +require_relative 'instrumentation/notifications' +require_relative 'instrumentation/logger_listener' + +module Shoryuken + # Instrumentation module providing pub/sub event notifications. + # Inspired by Karafka's instrumentation architecture. + # + # @example Subscribing to events + # Shoryuken.monitor.subscribe('message.processed') do |event| + # StatsD.timing('shoryuken.process_time', event.duration * 1000) + # end + # + module Instrumentation + end +end diff --git a/lib/shoryuken/instrumentation/event.rb b/lib/shoryuken/instrumentation/event.rb new file mode 100644 index 00000000..b51a8f29 --- /dev/null +++ b/lib/shoryuken/instrumentation/event.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +module Shoryuken + module Instrumentation + # Represents an instrumentation event with metadata. + # Events are published through the Notifications system and contain + # information about what happened, when, and relevant context. + # + # @example Creating an event + # event = Event.new('message.processed', queue: 'default', duration: 0.5) + # event.name # => 'message.processed' + # event[:queue] # => 'default' + # event.duration # => 0.5 + # + class Event + # @return [String] the event name (e.g., 'message.processed') + attr_reader :name + + # @return [Hash] the event payload containing contextual data + attr_reader :payload + + # @return [Time] when the event was created + attr_reader :time + + # Creates a new Event instance + # + # @param name [String] the event name using dot notation (e.g., 'message.processed') + # @param payload [Hash] contextual data for the event + def initialize(name, payload = {}) + @name = name + @payload = payload + @time = Time.now + end + + # Accesses a value from the payload by key + # + # @param key [Symbol, String] the payload key + # @return [Object, nil] the value or nil if not found + def [](key) + payload[key] + end + + # Returns the duration from the payload if present + # + # @return [Float, nil] the duration in seconds or nil + def duration + payload[:duration] + end + + # Returns a hash representation of the event + # + # @return [Hash] the event as a hash + def to_h + { + name: name, + payload: payload, + time: time + } + end + end + end +end diff --git a/lib/shoryuken/instrumentation/logger_listener.rb b/lib/shoryuken/instrumentation/logger_listener.rb new file mode 100644 index 00000000..1bd2580b --- /dev/null +++ b/lib/shoryuken/instrumentation/logger_listener.rb @@ -0,0 +1,95 @@ +# frozen_string_literal: true + +module Shoryuken + module Instrumentation + # Default listener that logs instrumentation events. + # This provides human-readable log output for key Shoryuken events. + # + # @example Subscribing the logger listener + # Shoryuken.monitor.subscribe(&LoggerListener.new.method(:call)) + # + class LoggerListener + # Creates a new LoggerListener + # + # @param logger [Logger] the logger to use (defaults to Shoryuken.logger) + def initialize(logger = nil) + @logger = logger + end + + # Returns the logger instance + # + # @return [Logger] the logger + def logger + @logger || Shoryuken.logger + end + + # Handles an instrumentation event by logging it appropriately + # + # @param event [Event] the event to handle + # @return [void] + def call(event) + case event.name + when 'app.started' + log_app_started(event) + when 'app.stopping' + log_app_stopping(event) + when 'app.stopped' + log_app_stopped(event) + when 'message.processed' + log_message_processed(event) + when 'message.failed' + log_message_failed(event) + when 'error.occurred' + log_error_occurred(event) + when 'queue.polling' + log_queue_polling(event) + end + end + + private + + def log_app_started(event) + groups = event[:groups] || [] + logger.info { "Shoryuken started with #{groups.size} group(s)" } + end + + def log_app_stopping(_event) + logger.info { 'Shoryuken shutting down...' } + end + + def log_app_stopped(_event) + logger.info { 'Shoryuken stopped' } + end + + def log_message_processed(event) + duration_ms = event.duration ? (event.duration * 1000).round(2) : 0 + worker = event[:worker] || 'Unknown' + queue = event[:queue] || 'Unknown' + + logger.info { "Processed #{worker}/#{queue} in #{duration_ms}ms" } + end + + def log_message_failed(event) + worker = event[:worker] || 'Unknown' + queue = event[:queue] || 'Unknown' + error = event[:error] + error_message = error.respond_to?(:message) ? error.message : error.to_s + + logger.error { "Failed #{worker}/#{queue}: #{error_message}" } + end + + def log_error_occurred(event) + error = event[:error] + error_class = error.respond_to?(:class) ? error.class.name : 'Unknown' + error_message = error.respond_to?(:message) ? error.message : error.to_s + + logger.error { "Error occurred: #{error_class} - #{error_message}" } + end + + def log_queue_polling(event) + queue = event[:queue] || 'Unknown' + logger.debug { "Polling queue: #{queue}" } + end + end + end +end diff --git a/lib/shoryuken/instrumentation/notifications.rb b/lib/shoryuken/instrumentation/notifications.rb new file mode 100644 index 00000000..63141e6e --- /dev/null +++ b/lib/shoryuken/instrumentation/notifications.rb @@ -0,0 +1,155 @@ +# frozen_string_literal: true + +module Shoryuken + module Instrumentation + # A thread-safe pub/sub notification system for instrumentation events. + # Inspired by Karafka's instrumentation architecture, this allows external + # systems (APM, logging, metrics) to subscribe to Shoryuken lifecycle events. + # + # @example Subscribing to specific events + # Shoryuken.monitor.subscribe('message.processed') do |event| + # StatsD.timing('shoryuken.process_time', event.duration * 1000) + # end + # + # @example Subscribing to all events + # Shoryuken.monitor.subscribe do |event| + # logger.info("Event: #{event.name}") + # end + # + # @example Instrumenting a block + # Shoryuken.monitor.instrument('message.processed', queue: 'default') do + # process_message + # end + # + class Notifications + # List of all supported events in the system + EVENTS = %w[ + app.started + app.stopping + app.stopped + app.quiet + + message.received + message.processed + message.failed + message.deleted + + worker.started + worker.completed + worker.failed + + queue.polling + queue.empty + + error.occurred + ].freeze + + # Creates a new Notifications instance + def initialize + @subscribers = Hash.new { |h, k| h[k] = [] } + @mutex = Mutex.new + end + + # Subscribes to events + # + # @param event_name [String, nil] the event name to subscribe to, or nil for all events + # @yield [Event] block called when matching events are published + # @return [void] + # + # @example Subscribe to specific event + # subscribe('message.processed') { |event| puts event.name } + # + # @example Subscribe to all events + # subscribe { |event| puts event.name } + def subscribe(event_name = nil, &block) + @mutex.synchronize do + if event_name + @subscribers[event_name] << block + else + @subscribers[:all] << block + end + end + end + + # Unsubscribes a block from events + # + # @param event_name [String, nil] the event name to unsubscribe from, or nil for all events + # @param block [Proc] the block to unsubscribe + # @return [void] + def unsubscribe(event_name = nil, &block) + @mutex.synchronize do + key = event_name || :all + @subscribers[key].delete(block) + end + end + + # Instruments a block of code, measuring its duration and publishing an event + # + # @param event_name [String] the event name to publish + # @param payload [Hash] additional data to include in the event + # @yield the code block to instrument + # @return [Object] the result of the block + # + # @example + # monitor.instrument('message.processed', queue: 'default') do + # worker.perform(message) + # end + def instrument(event_name, payload = {}) + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + result = yield if block_given? + duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + + event = Event.new(event_name, payload.merge(duration: duration)) + publish(event) + result + end + + # Publishes an event to all matching subscribers + # + # @param event_or_name [Event, String] an Event instance or event name + # @param payload [Hash] payload hash (only used if first arg is a String) + # @return [void] + # + # @example With Event instance + # publish(Event.new('message.processed', queue: 'default')) + # + # @example With name and payload + # publish('message.processed', queue: 'default') + def publish(event_or_name, payload = {}) + event = event_or_name.is_a?(Event) ? event_or_name : Event.new(event_or_name, payload) + + subscribers_for_event = @mutex.synchronize do + @subscribers[event.name] + @subscribers[:all] + end + + subscribers_for_event.each do |subscriber| + subscriber.call(event) + rescue StandardError => e + # Log but don't raise - subscribers should not break the main flow + Shoryuken.logger.error { "Instrumentation subscriber error: #{e.message}" } + Shoryuken.logger.error { e.backtrace.join("\n") } if e.backtrace + end + end + + # Clears all subscribers (useful for testing) + # + # @return [void] + def clear + @mutex.synchronize do + @subscribers.clear + end + end + + # Returns the number of subscribers for an event + # + # @param event_name [String, nil] the event name, or nil for global subscribers + # @return [Integer] the subscriber count + def subscriber_count(event_name = nil) + @mutex.synchronize do + key = event_name || :all + @subscribers[key].size + end + end + end + end +end diff --git a/lib/shoryuken/processor.rb b/lib/shoryuken/processor.rb index ca3f0878..644ca0d6 100644 --- a/lib/shoryuken/processor.rb +++ b/lib/shoryuken/processor.rb @@ -38,8 +38,10 @@ def process return logger.error { "No worker found for #{queue}" } unless worker Shoryuken::Logging.with_context("#{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id}") do - worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do - worker.perform(sqs_msg, body) + Shoryuken.monitor.instrument('message.processed', message_payload) do + worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do + worker.perform(sqs_msg, body) + end end end end @@ -52,6 +54,7 @@ def process worker_perform.call end rescue Exception => e + Shoryuken.monitor.publish('message.failed', message_payload.merge(error: e)) Array(Shoryuken.exception_handlers).each { |handler| handler.call(e, queue, sqs_msg) } raise @@ -59,6 +62,17 @@ def process private + # Returns payload hash for instrumentation events + # + # @return [Hash] the payload for instrumentation + def message_payload + { + queue: queue, + message_id: sqs_msg.is_a?(Array) ? sqs_msg.map(&:message_id) : sqs_msg.message_id, + worker: worker&.class&.name + } + end + # Fetches the worker instance for processing # # @return [Object, nil] the worker instance or nil if not found diff --git a/lib/shoryuken/util.rb b/lib/shoryuken/util.rb index d4a0741c..21ff9992 100644 --- a/lib/shoryuken/util.rb +++ b/lib/shoryuken/util.rb @@ -11,7 +11,16 @@ def logger Shoryuken.logger end - # Fires a lifecycle event to all registered handlers + # Maps legacy lifecycle events to new instrumentation event names + LEGACY_EVENT_MAP = { + startup: 'app.started', + quiet: 'app.quiet', + shutdown: 'app.stopping', + stopped: 'app.stopped' + }.freeze + + # Fires a lifecycle event to all registered handlers. + # Also publishes to the instrumentation monitor for subscribers. # # @param event [Symbol] the event name to fire # @param reverse [Boolean] whether to call handlers in reverse order @@ -19,6 +28,12 @@ def logger # @return [void] def fire_event(event, reverse = false, event_options = {}) logger.debug { "Firing '#{event}' lifecycle event" } + + # Publish to the new instrumentation system + new_event_name = LEGACY_EVENT_MAP[event] || "legacy.#{event}" + Shoryuken.monitor.publish(new_event_name, event_options.merge(legacy_event: event)) + + # Maintain backward compatibility with existing callback system arr = Shoryuken.options[:lifecycle_events][event] arr.reverse! if reverse arr.each do |block| diff --git a/spec/integration/instrumentation/instrumentation_spec.rb b/spec/integration/instrumentation/instrumentation_spec.rb new file mode 100644 index 00000000..cdf82fc9 --- /dev/null +++ b/spec/integration/instrumentation/instrumentation_spec.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +# This spec tests the instrumentation system integration. +# It verifies that events are published during message processing. + +setup_localstack + +queue_name = DT.queue +create_test_queue(queue_name) +Shoryuken.add_group('default', 1) +Shoryuken.add_queue(queue_name, 1, 'default') + +# Reset monitor to ensure clean state +Shoryuken.reset_monitor! + +# Collect events +events_received = [] +Shoryuken.monitor.subscribe do |event| + events_received << { name: event.name, payload: event.payload.dup, time: event.time } +end + +# Worker for testing +worker_class = Class.new do + include Shoryuken::Worker + + shoryuken_options auto_delete: true, batch: false + + def perform(sqs_msg, body) + DT[:processed] << { message_id: sqs_msg.message_id, body: body } + end +end + +worker_class.get_shoryuken_options['queue'] = queue_name +Shoryuken.register_worker(queue_name, worker_class) + +# Send test messages +2.times { |i| Shoryuken::Client.queues(queue_name).send_message(message_body: "instrumentation-test-#{i}") } + +sleep 1 + +poll_queues_until { DT[:processed].size >= 2 } + +# Verify messages were processed +assert_equal(2, DT[:processed].size) + +# Verify instrumentation events were captured +processed_events = events_received.select { |e| e[:name] == 'message.processed' } +assert(processed_events.size >= 2, "Should have at least 2 message.processed events, got #{processed_events.size}") + +# Verify event payloads contain expected data +processed_events.each do |event| + assert_equal(queue_name, event[:payload][:queue], 'Event should include queue name') + assert(event[:payload][:message_id], 'Event should include message_id') + assert(event[:payload][:duration], 'Event should include duration') + assert(event[:payload][:duration] >= 0, 'Duration should be non-negative') +end + +# Cleanup +Shoryuken.reset_monitor! diff --git a/spec/lib/shoryuken/instrumentation/event_spec.rb b/spec/lib/shoryuken/instrumentation/event_spec.rb new file mode 100644 index 00000000..283c37f4 --- /dev/null +++ b/spec/lib/shoryuken/instrumentation/event_spec.rb @@ -0,0 +1,69 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Shoryuken::Instrumentation::Event do + describe '#initialize' do + it 'sets the name' do + event = described_class.new('message.processed') + expect(event.name).to eq('message.processed') + end + + it 'sets the payload' do + event = described_class.new('message.processed', queue: 'default', worker: 'TestWorker') + expect(event.payload).to eq(queue: 'default', worker: 'TestWorker') + end + + it 'defaults payload to empty hash' do + event = described_class.new('message.processed') + expect(event.payload).to eq({}) + end + + it 'sets the time' do + freeze_time = Time.now + allow(Time).to receive(:now).and_return(freeze_time) + + event = described_class.new('message.processed') + expect(event.time).to eq(freeze_time) + end + end + + describe '#[]' do + it 'returns payload value by key' do + event = described_class.new('message.processed', queue: 'default') + expect(event[:queue]).to eq('default') + end + + it 'returns nil for missing key' do + event = described_class.new('message.processed') + expect(event[:missing]).to be_nil + end + end + + describe '#duration' do + it 'returns duration from payload' do + event = described_class.new('message.processed', duration: 1.5) + expect(event.duration).to eq(1.5) + end + + it 'returns nil if duration not set' do + event = described_class.new('message.processed') + expect(event.duration).to be_nil + end + end + + describe '#to_h' do + it 'returns hash representation' do + freeze_time = Time.now + allow(Time).to receive(:now).and_return(freeze_time) + + event = described_class.new('message.processed', queue: 'default') + + expect(event.to_h).to eq( + name: 'message.processed', + payload: { queue: 'default' }, + time: freeze_time + ) + end + end +end diff --git a/spec/lib/shoryuken/instrumentation/logger_listener_spec.rb b/spec/lib/shoryuken/instrumentation/logger_listener_spec.rb new file mode 100644 index 00000000..3870299e --- /dev/null +++ b/spec/lib/shoryuken/instrumentation/logger_listener_spec.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Shoryuken::Instrumentation::LoggerListener do + let(:logger) { instance_double(Logger, info: nil, error: nil, debug: nil) } + let(:listener) { described_class.new(logger) } + + describe '#initialize' do + it 'accepts a custom logger' do + expect(listener.logger).to eq(logger) + end + + it 'defaults to Shoryuken.logger' do + listener_without_logger = described_class.new + expect(listener_without_logger.logger).to eq(Shoryuken.logger) + end + end + + describe '#call' do + context 'with app.started event' do + it 'logs info message' do + event = Shoryuken::Instrumentation::Event.new('app.started', groups: %w[default priority]) + + expect(logger).to receive(:info).and_yield.and_return('Shoryuken started with 2 group(s)') + + listener.call(event) + end + end + + context 'with app.stopping event' do + it 'logs info message' do + event = Shoryuken::Instrumentation::Event.new('app.stopping') + + expect(logger).to receive(:info).and_yield.and_return('Shoryuken shutting down...') + + listener.call(event) + end + end + + context 'with app.stopped event' do + it 'logs info message' do + event = Shoryuken::Instrumentation::Event.new('app.stopped') + + expect(logger).to receive(:info).and_yield.and_return('Shoryuken stopped') + + listener.call(event) + end + end + + context 'with message.processed event' do + it 'logs info message with duration' do + event = Shoryuken::Instrumentation::Event.new( + 'message.processed', + queue: 'default', + worker: 'TestWorker', + duration: 0.12345 + ) + + expect(logger).to receive(:info).and_yield.and_return('Processed TestWorker/default in 123.45ms') + + listener.call(event) + end + + it 'handles missing duration' do + event = Shoryuken::Instrumentation::Event.new( + 'message.processed', + queue: 'default', + worker: 'TestWorker' + ) + + expect(logger).to receive(:info) + + listener.call(event) + end + end + + context 'with message.failed event' do + it 'logs error message' do + error = StandardError.new('Something went wrong') + event = Shoryuken::Instrumentation::Event.new( + 'message.failed', + queue: 'default', + worker: 'TestWorker', + error: error + ) + + expect(logger).to receive(:error).and_yield.and_return('Failed TestWorker/default: Something went wrong') + + listener.call(event) + end + end + + context 'with error.occurred event' do + it 'logs error message with class name' do + error = ArgumentError.new('Invalid argument') + event = Shoryuken::Instrumentation::Event.new( + 'error.occurred', + error: error + ) + + expect(logger).to receive(:error).and_yield.and_return('Error occurred: ArgumentError - Invalid argument') + + listener.call(event) + end + end + + context 'with queue.polling event' do + it 'logs debug message' do + event = Shoryuken::Instrumentation::Event.new('queue.polling', queue: 'default') + + expect(logger).to receive(:debug).and_yield.and_return('Polling queue: default') + + listener.call(event) + end + end + + context 'with unknown event' do + it 'does not log anything' do + event = Shoryuken::Instrumentation::Event.new('unknown.event') + + expect(logger).not_to receive(:info) + expect(logger).not_to receive(:error) + expect(logger).not_to receive(:debug) + + listener.call(event) + end + end + end +end diff --git a/spec/lib/shoryuken/instrumentation/notifications_spec.rb b/spec/lib/shoryuken/instrumentation/notifications_spec.rb new file mode 100644 index 00000000..00285102 --- /dev/null +++ b/spec/lib/shoryuken/instrumentation/notifications_spec.rb @@ -0,0 +1,216 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Shoryuken::Instrumentation::Notifications do + let(:notifications) { described_class.new } + + after do + notifications.clear + end + + describe '#subscribe' do + it 'subscribes to specific event' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + notifications.publish('message.processed', queue: 'default') + notifications.publish('message.failed', queue: 'default') + + expect(events.size).to eq(1) + expect(events.first.name).to eq('message.processed') + end + + it 'subscribes to all events when no event name given' do + events = [] + notifications.subscribe { |e| events << e } + + notifications.publish('message.processed', queue: 'default') + notifications.publish('message.failed', queue: 'default') + + expect(events.size).to eq(2) + end + + it 'allows multiple subscribers for same event' do + counter = { a: 0, b: 0 } + notifications.subscribe('message.processed') { counter[:a] += 1 } + notifications.subscribe('message.processed') { counter[:b] += 1 } + + notifications.publish('message.processed') + + expect(counter[:a]).to eq(1) + expect(counter[:b]).to eq(1) + end + end + + describe '#unsubscribe' do + it 'removes subscriber from specific event' do + events = [] + block = ->(e) { events << e } + notifications.subscribe('message.processed', &block) + + notifications.publish('message.processed') + expect(events.size).to eq(1) + + notifications.unsubscribe('message.processed', &block) + notifications.publish('message.processed') + expect(events.size).to eq(1) + end + + it 'removes global subscriber' do + events = [] + block = ->(e) { events << e } + notifications.subscribe(&block) + + notifications.publish('message.processed') + expect(events.size).to eq(1) + + notifications.unsubscribe(&block) + notifications.publish('message.processed') + expect(events.size).to eq(1) + end + end + + describe '#instrument' do + it 'executes the block' do + result = notifications.instrument('message.processed') { 'hello' } + expect(result).to eq('hello') + end + + it 'publishes event with duration' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + notifications.instrument('message.processed', queue: 'default') { sleep 0.01 } + + expect(events.size).to eq(1) + expect(events.first.duration).to be >= 0.01 + expect(events.first[:queue]).to eq('default') + end + + it 'publishes event even without block' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + notifications.instrument('message.processed', queue: 'default') + + expect(events.size).to eq(1) + expect(events.first.duration).to be_a(Float) + end + end + + describe '#publish' do + it 'publishes event by name and payload' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + notifications.publish('message.processed', queue: 'default') + + expect(events.size).to eq(1) + expect(events.first.name).to eq('message.processed') + expect(events.first[:queue]).to eq('default') + end + + it 'publishes existing Event instance' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + event = Shoryuken::Instrumentation::Event.new('message.processed', queue: 'default') + notifications.publish(event) + + expect(events.size).to eq(1) + expect(events.first).to eq(event) + end + + it 'does not raise when subscriber raises' do + notifications.subscribe('message.processed') { raise 'boom' } + + expect { notifications.publish('message.processed') }.not_to raise_error + end + + it 'logs error when subscriber raises' do + notifications.subscribe('message.processed') { raise 'boom' } + + expect(Shoryuken.logger).to receive(:error).at_least(:once) + notifications.publish('message.processed') + end + + it 'continues to other subscribers when one raises' do + events = [] + notifications.subscribe('message.processed') { raise 'boom' } + notifications.subscribe('message.processed') { |e| events << e } + + notifications.publish('message.processed') + + expect(events.size).to eq(1) + end + end + + describe '#clear' do + it 'removes all subscribers' do + notifications.subscribe('message.processed') { } + notifications.subscribe { } + + expect(notifications.subscriber_count('message.processed')).to eq(1) + expect(notifications.subscriber_count).to eq(1) + + notifications.clear + + expect(notifications.subscriber_count('message.processed')).to eq(0) + expect(notifications.subscriber_count).to eq(0) + end + end + + describe '#subscriber_count' do + it 'returns count for specific event' do + notifications.subscribe('message.processed') { } + notifications.subscribe('message.processed') { } + notifications.subscribe('message.failed') { } + + expect(notifications.subscriber_count('message.processed')).to eq(2) + expect(notifications.subscriber_count('message.failed')).to eq(1) + end + + it 'returns count for global subscribers' do + notifications.subscribe { } + notifications.subscribe { } + + expect(notifications.subscriber_count).to eq(2) + end + + it 'returns 0 for events with no subscribers' do + expect(notifications.subscriber_count('nonexistent')).to eq(0) + end + end + + describe 'thread safety' do + it 'handles concurrent subscriptions' do + threads = 10.times.map do + Thread.new do + 10.times do + notifications.subscribe('message.processed') { } + end + end + end + + threads.each(&:join) + + expect(notifications.subscriber_count('message.processed')).to eq(100) + end + + it 'handles concurrent publishing' do + counter = Concurrent::AtomicFixnum.new(0) + notifications.subscribe('message.processed') { counter.increment } + + threads = 10.times.map do + Thread.new do + 10.times { notifications.publish('message.processed') } + end + end + + threads.each(&:join) + + expect(counter.value).to eq(100) + end + end +end diff --git a/spec/lib/shoryuken/monitor_spec.rb b/spec/lib/shoryuken/monitor_spec.rb new file mode 100644 index 00000000..7abffc67 --- /dev/null +++ b/spec/lib/shoryuken/monitor_spec.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe 'Shoryuken.monitor' do + after do + Shoryuken.reset_monitor! + end + + describe '.monitor' do + it 'returns a Notifications instance' do + expect(Shoryuken.monitor).to be_a(Shoryuken::Instrumentation::Notifications) + end + + it 'returns the same instance on multiple calls' do + monitor1 = Shoryuken.monitor + monitor2 = Shoryuken.monitor + expect(monitor1).to be(monitor2) + end + + it 'allows subscribing to events' do + events = [] + Shoryuken.monitor.subscribe('test.event') { |e| events << e } + + Shoryuken.monitor.publish('test.event', key: 'value') + + expect(events.size).to eq(1) + expect(events.first[:key]).to eq('value') + end + end + + describe '.reset_monitor!' do + it 'creates a new monitor instance' do + original = Shoryuken.monitor + Shoryuken.reset_monitor! + expect(Shoryuken.monitor).not_to be(original) + end + + it 'clears subscribers' do + events = [] + Shoryuken.monitor.subscribe('test.event') { |e| events << e } + + Shoryuken.reset_monitor! + Shoryuken.monitor.publish('test.event') + + expect(events).to be_empty + end + end +end diff --git a/spec/lib/shoryuken/processor_spec.rb b/spec/lib/shoryuken/processor_spec.rb index b131efcf..98c2546c 100644 --- a/spec/lib/shoryuken/processor_spec.rb +++ b/spec/lib/shoryuken/processor_spec.rb @@ -34,6 +34,46 @@ subject.process end + context 'instrumentation' do + before do + Shoryuken.reset_monitor! + end + + after do + Shoryuken.reset_monitor! + end + + it 'publishes message.processed event on success' do + events = [] + Shoryuken.monitor.subscribe('message.processed') { |e| events << e } + + allow_any_instance_of(TestWorker).to receive(:perform) + + subject.process + + expect(events.size).to eq(1) + expect(events.first[:queue]).to eq(queue) + expect(events.first[:message_id]).to eq(sqs_msg.message_id) + expect(events.first[:worker]).to eq('TestWorker') + expect(events.first.duration).to be_a(Float) + end + + it 'publishes message.failed event on error' do + events = [] + Shoryuken.monitor.subscribe('message.failed') { |e| events << e } + + allow_any_instance_of(TestWorker).to receive(:perform).and_raise(StandardError, 'test error') + + expect { subject.process }.to raise_error(StandardError, 'test error') + + expect(events.size).to eq(1) + expect(events.first[:queue]).to eq(queue) + expect(events.first[:message_id]).to eq(sqs_msg.message_id) + expect(events.first[:error]).to be_a(StandardError) + expect(events.first[:error].message).to eq('test error') + end + end + context 'when custom middleware' do let(:queue) { 'worker_called_middleware' } diff --git a/spec/lib/shoryuken/util_spec.rb b/spec/lib/shoryuken/util_spec.rb index 0a081e25..b9fb2adb 100644 --- a/spec/lib/shoryuken/util_spec.rb +++ b/spec/lib/shoryuken/util_spec.rb @@ -52,8 +52,13 @@ let(:callback_without_options) { proc { value_holder.value = :without_options } } let(:callback_with_options) { proc { |options| value_holder.value = [:with_options, options] } } + before do + Shoryuken.reset_monitor! + end + after :all do Shoryuken.options[:lifecycle_events].delete(:some_event) + Shoryuken.reset_monitor! end it 'triggers callbacks that do not accept arguments' do @@ -69,5 +74,40 @@ expect(value_holder).to receive(:value=).with([:with_options, { my_option: :some_option }]) subject.fire_event(:some_event, false, my_option: :some_option) end + + context 'instrumentation' do + it 'publishes mapped event for known lifecycle events' do + events = [] + Shoryuken.monitor.subscribe('app.started') { |e| events << e } + + Shoryuken.options[:lifecycle_events][:startup] = [] + subject.fire_event(:startup) + + expect(events.size).to eq(1) + expect(events.first.name).to eq('app.started') + expect(events.first[:legacy_event]).to eq(:startup) + end + + it 'publishes legacy.* event for unknown lifecycle events' do + events = [] + Shoryuken.monitor.subscribe('legacy.dispatch') { |e| events << e } + + Shoryuken.options[:lifecycle_events][:dispatch] = [] + subject.fire_event(:dispatch) + + expect(events.size).to eq(1) + expect(events.first.name).to eq('legacy.dispatch') + end + + it 'includes event_options in the payload' do + events = [] + Shoryuken.monitor.subscribe('app.stopped') { |e| events << e } + + Shoryuken.options[:lifecycle_events][:stopped] = [] + subject.fire_event(:stopped, false, custom_key: 'custom_value') + + expect(events.first[:custom_key]).to eq('custom_value') + end + end end end From 442bd6231951b6cb2d91e75a302145e3751d701b Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Thu, 11 Dec 2025 16:00:37 +0100 Subject: [PATCH 2/5] Add ActiveSupport and Karafka-style error handling to instrumentation - Make instrument() ActiveSupport::Notifications compatible: - Add :exception [class_name, message] to payload on error - Add :exception_object to payload on error - Always publish event via ensure (even on error) - Re-raise the exception after publishing - Add Karafka-style error.occurred event on exception: - Publishes separate error.occurred event with :type key - Includes :error, :error_class, :error_message - Preserves original payload for context - Allows centralized error handling via single subscription - Add comprehensive tests for both behaviors --- .../instrumentation/notifications.rb | 55 ++++++- lib/shoryuken/processor.rb | 3 +- .../instrumentation/notifications_spec.rb | 144 ++++++++++++++++++ spec/lib/shoryuken/processor_spec.rb | 25 ++- 4 files changed, 215 insertions(+), 12 deletions(-) diff --git a/lib/shoryuken/instrumentation/notifications.rb b/lib/shoryuken/instrumentation/notifications.rb index 63141e6e..f3d60301 100644 --- a/lib/shoryuken/instrumentation/notifications.rb +++ b/lib/shoryuken/instrumentation/notifications.rb @@ -83,24 +83,65 @@ def unsubscribe(event_name = nil, &block) end end - # Instruments a block of code, measuring its duration and publishing an event + # Instruments a block of code, measuring its duration and publishing an event. + # Compatible with ActiveSupport::Notifications - if an exception occurs, + # it adds :exception and :exception_object to the payload and re-raises. + # + # Additionally, on exception, publishes a separate 'error.occurred' event + # (Karafka-style) with a :type key indicating the original event name. # # @param event_name [String] the event name to publish # @param payload [Hash] additional data to include in the event - # @yield the code block to instrument + # @yield [payload] the code block to instrument (payload is yielded for modification) # @return [Object] the result of the block # - # @example + # @example Basic usage # monitor.instrument('message.processed', queue: 'default') do # worker.perform(message) # end + # + # @example Checking for exceptions in subscriber + # monitor.subscribe('message.processed') do |event| + # if event[:exception] + # # Handle error case + # Sentry.capture_exception(event[:exception_object]) + # else + # # Handle success case + # StatsD.timing('process_time', event.duration) + # end + # end + # + # @example Subscribing to all errors (Karafka-style) + # monitor.subscribe('error.occurred') do |event| + # Sentry.capture_exception(event[:error], extra: { type: event[:type] }) + # end def instrument(event_name, payload = {}) started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) - result = yield if block_given? - duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + exception_raised = nil + begin + result = yield payload if block_given? + rescue Exception => e + exception_raised = e + payload[:exception] = [e.class.name, e.message] + payload[:exception_object] = e + raise e + ensure + duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + event = Event.new(event_name, payload.merge(duration: duration)) + publish(event) - event = Event.new(event_name, payload.merge(duration: duration)) - publish(event) + # Publish a separate error.occurred event (Karafka-style) for centralized error handling + if exception_raised + error_payload = payload.merge( + type: event_name, + error: exception_raised, + error_class: exception_raised.class.name, + error_message: exception_raised.message, + duration: duration + ) + publish('error.occurred', error_payload) + end + end result end diff --git a/lib/shoryuken/processor.rb b/lib/shoryuken/processor.rb index 644ca0d6..7f6b795f 100644 --- a/lib/shoryuken/processor.rb +++ b/lib/shoryuken/processor.rb @@ -54,7 +54,8 @@ def process worker_perform.call end rescue Exception => e - Shoryuken.monitor.publish('message.failed', message_payload.merge(error: e)) + # Note: message.processed event is already published by instrument() with + # :exception and :exception_object in the payload (ActiveSupport-compatible) Array(Shoryuken.exception_handlers).each { |handler| handler.call(e, queue, sqs_msg) } raise diff --git a/spec/lib/shoryuken/instrumentation/notifications_spec.rb b/spec/lib/shoryuken/instrumentation/notifications_spec.rb index 00285102..2f378d37 100644 --- a/spec/lib/shoryuken/instrumentation/notifications_spec.rb +++ b/spec/lib/shoryuken/instrumentation/notifications_spec.rb @@ -97,6 +97,150 @@ expect(events.size).to eq(1) expect(events.first.duration).to be_a(Float) end + + it 'yields payload to allow modification' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + notifications.instrument('message.processed', queue: 'default') do |payload| + payload[:custom_key] = 'custom_value' + end + + expect(events.first[:custom_key]).to eq('custom_value') + end + + context 'when exception occurs (ActiveSupport-compatible)' do + it 'adds :exception to payload with class name and message' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + raise ArgumentError, 'invalid argument' + end + end.to raise_error(ArgumentError, 'invalid argument') + + expect(events.size).to eq(1) + expect(events.first[:exception]).to eq(['ArgumentError', 'invalid argument']) + end + + it 'adds :exception_object to payload' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + raise StandardError, 'test error' + end + end.to raise_error(StandardError) + + expect(events.first[:exception_object]).to be_a(StandardError) + expect(events.first[:exception_object].message).to eq('test error') + end + + it 'still publishes event with duration' do + events = [] + notifications.subscribe('message.processed') { |e| events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + sleep 0.01 + raise 'error' + end + end.to raise_error(RuntimeError) + + expect(events.first.duration).to be >= 0.01 + end + + it 're-raises the exception' do + expect do + notifications.instrument('message.processed') do + raise ArgumentError, 'test' + end + end.to raise_error(ArgumentError, 'test') + end + end + + context 'when exception occurs (Karafka-style error.occurred)' do + it 'publishes error.occurred event with type key' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + raise StandardError, 'test error' + end + end.to raise_error(StandardError) + + expect(error_events.size).to eq(1) + expect(error_events.first[:type]).to eq('message.processed') + end + + it 'includes error object in error.occurred event' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + raise ArgumentError, 'invalid argument' + end + end.to raise_error(ArgumentError) + + expect(error_events.first[:error]).to be_a(ArgumentError) + expect(error_events.first[:error].message).to eq('invalid argument') + end + + it 'includes error_class and error_message in error.occurred event' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + raise RuntimeError, 'something went wrong' + end + end.to raise_error(RuntimeError) + + expect(error_events.first[:error_class]).to eq('RuntimeError') + expect(error_events.first[:error_message]).to eq('something went wrong') + end + + it 'includes original payload in error.occurred event' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + expect do + notifications.instrument('message.processed', queue: 'default', worker: 'TestWorker') do + raise StandardError, 'test error' + end + end.to raise_error(StandardError) + + expect(error_events.first[:queue]).to eq('default') + expect(error_events.first[:worker]).to eq('TestWorker') + end + + it 'includes duration in error.occurred event' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + expect do + notifications.instrument('message.processed', queue: 'default') do + sleep 0.01 + raise StandardError, 'test error' + end + end.to raise_error(StandardError) + + expect(error_events.first[:duration]).to be >= 0.01 + end + + it 'does not publish error.occurred on success' do + error_events = [] + notifications.subscribe('error.occurred') { |e| error_events << e } + + notifications.instrument('message.processed', queue: 'default') { 'success' } + + expect(error_events).to be_empty + end + end end describe '#publish' do diff --git a/spec/lib/shoryuken/processor_spec.rb b/spec/lib/shoryuken/processor_spec.rb index 98c2546c..c32091d2 100644 --- a/spec/lib/shoryuken/processor_spec.rb +++ b/spec/lib/shoryuken/processor_spec.rb @@ -58,9 +58,9 @@ expect(events.first.duration).to be_a(Float) end - it 'publishes message.failed event on error' do + it 'includes exception info in message.processed event on error (ActiveSupport-compatible)' do events = [] - Shoryuken.monitor.subscribe('message.failed') { |e| events << e } + Shoryuken.monitor.subscribe('message.processed') { |e| events << e } allow_any_instance_of(TestWorker).to receive(:perform).and_raise(StandardError, 'test error') @@ -69,8 +69,25 @@ expect(events.size).to eq(1) expect(events.first[:queue]).to eq(queue) expect(events.first[:message_id]).to eq(sqs_msg.message_id) - expect(events.first[:error]).to be_a(StandardError) - expect(events.first[:error].message).to eq('test error') + expect(events.first[:exception]).to eq(['StandardError', 'test error']) + expect(events.first[:exception_object]).to be_a(StandardError) + expect(events.first[:exception_object].message).to eq('test error') + end + + it 'publishes error.occurred event on error (Karafka-style)' do + error_events = [] + Shoryuken.monitor.subscribe('error.occurred') { |e| error_events << e } + + allow_any_instance_of(TestWorker).to receive(:perform).and_raise(StandardError, 'test error') + + expect { subject.process }.to raise_error(StandardError, 'test error') + + expect(error_events.size).to eq(1) + expect(error_events.first[:type]).to eq('message.processed') + expect(error_events.first[:queue]).to eq(queue) + expect(error_events.first[:error]).to be_a(StandardError) + expect(error_events.first[:error_class]).to eq('StandardError') + expect(error_events.first[:error_message]).to eq('test error') end end From acf9f40e913f9d5f5bdebfc4553cd0fe88d28ce6 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Thu, 11 Dec 2025 16:24:14 +0100 Subject: [PATCH 3/5] Migrate logging to use instrumentation listener pattern - Add new events to Notifications: - fetcher.started, fetcher.completed, fetcher.retry - manager.dispatch, manager.processor_assigned, manager.failed - queue.empty, app.quiet - Enhance LoggerListener to handle all events: - Use dynamic method dispatch (on_event_name pattern) - Handle fetcher events (started, completed, retry) - Handle manager events (dispatch, processor_assigned, failed) - Handle queue events (polling, empty) - Skip logging message.processed when exception present - Include backtrace in error.occurred logging - Migrate direct logger calls to events: - Fetcher: publish fetcher.started/completed/retry events - Manager: publish manager.dispatch/processor_assigned/failed events - Launcher: remove direct logger calls (use events via fire_event) - Update tests: - Add comprehensive LoggerListener tests for new events - Update fetcher_spec to verify events instead of logger calls This follows the Karafka pattern where components publish events and listeners handle logging, allowing users to customize logging behavior or replace it entirely with their own listener. --- lib/shoryuken/fetcher.rb | 14 +- .../instrumentation/logger_listener.rb | 96 +++++++--- .../instrumentation/notifications.rb | 10 + lib/shoryuken/launcher.rb | 16 +- lib/shoryuken/manager.rb | 20 +- spec/lib/shoryuken/fetcher_spec.rb | 18 +- .../instrumentation/logger_listener_spec.rb | 177 +++++++++++++++++- 7 files changed, 295 insertions(+), 56 deletions(-) diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index f2623827..e827fd8c 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -25,12 +25,14 @@ def fetch(queue, limit) fetch_with_auto_retry(3) do started_at = Time.now - logger.debug { "Looking for new messages in #{queue}" } + Shoryuken.monitor.publish('fetcher.started', queue: queue.name, limit: limit) sqs_msgs = Array(receive_messages(queue, limit)) - logger.debug { "Found #{sqs_msgs.size} messages for #{queue.name}" } unless sqs_msgs.empty? - logger.debug { "Fetcher for #{queue} completed in #{elapsed(started_at)} ms" } + Shoryuken.monitor.publish('fetcher.completed', + queue: queue.name, + message_count: sqs_msgs.size, + duration_ms: elapsed(started_at)) sqs_msgs end @@ -54,7 +56,11 @@ def fetch_with_auto_retry(max_attempts) attempts += 1 - logger.debug { "Retrying fetch attempt #{attempts} for #{e.message}" } + Shoryuken.monitor.publish('fetcher.retry', + attempt: attempts, + max_attempts: max_attempts, + error_message: e.message, + error_class: e.class.name) sleep((1..5).to_a.sample) diff --git a/lib/shoryuken/instrumentation/logger_listener.rb b/lib/shoryuken/instrumentation/logger_listener.rb index 1bd2580b..0111f6a7 100644 --- a/lib/shoryuken/instrumentation/logger_listener.rb +++ b/lib/shoryuken/instrumentation/logger_listener.rb @@ -28,40 +28,73 @@ def logger # @param event [Event] the event to handle # @return [void] def call(event) - case event.name - when 'app.started' - log_app_started(event) - when 'app.stopping' - log_app_stopping(event) - when 'app.stopped' - log_app_stopped(event) - when 'message.processed' - log_message_processed(event) - when 'message.failed' - log_message_failed(event) - when 'error.occurred' - log_error_occurred(event) - when 'queue.polling' - log_queue_polling(event) - end + method_name = "on_#{event.name.tr('.', '_')}" + send(method_name, event) if respond_to?(method_name, true) end private - def log_app_started(event) + # App lifecycle events + + def on_app_started(event) groups = event[:groups] || [] logger.info { "Shoryuken started with #{groups.size} group(s)" } end - def log_app_stopping(_event) + def on_app_stopping(_event) logger.info { 'Shoryuken shutting down...' } end - def log_app_stopped(_event) + def on_app_stopped(_event) logger.info { 'Shoryuken stopped' } end - def log_message_processed(event) + def on_app_quiet(_event) + logger.info { 'Shoryuken is quiet' } + end + + # Fetcher events + + def on_fetcher_started(event) + logger.debug { "Looking for new messages in #{event[:queue]}" } + end + + def on_fetcher_completed(event) + queue = event[:queue] + message_count = event[:message_count] || 0 + duration_ms = event[:duration_ms] + + logger.debug { "Found #{message_count} messages for #{queue}" } if message_count.positive? + logger.debug { "Fetcher for #{queue} completed in #{duration_ms} ms" } + end + + def on_fetcher_retry(event) + logger.debug { "Retrying fetch attempt #{event[:attempt]} for #{event[:error_message]}" } + end + + # Manager events + + def on_manager_dispatch(event) + logger.debug do + "Ready: #{event[:ready]}, Busy: #{event[:busy]}, Active Queues: #{event[:active_queues]}" + end + end + + def on_manager_processor_assigned(event) + logger.debug { "Assigning #{event[:message_id]}" } + end + + def on_manager_failed(event) + logger.error { "Manager failed: #{event[:error_message]}" } + logger.error { event[:backtrace].join("\n") } if event[:backtrace] + end + + # Message processing events + + def on_message_processed(event) + # Skip logging if there was an exception - error.occurred handles that + return if event[:exception] + duration_ms = event.duration ? (event.duration * 1000).round(2) : 0 worker = event[:worker] || 'Unknown' queue = event[:queue] || 'Unknown' @@ -69,7 +102,7 @@ def log_message_processed(event) logger.info { "Processed #{worker}/#{queue} in #{duration_ms}ms" } end - def log_message_failed(event) + def on_message_failed(event) worker = event[:worker] || 'Unknown' queue = event[:queue] || 'Unknown' error = event[:error] @@ -78,18 +111,33 @@ def log_message_failed(event) logger.error { "Failed #{worker}/#{queue}: #{error_message}" } end - def log_error_occurred(event) + # Error events + + def on_error_occurred(event) error = event[:error] error_class = error.respond_to?(:class) ? error.class.name : 'Unknown' error_message = error.respond_to?(:message) ? error.message : error.to_s + type = event[:type] + + if type + logger.error { "Error in #{type}: #{error_class} - #{error_message}" } + else + logger.error { "Error occurred: #{error_class} - #{error_message}" } + end - logger.error { "Error occurred: #{error_class} - #{error_message}" } + logger.error { error.backtrace.join("\n") } if error.respond_to?(:backtrace) && error.backtrace end - def log_queue_polling(event) + # Queue events + + def on_queue_polling(event) queue = event[:queue] || 'Unknown' logger.debug { "Polling queue: #{queue}" } end + + def on_queue_empty(event) + logger.debug { "Queue #{event[:queue]} is empty" } + end end end end diff --git a/lib/shoryuken/instrumentation/notifications.rb b/lib/shoryuken/instrumentation/notifications.rb index f3d60301..3e1f4c80 100644 --- a/lib/shoryuken/instrumentation/notifications.rb +++ b/lib/shoryuken/instrumentation/notifications.rb @@ -29,6 +29,16 @@ class Notifications app.stopped app.quiet + fetcher.started + fetcher.completed + fetcher.retry + + manager.dispatch + manager.processor_assigned + manager.processor_done + manager.utilization_changed + manager.failed + message.received message.processed message.failed diff --git a/lib/shoryuken/launcher.rb b/lib/shoryuken/launcher.rb index 28205b42..5ccf735f 100644 --- a/lib/shoryuken/launcher.rb +++ b/lib/shoryuken/launcher.rb @@ -27,8 +27,6 @@ def stopping? # # @return [void] def start - logger.info { 'Starting' } - start_callback start_managers end @@ -113,8 +111,6 @@ def start_managers # # @return [void] def initiate_stop - logger.info { 'Shutting down' } - stop_callback end @@ -122,11 +118,7 @@ def initiate_stop # # @return [void] def start_callback - if (callback = Shoryuken.start_callback) - logger.debug { 'Calling start_callback' } - callback.call - end - + Shoryuken.start_callback&.call fire_event(:startup) end @@ -134,11 +126,7 @@ def start_callback # # @return [void] def stop_callback - if (callback = Shoryuken.stop_callback) - logger.debug { 'Calling stop_callback' } - callback.call - end - + Shoryuken.stop_callback&.call fire_event(:shutdown, true) end diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 54c2cc91..ee013bba 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -93,7 +93,12 @@ def dispatch fire_event(:dispatch, false, queue_name: queue.name) - logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" } + Shoryuken.monitor.publish('manager.dispatch', + group: @group, + queue: queue.name, + ready: ready, + busy: busy, + active_queues: @polling_strategy.active_queues) batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) rescue => e @@ -139,7 +144,10 @@ def processor_done(queue) def assign(queue_name, sqs_msg) return unless running? - logger.debug { "Assigning #{sqs_msg.message_id}" } + Shoryuken.monitor.publish('manager.processor_assigned', + group: @group, + queue: queue_name, + message_id: sqs_msg.message_id) @busy_processors.increment fire_utilization_update_event @@ -205,8 +213,12 @@ def message_id # @param ex [Exception] the exception that occurred # @return [void] def handle_dispatch_error(ex) - logger.error { "Manager failed: #{ex.message}" } - logger.error { ex.backtrace.join("\n") } unless ex.backtrace.nil? + Shoryuken.monitor.publish('manager.failed', + group: @group, + error: ex, + error_message: ex.message, + error_class: ex.class.name, + backtrace: ex.backtrace) Process.kill('USR1', Process.pid) diff --git a/spec/lib/shoryuken/fetcher_spec.rb b/spec/lib/shoryuken/fetcher_spec.rb index 61befe74..5c84e498 100644 --- a/spec/lib/shoryuken/fetcher_spec.rb +++ b/spec/lib/shoryuken/fetcher_spec.rb @@ -35,20 +35,22 @@ subject.fetch(queue_config, limit) end - it 'logs debug only' do + it 'publishes fetcher events' do # See https://github.com/ruby-shoryuken/shoryuken/issues/435 - logger = double 'logger' - - allow(subject).to receive(:logger).and_return(logger) + # Fetcher should publish events instead of direct logging + events = [] + Shoryuken.monitor.subscribe('fetcher.started') { |e| events << e } + Shoryuken.monitor.subscribe('fetcher.completed') { |e| events << e } expect(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue) - expect(queue).to receive(:receive_messages).and_return([double('SQS Msg')]) - expect(logger).to receive(:debug).exactly(3).times - expect(logger).to_not receive(:info) - subject.fetch(queue_config, limit) + + expect(events.size).to eq(2) + expect(events.map(&:name)).to eq(%w[fetcher.started fetcher.completed]) + expect(events.first[:queue]).to eq(queue_name) + expect(events.last[:message_count]).to eq(1) end context 'when receive options per queue' do diff --git a/spec/lib/shoryuken/instrumentation/logger_listener_spec.rb b/spec/lib/shoryuken/instrumentation/logger_listener_spec.rb index 3870299e..97b9d2fc 100644 --- a/spec/lib/shoryuken/instrumentation/logger_listener_spec.rb +++ b/spec/lib/shoryuken/instrumentation/logger_listener_spec.rb @@ -3,7 +3,7 @@ require 'spec_helper' RSpec.describe Shoryuken::Instrumentation::LoggerListener do - let(:logger) { instance_double(Logger, info: nil, error: nil, debug: nil) } + let(:logger) { instance_double(Logger, info: nil, error: nil, debug: nil, warn: nil) } let(:listener) { described_class.new(logger) } describe '#initialize' do @@ -48,6 +48,129 @@ end end + context 'with app.quiet event' do + it 'logs info message' do + event = Shoryuken::Instrumentation::Event.new('app.quiet') + + expect(logger).to receive(:info).and_yield.and_return('Shoryuken is quiet') + + listener.call(event) + end + end + + context 'with fetcher.started event' do + it 'logs debug message' do + event = Shoryuken::Instrumentation::Event.new('fetcher.started', queue: 'default', limit: 10) + + expect(logger).to receive(:debug).and_yield.and_return('Looking for new messages in default') + + listener.call(event) + end + end + + context 'with fetcher.completed event' do + it 'logs debug message with message count' do + event = Shoryuken::Instrumentation::Event.new( + 'fetcher.completed', + queue: 'default', + message_count: 5, + duration_ms: 123.45 + ) + + expect(logger).to receive(:debug).twice + + listener.call(event) + end + + it 'does not log found messages when count is zero' do + event = Shoryuken::Instrumentation::Event.new( + 'fetcher.completed', + queue: 'default', + message_count: 0, + duration_ms: 50.0 + ) + + # Only one debug call for completion, not for "found messages" + expect(logger).to receive(:debug).once + + listener.call(event) + end + end + + context 'with fetcher.retry event' do + it 'logs debug message with attempt info' do + event = Shoryuken::Instrumentation::Event.new( + 'fetcher.retry', + attempt: 2, + max_attempts: 3, + error_message: 'Connection timeout' + ) + + expect(logger).to receive(:debug).and_yield.and_return('Retrying fetch attempt 2 for Connection timeout') + + listener.call(event) + end + end + + context 'with manager.dispatch event' do + it 'logs debug message with state info' do + event = Shoryuken::Instrumentation::Event.new( + 'manager.dispatch', + group: 'default', + queue: 'my_queue', + ready: 5, + busy: 3, + active_queues: %w[queue1 queue2] + ) + + expect(logger).to receive(:debug) + + listener.call(event) + end + end + + context 'with manager.processor_assigned event' do + it 'logs debug message with message id' do + event = Shoryuken::Instrumentation::Event.new( + 'manager.processor_assigned', + group: 'default', + queue: 'my_queue', + message_id: 'msg-123' + ) + + expect(logger).to receive(:debug).and_yield.and_return('Assigning msg-123') + + listener.call(event) + end + end + + context 'with manager.failed event' do + it 'logs error message and backtrace' do + event = Shoryuken::Instrumentation::Event.new( + 'manager.failed', + group: 'default', + error_message: 'Something went wrong', + backtrace: ['line1', 'line2'] + ) + + expect(logger).to receive(:error).twice + + listener.call(event) + end + + it 'handles missing backtrace' do + event = Shoryuken::Instrumentation::Event.new( + 'manager.failed', + group: 'default', + error_message: 'Something went wrong' + ) + + expect(logger).to receive(:error).once + + listener.call(event) + end + end + context 'with message.processed event' do it 'logs info message with duration' do event = Shoryuken::Instrumentation::Event.new( @@ -73,6 +196,20 @@ listener.call(event) end + + it 'does not log when exception is present' do + event = Shoryuken::Instrumentation::Event.new( + 'message.processed', + queue: 'default', + worker: 'TestWorker', + duration: 0.12345, + exception: ['StandardError', 'test error'] + ) + + expect(logger).not_to receive(:info) + + listener.call(event) + end end context 'with message.failed event' do @@ -99,7 +236,33 @@ error: error ) - expect(logger).to receive(:error).and_yield.and_return('Error occurred: ArgumentError - Invalid argument') + expect(logger).to receive(:error).at_least(:once) + + listener.call(event) + end + + it 'includes type when present' do + error = ArgumentError.new('Invalid argument') + event = Shoryuken::Instrumentation::Event.new( + 'error.occurred', + error: error, + type: 'message.processed' + ) + + expect(logger).to receive(:error).at_least(:once) + + listener.call(event) + end + + it 'logs backtrace when present' do + error = ArgumentError.new('Invalid argument') + error.set_backtrace(['line1', 'line2']) + event = Shoryuken::Instrumentation::Event.new( + 'error.occurred', + error: error + ) + + expect(logger).to receive(:error).twice listener.call(event) end @@ -115,6 +278,16 @@ end end + context 'with queue.empty event' do + it 'logs debug message' do + event = Shoryuken::Instrumentation::Event.new('queue.empty', queue: 'default') + + expect(logger).to receive(:debug).and_yield.and_return('Queue default is empty') + + listener.call(event) + end + end + context 'with unknown event' do it 'does not log anything' do event = Shoryuken::Instrumentation::Event.new('unknown.event') From d6d5b22d448a020ca4e905ad1a128d8f48ef7f5e Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Thu, 11 Dec 2025 16:36:33 +0100 Subject: [PATCH 4/5] Fix manager.processor_assigned event for test compatibility Handle case where sqs_msg doesn't respond to message_id (e.g., in tests using plain arrays as message mocks). This prevents NoMethodError during instrumentation event publishing. --- lib/shoryuken/manager.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index ee013bba..699abe55 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -144,10 +144,11 @@ def processor_done(queue) def assign(queue_name, sqs_msg) return unless running? + message_id = sqs_msg.respond_to?(:message_id) ? sqs_msg.message_id : sqs_msg.to_s Shoryuken.monitor.publish('manager.processor_assigned', group: @group, queue: queue_name, - message_id: sqs_msg.message_id) + message_id: message_id) @busy_processors.increment fire_utilization_update_event From ce2fe3817f3083bd76effe5e536de2b70fbe0946 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Thu, 11 Dec 2025 16:49:31 +0100 Subject: [PATCH 5/5] Use proper SQS message doubles in manager_spec Replace plain arrays/integers with proper message doubles that respond to message_id, matching the real SQS message interface. This ensures tests accurately reflect production behavior. --- lib/shoryuken/manager.rb | 3 +-- spec/lib/shoryuken/manager_spec.rb | 22 +++++++++++++++------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 699abe55..ee013bba 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -144,11 +144,10 @@ def processor_done(queue) def assign(queue_name, sqs_msg) return unless running? - message_id = sqs_msg.respond_to?(:message_id) ? sqs_msg.message_id : sqs_msg.to_s Shoryuken.monitor.publish('manager.processor_assigned', group: @group, queue: queue_name, - message_id: message_id) + message_id: sqs_msg.message_id) @busy_processors.increment fire_utilization_update_event diff --git a/spec/lib/shoryuken/manager_spec.rb b/spec/lib/shoryuken/manager_spec.rb index bc3db4e5..5756bb41 100644 --- a/spec/lib/shoryuken/manager_spec.rb +++ b/spec/lib/shoryuken/manager_spec.rb @@ -14,6 +14,11 @@ let(:concurrency) { 1 } let(:executor) { Concurrent::ImmediateExecutor.new } + # Helper to create proper SQS message doubles + def sqs_message(id: SecureRandom.uuid, body: 'test') + double(Shoryuken::Message, message_id: id, body: body, receipt_handle: SecureRandom.uuid) + end + subject { Shoryuken::Manager.new('default', fetcher, polling_strategy, concurrency, executor) } before do @@ -64,7 +69,7 @@ end specify do - message = ['test1'] + message = sqs_message(id: 'msg-123') messages = [message] q = Shoryuken::Polling::QueueConfiguration.new(queue, {}) @@ -101,7 +106,7 @@ context 'when batch' do specify do - messages = %w[test1 test2 test3] + messages = [sqs_message(id: 'msg-1'), sqs_message(id: 'msg-2'), sqs_message(id: 'msg-3')] q = Shoryuken::Polling::QueueConfiguration.new(queue, {}) expect(fetcher).to receive(:fetch).with(q, described_class::BATCH_LIMIT).and_return(messages) @@ -142,13 +147,16 @@ describe '#dispatch_single_messages' do let(:concurrency) { 3 } - it 'assings messages from batch one by one' do + it 'assigns messages from batch one by one' do q = polling_strategy.next_queue - messages = [1, 2, 3] + msg1 = sqs_message(id: 'msg-1') + msg2 = sqs_message(id: 'msg-2') + msg3 = sqs_message(id: 'msg-3') + messages = [msg1, msg2, msg3] expect(fetcher).to receive(:fetch).with(q, concurrency).and_return(messages) - expect_any_instance_of(described_class).to receive(:assign).with(q.name, 1) - expect_any_instance_of(described_class).to receive(:assign).with(q.name, 2) - expect_any_instance_of(described_class).to receive(:assign).with(q.name, 3) + expect_any_instance_of(described_class).to receive(:assign).with(q.name, msg1) + expect_any_instance_of(described_class).to receive(:assign).with(q.name, msg2) + expect_any_instance_of(described_class).to receive(:assign).with(q.name, msg3) subject.send(:dispatch_single_messages, q) end end