Skip to content

Commit 2eb1a96

Browse files
author
DanielePalaia
committed
adding dead lettering test
1 parent 4512408 commit 2eb1a96

File tree

5 files changed

+81
-33
lines changed

5 files changed

+81
-33
lines changed

examples/getting_started/main.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,22 @@ def __init__(self):
2222
def on_message(self, event: Event):
2323
print("received message: " + str(event.message.annotations))
2424

25+
# accepting
26+
MessageAck.accept(event)
27+
28+
# in case of rejection (+eventually deadlettering)
29+
# MessageAck.discard(event)
30+
31+
# in case of requeuing
32+
# MessageAck.requeue(event)
33+
2534
# annotations = {}
2635
# annotations[symbol('x-opt-string')] = 'x-test1'
36+
# in case of requeuing with annotations added
2737
# MessageAck.requeue_with_annotations(event, annotations)
2838

29-
MessageAck.accept(event)
39+
# in case of rejection with annotations added
40+
# MessageAck.discard_with_annotations(event)
3041

3142
print("count " + str(self._count))
3243

@@ -67,9 +78,8 @@ def main() -> None:
6778
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
6879

6980
management.declare_queue(
70-
QuorumQueueSpecification(
71-
name=queue_name, dead_letter_exchange="dead-letter-test"
72-
)
81+
QuorumQueueSpecification(name=queue_name)
82+
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
7383
)
7484

7585
print("binding queue to exchange")

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from importlib import metadata
22

33
from .address_helper import exchange_address, queue_address
4-
from .common import QueueType
4+
from .common import ExchangeType, QueueType
55
from .connection import Connection
66
from .consumer import Consumer
77
from .entities import (
@@ -50,4 +50,5 @@
5050
"Delivery",
5151
"MessageAck",
5252
"symbol",
53+
"ExchangeType",
5354
]

tests/conftest.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
Event,
66
MessageAck,
77
MessagingHandler,
8+
queue_address,
89
symbol,
910
)
1011

@@ -33,6 +34,21 @@ def management(pytestconfig):
3334
connection.close()
3435

3536

37+
@pytest.fixture()
38+
def consumer(pytestconfig):
39+
connection = Connection("amqp://guest:guest@localhost:5672/")
40+
connection.dial()
41+
try:
42+
queue_name = "test-queue"
43+
addr_queue = queue_address(queue_name)
44+
consumer = connection.consumer(addr_queue)
45+
yield consumer
46+
47+
finally:
48+
consumer.close()
49+
connection.close()
50+
51+
3652
class ConsumerTestException(BaseException):
3753
# Constructor or Initializer
3854
def __init__(self, msg: str):
@@ -67,17 +83,9 @@ def on_message(self, event: Event):
6783
if self._received == 1000:
6884
event.receiver.close()
6985
event.connection.close()
86+
# Workaround to terminate the Consumer and notify the test when all messages are consumed
7087
raise ConsumerTestException("consumed")
7188

72-
def on_connection_closed(self, event: Event):
73-
print("connection closed")
74-
75-
def on_link_closed(self, event: Event) -> None:
76-
print("link closed")
77-
78-
def on_rejected(self, event: Event) -> None:
79-
print("rejected")
80-
8189

8290
class MyMessageHandlerDiscard(MessagingHandler):
8391

@@ -107,13 +115,6 @@ def on_message(self, event: Event):
107115
raise ConsumerTestException("consumed")
108116

109117

110-
def create_connection() -> Connection:
111-
connection_consumer = Connection("amqp://guest:guest@localhost:5672/")
112-
connection_consumer.dial()
113-
114-
return connection_consumer
115-
116-
117118
class MyMessageHandlerRequeueWithAnnotations(MessagingHandler):
118119

119120
def __init__(self):

tests/test_consumer.py

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
from rabbitmq_amqp_python_client import (
2+
BindingSpecification,
23
Connection,
4+
ExchangeSpecification,
5+
ExchangeType,
36
Message,
47
QuorumQueueSpecification,
58
queue_address,
@@ -16,9 +19,9 @@
1619
from .utils import create_connection
1720

1821

19-
def test_consumer_sync_queue_ack(connection: Connection) -> None:
22+
def test_consumer_sync_queue_accept(connection: Connection) -> None:
2023

21-
queue_name = "test-queue-ack"
24+
queue_name = "test-queue-sync-accept"
2225
messages_to_send = 100
2326
management = connection.management()
2427

@@ -31,21 +34,22 @@ def test_consumer_sync_queue_ack(connection: Connection) -> None:
3134

3235
consumed = 0
3336

34-
# publish 10 messages
37+
# publish messages_to_send messages
3538
for i in range(messages_to_send):
3639
publisher.publish(Message(body="test" + str(i)))
3740

41+
publisher.close()
42+
3843
# consumer synchronously without handler
3944
for i in range(messages_to_send):
4045
message = consumer.consume()
4146
if message.body == "test" + str(i):
4247
consumed = consumed + 1
4348

44-
assert consumed > 0
45-
46-
publisher.close()
4749
consumer.close()
4850

51+
assert consumed > 0
52+
4953
management.delete_queue(queue_name)
5054
management.close()
5155

@@ -91,11 +95,11 @@ def test_consumer_async_queue_accept(connection: Connection) -> None:
9195
assert message_count == 0
9296

9397

94-
def test_consumer_async_queue_noack(connection: Connection) -> None:
98+
def test_consumer_async_queue_no_ack(connection: Connection) -> None:
9599

96100
messages_to_send = 1000
97101

98-
queue_name = "test-queue_async_noack"
102+
queue_name = "test-queue_async_no_ack"
99103

100104
management = connection.management()
101105

@@ -136,11 +140,37 @@ def test_consumer_async_queue_noack(connection: Connection) -> None:
136140
def test_consumer_async_queue_with_discard(connection: Connection) -> None:
137141
messages_to_send = 1000
138142

143+
exchange_dead_lettering = "exchange-dead-letter"
144+
queue_dead_lettering = "queue-dead-letter"
139145
queue_name = "test-queue_async_discard"
146+
binding_key = "key_dead_letter"
140147

141148
management = connection.management()
142149

143-
management.declare_queue(QuorumQueueSpecification(name=queue_name))
150+
# configuring dead lettering
151+
management.declare_exchange(
152+
ExchangeSpecification(
153+
name=exchange_dead_lettering,
154+
exchange_type=ExchangeType.fanout,
155+
arguments={},
156+
)
157+
)
158+
management.declare_queue(QuorumQueueSpecification(name=queue_dead_lettering))
159+
management.bind(
160+
BindingSpecification(
161+
source_exchange=exchange_dead_lettering,
162+
destination_queue=queue_dead_lettering,
163+
binding_key=binding_key,
164+
)
165+
)
166+
167+
management.declare_queue(
168+
QuorumQueueSpecification(
169+
name=queue_name,
170+
dead_letter_exchange=exchange_dead_lettering,
171+
dead_letter_routing_key=binding_key,
172+
)
173+
)
144174

145175
addr_queue = queue_address(queue_name)
146176

@@ -171,9 +201,15 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
171201

172202
management.delete_queue(queue_name)
173203

204+
message_count_dead_lettering = management.purge_queue(queue_dead_lettering)
205+
206+
management.delete_queue(queue_dead_lettering)
207+
174208
management.close()
175209

176210
assert message_count == 0
211+
# check dead letter queue
212+
assert message_count_dead_lettering == messages_to_send
177213

178214

179215
def test_consumer_async_queue_with_requeue(connection: Connection) -> None:

tests/test_publisher.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import time
2+
13
from rabbitmq_amqp_python_client import (
24
BindingSpecification,
35
Connection,
@@ -69,7 +71,6 @@ def test_publish_exchange(connection: Connection) -> None:
6971
management.close()
7072

7173

72-
"""
7374
def test_publish_purge(connection: Connection) -> None:
7475
messages_to_publish = 20
7576
connection = Connection("amqp://guest:guest@localhost:5672/")
@@ -98,7 +99,7 @@ def test_publish_purge(connection: Connection) -> None:
9899
while purged_messages != messages_to_publish:
99100
purged_messages = management.purge_queue(queue_name)
100101
time.sleep(1)
101-
attempt = attempt+1
102+
attempt = attempt + 1
102103
if attempt > 60:
103104
break
104105

@@ -109,4 +110,3 @@ def test_publish_purge(connection: Connection) -> None:
109110

110111
management.delete_queue(queue_name)
111112
management.close()
112-
"""

0 commit comments

Comments
 (0)