Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions lib/shoryuken/middleware/server/non_retryable_exception.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

module Shoryuken
module Middleware
module Server
# Middleware that handles non-retryable exceptions by deleting messages immediately.
# When a configured exception occurs, the message is deleted instead of being retried.
#
# Configure non-retryable exceptions per worker:
#
# class MyWorker
# include Shoryuken::Worker
#
# shoryuken_options queue: 'my_queue',
# non_retryable_exceptions: [InvalidInputError, RecordNotFoundError]
#
# def perform(sqs_msg, body)
# # ...
# end
# end
class NonRetryableException
include Util

# Processes a message and handles non-retryable exceptions
#
# @param worker [Object] the worker instance
# @param queue [String] the queue name
# @param sqs_msg [Shoryuken::Message, Array<Shoryuken::Message] the message or batch
# @param _body [Object] the parsed message body (unused)
# @yield continues to the next middleware in the chain
# @return [void]
def call(worker, queue, sqs_msg, _body)
yield
rescue => e
non_retryable_exceptions = worker.class.get_shoryuken_options['non_retryable_exceptions']

return raise unless non_retryable_exceptions

exception_classes = Array(non_retryable_exceptions)
return raise unless exception_classes.any? { |klass| e.is_a?(klass) }

# Handle batch messages
messages = sqs_msg.is_a?(Array) ? sqs_msg : [sqs_msg]

logger.warn do
"Non-retryable exception #{e.class} occurred for message(s) #{messages.map(&:message_id).join(', ')}. " \
"Deleting message(s) immediately. Error: #{e.message}"
end

logger.debug { e.backtrace.join("\n") } if e.backtrace

# Delete the message(s) immediately
entries = messages.map.with_index { |message, i| { id: i.to_s, receipt_handle: message.receipt_handle } }
Shoryuken::Client.queues(queue).delete_messages(entries: entries)
Comment on lines +52 to +54
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Consider handling delete_messages failures.

The delete_messages method (from lib/shoryuken/queue.rb:50-59) can fail and returns a truthy value when failures occur. Currently, if the delete fails, the exception is still swallowed, potentially leaving the message in the queue to be retried anyway.

Consider checking the return value or wrapping in a rescue to at least log a warning if deletion fails:

🛡️ Suggested improvement
       # Delete the message(s) immediately
       entries = messages.map.with_index { |message, i| { id: i.to_s, receipt_handle: message.receipt_handle } }
-      Shoryuken::Client.queues(queue).delete_messages(entries: entries)
+      if Shoryuken::Client.queues(queue).delete_messages(entries: entries)
+        logger.error { "Failed to delete some non-retryable message(s) for #{messages.map(&:message_id).join(', ')}" }
+      end

       # Don't re-raise - the exception has been handled by deleting the message
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Delete the message(s) immediately
entries = messages.map.with_index { |message, i| { id: i.to_s, receipt_handle: message.receipt_handle } }
Shoryuken::Client.queues(queue).delete_messages(entries: entries)
# Delete the message(s) immediately
entries = messages.map.with_index { |message, i| { id: i.to_s, receipt_handle: message.receipt_handle } }
if Shoryuken::Client.queues(queue).delete_messages(entries: entries)
logger.error { "Failed to delete some non-retryable message(s) for #{messages.map(&:message_id).join(', ')}" }
end
🤖 Prompt for AI Agents
In `@lib/shoryuken/middleware/server/non_retryable_exception.rb` around lines 52 -
54, Wrap the call to Shoryuken::Client.queues(queue).delete_messages(entries:
entries) in a safe-check: inspect the returned response for failures and log a
warning if deletion reports failures (include queue name, entries and response
details), and also rescue any exceptions around the call to log an error/warning
instead of swallowing; update the code in non_retryable_exception.rb around the
entries creation/delete call to perform these checks and logging so failed
deletes are visible.


# Don't re-raise - the exception has been handled by deleting the message
end
end
end
end
end

1 change: 1 addition & 0 deletions lib/shoryuken/options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ def active_job_queue_name_prefixing?
def default_server_middleware
Middleware::Chain.new do |m|
m.add Middleware::Server::Timing
m.add Middleware::Server::NonRetryableException
m.add Middleware::Server::ExponentialBackoffRetry
m.add Middleware::Server::AutoDelete
m.add Middleware::Server::AutoExtendVisibility
Expand Down
14 changes: 14 additions & 0 deletions lib/shoryuken/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def server_middleware
# @option opts [Boolean] :auto_delete (false) Automatically delete messages after processing
# @option opts [Boolean] :auto_visibility_timeout (false) Automatically extend message visibility
# @option opts [Array<Integer>] :retry_intervals Exponential backoff retry intervals in seconds
# @option opts [Array<Class>] :non_retryable_exceptions Exception classes that should skip retries and delete message immediately
# @option opts [Hash] :sqs Additional SQS client options
#
# @example Basic worker configuration
Expand Down Expand Up @@ -171,6 +172,19 @@ def server_middleware
# complex_processing(body)
# end
# end
#
# @example Worker with non-retryable exceptions
# class ValidationWorker
# include Shoryuken::Worker
# shoryuken_options queue: 'validation_queue',
# non_retryable_exceptions: [InvalidInputError, RecordNotFoundError]
#
# def perform(sqs_msg, body)
# # If InvalidInputError or RecordNotFoundError is raised,
# # the message will be deleted immediately instead of retrying
# validate_and_process(body)
# end
# end
def shoryuken_options(opts = {})
self.shoryuken_options_hash = get_shoryuken_options.merge(stringify_keys(opts || {}))
normalize_worker_queue!
Expand Down
143 changes: 143 additions & 0 deletions spec/lib/shoryuken/middleware/server/non_retryable_exception_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# frozen_string_literal: true

RSpec.describe Shoryuken::Middleware::Server::NonRetryableException do
let(:queue) { 'default' }
let(:sqs_queue) { double Shoryuken::Queue }

def build_message
double Shoryuken::Message,
queue_url: queue,
body: 'test',
message_id: SecureRandom.uuid,
receipt_handle: SecureRandom.uuid
end

let(:sqs_msg) { build_message }

before do
allow(Shoryuken::Client).to receive(:queues).with(queue).and_return(sqs_queue)
end

context 'when non_retryable_exceptions is not configured' do
it 're-raises the exception' do
TestWorker.get_shoryuken_options['non_retryable_exceptions'] = nil

expect(sqs_queue).not_to receive(:delete_messages)

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise StandardError, 'test error' }
}.to raise_error(StandardError, 'test error')
end
end

context 'when exception is not in non_retryable_exceptions list' do
it 're-raises the exception' do
TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [ArgumentError]

expect(sqs_queue).not_to receive(:delete_messages)

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise StandardError, 'test error' }
}.to raise_error(StandardError, 'test error')
end
end

context 'when exception is in non_retryable_exceptions list' do
it 'deletes the message and does not re-raise' do
TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [StandardError]

expect(sqs_queue).to receive(:delete_messages).with(entries: [
{ id: '0', receipt_handle: sqs_msg.receipt_handle }
])

expect(Shoryuken.logger).to receive(:warn) do |&block|
expect(block.call).to match(/Non-retryable exception StandardError/)
end

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise StandardError, 'test error' }
}.not_to raise_error
end

it 'logs the exception backtrace in debug mode' do
TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [StandardError]

error = StandardError.new('test error')
error.set_backtrace(['backtrace line 1', 'backtrace line 2'])

allow(sqs_queue).to receive(:delete_messages)

expect(Shoryuken.logger).to receive(:warn)
expect(Shoryuken.logger).to receive(:debug) do |&block|
expect(block.call).to eq("backtrace line 1\nbacktrace line 2")
end

subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise error }
end

it 'handles multiple exception classes' do
TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [ArgumentError, StandardError]

expect(sqs_queue).to receive(:delete_messages).with(entries: [
{ id: '0', receipt_handle: sqs_msg.receipt_handle }
])

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise ArgumentError, 'test error' }
}.not_to raise_error
end

it 'handles custom exception classes' do
custom_error = Class.new(StandardError)
TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [custom_error]

expect(sqs_queue).to receive(:delete_messages).with(entries: [
{ id: '0', receipt_handle: sqs_msg.receipt_handle }
])

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise custom_error, 'test error' }
}.not_to raise_error
end
end

context 'with batch messages' do
it 'deletes all messages in the batch' do
TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [StandardError]

sqs_msg2 = build_message
sqs_msg3 = build_message
sqs_msgs = [sqs_msg, sqs_msg2, sqs_msg3]

expect(sqs_queue).to receive(:delete_messages).with(entries: [
{ id: '0', receipt_handle: sqs_msg.receipt_handle },
{ id: '1', receipt_handle: sqs_msg2.receipt_handle },
{ id: '2', receipt_handle: sqs_msg3.receipt_handle }
])

expect(Shoryuken.logger).to receive(:warn) do |&block|
expect(block.call).to match(/Non-retryable exception StandardError/)
expect(block.call).to match(/#{sqs_msg.message_id}/)
expect(block.call).to match(/#{sqs_msg2.message_id}/)
expect(block.call).to match(/#{sqs_msg3.message_id}/)
end

expect {
subject.call(TestWorker.new, queue, sqs_msgs, [sqs_msg.body, sqs_msg2.body, sqs_msg3.body]) do
raise StandardError, 'test error'
end
}.not_to raise_error
end
end

context 'when no exception occurs' do
it 'does not delete the message' do
TestWorker.get_shoryuken_options['non_retryable_exceptions'] = [StandardError]

expect(sqs_queue).not_to receive(:delete_messages)

subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) {}
end
end
end