diff --git a/lib/shoryuken/middleware/server/non_retryable_exception.rb b/lib/shoryuken/middleware/server/non_retryable_exception.rb new file mode 100644 index 00000000..c80e9bf0 --- /dev/null +++ b/lib/shoryuken/middleware/server/non_retryable_exception.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +module Shoryuken + module Middleware + module Server + # Middleware that handles non-retryable exceptions by deleting messages immediately. + # When a configured exception occurs, the message is deleted instead of being retried. + # + # Configure non-retryable exceptions per worker: + # + # class MyWorker + # include Shoryuken::Worker + # + # shoryuken_options queue: 'my_queue', + # non_retryable_exceptions: [InvalidInputError, RecordNotFoundError] + # + # def perform(sqs_msg, body) + # # ... + # end + # end + class NonRetryableException + include Util + + # Processes a message and handles non-retryable exceptions + # + # @param worker [Object] the worker instance + # @param queue [String] the queue name + # @param sqs_msg [Shoryuken::Message, Array e + non_retryable_exceptions = worker.class.get_shoryuken_options['non_retryable_exceptions'] + + return raise unless non_retryable_exceptions + + exception_classes = Array(non_retryable_exceptions) + return raise unless exception_classes.any? { |klass| e.is_a?(klass) } + + # Handle batch messages + messages = sqs_msg.is_a?(Array) ? sqs_msg : [sqs_msg] + + logger.warn do + "Non-retryable exception #{e.class} occurred for message(s) #{messages.map(&:message_id).join(', ')}. " \ + "Deleting message(s) immediately. Error: #{e.message}" + end + + logger.debug { e.backtrace.join("\n") } if e.backtrace + + # Delete the message(s) immediately + entries = messages.map.with_index { |message, i| { id: i.to_s, receipt_handle: message.receipt_handle } } + Shoryuken::Client.queues(queue).delete_messages(entries: entries) + + # Don't re-raise - the exception has been handled by deleting the message + end + end + end + end +end + diff --git a/lib/shoryuken/options.rb b/lib/shoryuken/options.rb index 18a855c1..fe7d68d7 100644 --- a/lib/shoryuken/options.rb +++ b/lib/shoryuken/options.rb @@ -302,6 +302,7 @@ def active_job_queue_name_prefixing? def default_server_middleware Middleware::Chain.new do |m| m.add Middleware::Server::Timing + m.add Middleware::Server::NonRetryableException m.add Middleware::Server::ExponentialBackoffRetry m.add Middleware::Server::AutoDelete m.add Middleware::Server::AutoExtendVisibility diff --git a/lib/shoryuken/worker.rb b/lib/shoryuken/worker.rb index 17689a7b..d245b42a 100644 --- a/lib/shoryuken/worker.rb +++ b/lib/shoryuken/worker.rb @@ -123,6 +123,7 @@ def server_middleware # @option opts [Boolean] :auto_delete (false) Automatically delete messages after processing # @option opts [Boolean] :auto_visibility_timeout (false) Automatically extend message visibility # @option opts [Array] :retry_intervals Exponential backoff retry intervals in seconds + # @option opts [Array] :non_retryable_exceptions Exception classes that should skip retries and delete message immediately # @option opts [Hash] :sqs Additional SQS client options # # @example Basic worker configuration @@ -171,6 +172,19 @@ def server_middleware # complex_processing(body) # end # end + # + # @example Worker with non-retryable exceptions + # class ValidationWorker + # include Shoryuken::Worker + # shoryuken_options queue: 'validation_queue', + # non_retryable_exceptions: [InvalidInputError, RecordNotFoundError] + # + # def perform(sqs_msg, body) + # # If InvalidInputError or RecordNotFoundError is raised, + # # the message will be deleted immediately instead of retrying + # validate_and_process(body) + # end + # end def shoryuken_options(opts = {}) self.shoryuken_options_hash = get_shoryuken_options.merge(stringify_keys(opts || {})) normalize_worker_queue! diff --git a/spec/lib/shoryuken/middleware/server/non_retryable_exception_spec.rb b/spec/lib/shoryuken/middleware/server/non_retryable_exception_spec.rb new file mode 100644 index 00000000..82ec9dfb --- /dev/null +++ b/spec/lib/shoryuken/middleware/server/non_retryable_exception_spec.rb @@ -0,0 +1,143 @@ +# frozen_string_literal: true + +RSpec.describe Shoryuken::Middleware::Server::NonRetryableException do + let(:queue) { 'default' } + let(:sqs_queue) { double Shoryuken::Queue } + + def build_message + double Shoryuken::Message, + queue_url: queue, + body: 'test', + message_id: SecureRandom.uuid, + receipt_handle: SecureRandom.uuid + end + + let(:sqs_msg) { build_message } + + before do + allow(Shoryuken::Client).to receive(:queues).with(queue).and_return(sqs_queue) + end + + context 'when non_retryable_exceptions is not configured' do + it 're-raises the exception' do + TestWorker.get_shoryuken_options['non_retryable_exceptions'] = nil + + expect(sqs_queue).not_to receive(:delete_messages) + + expect { + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise StandardError, 'test error' } + }.to raise_error(StandardError, 'test error') + end + end + + context 'when exception is not in non_retryable_exceptions list' do + it 're-raises the exception' do + TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [ArgumentError] + + expect(sqs_queue).not_to receive(:delete_messages) + + expect { + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise StandardError, 'test error' } + }.to raise_error(StandardError, 'test error') + end + end + + context 'when exception is in non_retryable_exceptions list' do + it 'deletes the message and does not re-raise' do + TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [StandardError] + + expect(sqs_queue).to receive(:delete_messages).with(entries: [ + { id: '0', receipt_handle: sqs_msg.receipt_handle } + ]) + + expect(Shoryuken.logger).to receive(:warn) do |&block| + expect(block.call).to match(/Non-retryable exception StandardError/) + end + + expect { + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise StandardError, 'test error' } + }.not_to raise_error + end + + it 'logs the exception backtrace in debug mode' do + TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [StandardError] + + error = StandardError.new('test error') + error.set_backtrace(['backtrace line 1', 'backtrace line 2']) + + allow(sqs_queue).to receive(:delete_messages) + + expect(Shoryuken.logger).to receive(:warn) + expect(Shoryuken.logger).to receive(:debug) do |&block| + expect(block.call).to eq("backtrace line 1\nbacktrace line 2") + end + + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise error } + end + + it 'handles multiple exception classes' do + TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [ArgumentError, StandardError] + + expect(sqs_queue).to receive(:delete_messages).with(entries: [ + { id: '0', receipt_handle: sqs_msg.receipt_handle } + ]) + + expect { + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise ArgumentError, 'test error' } + }.not_to raise_error + end + + it 'handles custom exception classes' do + custom_error = Class.new(StandardError) + TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [custom_error] + + expect(sqs_queue).to receive(:delete_messages).with(entries: [ + { id: '0', receipt_handle: sqs_msg.receipt_handle } + ]) + + expect { + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise custom_error, 'test error' } + }.not_to raise_error + end + end + + context 'with batch messages' do + it 'deletes all messages in the batch' do + TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [StandardError] + + sqs_msg2 = build_message + sqs_msg3 = build_message + sqs_msgs = [sqs_msg, sqs_msg2, sqs_msg3] + + expect(sqs_queue).to receive(:delete_messages).with(entries: [ + { id: '0', receipt_handle: sqs_msg.receipt_handle }, + { id: '1', receipt_handle: sqs_msg2.receipt_handle }, + { id: '2', receipt_handle: sqs_msg3.receipt_handle } + ]) + + expect(Shoryuken.logger).to receive(:warn) do |&block| + expect(block.call).to match(/Non-retryable exception StandardError/) + expect(block.call).to match(/#{sqs_msg.message_id}/) + expect(block.call).to match(/#{sqs_msg2.message_id}/) + expect(block.call).to match(/#{sqs_msg3.message_id}/) + end + + expect { + subject.call(TestWorker.new, queue, sqs_msgs, [sqs_msg.body, sqs_msg2.body, sqs_msg3.body]) do + raise StandardError, 'test error' + end + }.not_to raise_error + end + end + + context 'when no exception occurs' do + it 'does not delete the message' do + TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [StandardError] + + expect(sqs_queue).not_to receive(:delete_messages) + + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) {} + end + end +end +