Skip to content

Commit bc3e913

Browse files
authored
Add DLQ and ActiveRecord middleware integration specs (#919)
Add 2 new integration specs to improve test coverage: - dead_letter_queue: Tests DLQ redrive policy with maxReceiveCount - active_record_middleware: Tests ActiveRecord connection clearing Ref #787
1 parent b108ae4 commit bc3e913

File tree

2 files changed

+175
-0
lines changed

2 files changed

+175
-0
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# frozen_string_literal: true
2+
3+
# This spec tests the ActiveRecord middleware functionality.
4+
# The middleware clears database connections after each message is processed.
5+
6+
setup_localstack
7+
8+
queue_name = DT.queue
9+
create_test_queue(queue_name)
10+
Shoryuken.add_group('default', 1)
11+
Shoryuken.add_queue(queue_name, 1, 'default')
12+
13+
# Mock ActiveRecord module to track connection clearing
14+
module ActiveRecord
15+
VERSION = Gem::Version.new('7.2.0')
16+
17+
def self.version
18+
VERSION
19+
end
20+
21+
class Base
22+
class << self
23+
attr_accessor :connections_cleared
24+
25+
def connection_handler
26+
@connection_handler ||= ConnectionHandler.new
27+
end
28+
end
29+
30+
self.connections_cleared = []
31+
end
32+
33+
class ConnectionHandler
34+
def clear_active_connections!(scope)
35+
ActiveRecord::Base.connections_cleared << { scope: scope, time: Time.now }
36+
end
37+
end
38+
end
39+
40+
# Add the ActiveRecord middleware to the chain
41+
require 'shoryuken/middleware/server/active_record'
42+
Shoryuken.configure_server do |config|
43+
config.server_middleware do |chain|
44+
chain.add Shoryuken::Middleware::Server::ActiveRecord
45+
end
46+
end
47+
48+
worker_class = Class.new do
49+
include Shoryuken::Worker
50+
51+
shoryuken_options auto_delete: true, batch: false
52+
53+
def perform(sqs_msg, body)
54+
DT[:processed] << { message_id: sqs_msg.message_id, body: body }
55+
end
56+
end
57+
58+
worker_class.get_shoryuken_options['queue'] = queue_name
59+
Shoryuken.register_worker(queue_name, worker_class)
60+
61+
# Clear any prior connection clearing records
62+
ActiveRecord::Base.connections_cleared.clear
63+
64+
# Send multiple messages
65+
3.times { |i| Shoryuken::Client.queues(queue_name).send_message(message_body: "ar-test-#{i}") }
66+
67+
sleep 1
68+
69+
poll_queues_until { DT[:processed].size >= 3 }
70+
71+
# Verify all messages were processed
72+
assert_equal(3, DT[:processed].size)
73+
74+
# Verify ActiveRecord connections were cleared after each message
75+
# The middleware should have called clear_active_connections! for each message
76+
assert(
77+
ActiveRecord::Base.connections_cleared.size >= 3,
78+
"ActiveRecord connections should be cleared after each message (cleared #{ActiveRecord::Base.connections_cleared.size} times)"
79+
)
80+
81+
# Verify the :all scope was used (Rails 7.1+ behavior)
82+
ActiveRecord::Base.connections_cleared.each do |record|
83+
assert_equal(:all, record[:scope], 'Should use :all scope for Rails 7.1+')
84+
end
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# frozen_string_literal: true
2+
3+
# This spec tests dead letter queue (DLQ) functionality.
4+
# When a message exceeds maxReceiveCount, it should be moved to the DLQ.
5+
# Note: This test doesn't use poll_queues_until because messages should fail
6+
# and be moved to DLQ rather than being successfully processed.
7+
8+
setup_localstack
9+
10+
main_queue_name = DT.queues[0]
11+
dlq_name = DT.queues[1]
12+
13+
# Create the dead letter queue first
14+
create_test_queue(dlq_name)
15+
16+
dlq_url = Shoryuken::Client.sqs.get_queue_url(queue_name: dlq_name).queue_url
17+
dlq_arn = Shoryuken::Client.sqs.get_queue_attributes(
18+
queue_url: dlq_url,
19+
attribute_names: ['QueueArn']
20+
).attributes['QueueArn']
21+
22+
# Create main queue with redrive policy - move to DLQ after 2 receives
23+
redrive_policy = { maxReceiveCount: 2, deadLetterTargetArn: dlq_arn }.to_json
24+
create_test_queue(main_queue_name, attributes: {
25+
'VisibilityTimeout' => '1',
26+
'RedrivePolicy' => redrive_policy
27+
})
28+
29+
main_queue_url = Shoryuken::Client.sqs.get_queue_url(queue_name: main_queue_name).queue_url
30+
31+
# Send a message
32+
Shoryuken::Client.sqs.send_message(
33+
queue_url: main_queue_url,
34+
message_body: 'dlq test message'
35+
)
36+
37+
# Manually receive the message multiple times to trigger DLQ
38+
# maxReceiveCount = 2, so after 2 receives without deletion, it goes to DLQ
39+
3.times do |i|
40+
msgs = Shoryuken::Client.sqs.receive_message(
41+
queue_url: main_queue_url,
42+
max_number_of_messages: 1,
43+
wait_time_seconds: 3,
44+
attribute_names: ['ApproximateReceiveCount']
45+
).messages
46+
47+
if msgs.any?
48+
receive_count = msgs.first.attributes['ApproximateReceiveCount'].to_i
49+
DT[:receives] << { attempt: i + 1, receive_count: receive_count }
50+
# Don't delete - let visibility timeout expire
51+
sleep 2
52+
else
53+
DT[:receives] << { attempt: i + 1, no_message: true }
54+
break
55+
end
56+
end
57+
58+
# Verify message was received at least twice
59+
actual_receives = DT[:receives].reject { |r| r[:no_message] }
60+
assert(actual_receives.size >= 2, "Message should have been received at least twice (was #{actual_receives.size})")
61+
62+
# Wait for message to be moved to DLQ
63+
sleep 3
64+
65+
# Check that message is now in the DLQ
66+
dlq_messages = Shoryuken::Client.sqs.receive_message(
67+
queue_url: dlq_url,
68+
max_number_of_messages: 10,
69+
wait_time_seconds: 5,
70+
attribute_names: ['All']
71+
).messages
72+
73+
assert(dlq_messages.size >= 1, 'Message should have been moved to DLQ')
74+
assert_equal('dlq test message', dlq_messages.first.body)
75+
76+
# Verify message is no longer in main queue
77+
main_attrs = Shoryuken::Client.sqs.get_queue_attributes(
78+
queue_url: main_queue_url,
79+
attribute_names: %w[ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible]
80+
).attributes
81+
main_count = main_attrs['ApproximateNumberOfMessages'].to_i +
82+
main_attrs['ApproximateNumberOfMessagesNotVisible'].to_i
83+
assert_equal(0, main_count, 'Main queue should be empty after DLQ move')
84+
85+
# Clean up DLQ message
86+
dlq_messages.each do |msg|
87+
Shoryuken::Client.sqs.delete_message(
88+
queue_url: dlq_url,
89+
receipt_handle: msg.receipt_handle
90+
)
91+
end

0 commit comments

Comments
 (0)