Skip to content

Commit 148c436

Browse files
author
DanielePalaia
committed
adding discard with annotations test
1 parent 2eb1a96 commit 148c436

File tree

2 files changed

+107
-2
lines changed

2 files changed

+107
-2
lines changed

tests/conftest.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,22 @@ def on_message(self, event: Event):
101101
raise ConsumerTestException("consumed")
102102

103103

104+
class MyMessageHandlerDiscardWithAnnotations(MessagingHandler):
105+
106+
def __init__(self):
107+
super().__init__(auto_accept=False, auto_settle=False)
108+
self._received = 0
109+
110+
def on_message(self, event: Event):
111+
annotations = {}
112+
annotations[symbol("x-opt-string")] = "x-test1"
113+
MessageAck.discard_with_annotations(event, annotations)
114+
self._received = self._received + 1
115+
if self._received == 1000:
116+
event.connection.close()
117+
raise ConsumerTestException("consumed")
118+
119+
104120
class MyMessageHandlerRequeue(MessagingHandler):
105121

106122
def __init__(self):
@@ -125,7 +141,6 @@ def on_message(self, event: Event):
125141
annotations = {}
126142
annotations[symbol("x-opt-string")] = "x-test1"
127143
MessageAck.requeue_with_annotations(event, annotations)
128-
MessageAck.requeue_with_annotations(event, annotations)
129144
self._received = self._received + 1
130145
if self._received == 1000:
131146
event.connection.close()

tests/test_consumer.py

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
ConsumerTestException,
1313
MyMessageHandlerAccept,
1414
MyMessageHandlerDiscard,
15+
MyMessageHandlerDiscardWithAnnotations,
1516
MyMessageHandlerNoack,
1617
MyMessageHandlerRequeue,
1718
MyMessageHandlerRequeueWithAnnotations,
@@ -156,7 +157,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
156157
)
157158
)
158159
management.declare_queue(QuorumQueueSpecification(name=queue_dead_lettering))
159-
management.bind(
160+
bind_path = management.bind(
160161
BindingSpecification(
161162
source_exchange=exchange_dead_lettering,
162163
destination_queue=queue_dead_lettering,
@@ -203,6 +204,95 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
203204

204205
message_count_dead_lettering = management.purge_queue(queue_dead_lettering)
205206

207+
management.unbind(bind_path)
208+
management.delete_exchange(exchange_dead_lettering)
209+
management.delete_queue(queue_dead_lettering)
210+
211+
management.close()
212+
213+
assert message_count == 0
214+
# check dead letter queue
215+
assert message_count_dead_lettering == messages_to_send
216+
217+
218+
def test_consumer_async_queue_with_discard_with_annotations(
219+
connection: Connection,
220+
) -> None:
221+
messages_to_send = 1000
222+
223+
exchange_dead_lettering = "exchange-dead-letter"
224+
queue_dead_lettering = "queue-dead-letter"
225+
queue_name = "test-queue_async_discard"
226+
binding_key = "key_dead_letter"
227+
228+
management = connection.management()
229+
230+
# configuring dead lettering
231+
management.declare_exchange(
232+
ExchangeSpecification(
233+
name=exchange_dead_lettering,
234+
exchange_type=ExchangeType.fanout,
235+
arguments={},
236+
)
237+
)
238+
management.declare_queue(QuorumQueueSpecification(name=queue_dead_lettering))
239+
bind_path = management.bind(
240+
BindingSpecification(
241+
source_exchange=exchange_dead_lettering,
242+
destination_queue=queue_dead_lettering,
243+
binding_key=binding_key,
244+
)
245+
)
246+
247+
management.declare_queue(
248+
QuorumQueueSpecification(
249+
name=queue_name,
250+
dead_letter_exchange=exchange_dead_lettering,
251+
dead_letter_routing_key=binding_key,
252+
)
253+
)
254+
255+
addr_queue = queue_address(queue_name)
256+
addr_queue_dl = queue_address(queue_dead_lettering)
257+
258+
publisher = connection.publisher("/queues/" + queue_name)
259+
260+
# publish messages_to_send messages
261+
for i in range(messages_to_send):
262+
publisher.publish(Message(body="test" + str(i)))
263+
publisher.close()
264+
265+
# workaround: it looks like when the consumer finish to consume invalidate the connection
266+
# so for the moment we need to use one dedicated
267+
connection_consumer = create_connection()
268+
269+
consumer = connection_consumer.consumer(
270+
addr_queue, handler=MyMessageHandlerDiscardWithAnnotations()
271+
)
272+
273+
try:
274+
consumer.run()
275+
# ack to terminate the consumer
276+
except ConsumerTestException:
277+
pass
278+
279+
consumer.close()
280+
281+
# check for added annotation
282+
new_consumer = connection.consumer(addr_queue_dl)
283+
message = new_consumer.consume()
284+
new_consumer.close()
285+
286+
assert "x-opt-string" in message.annotations
287+
288+
message_count = management.purge_queue(queue_name)
289+
290+
management.delete_queue(queue_name)
291+
292+
message_count_dead_lettering = management.purge_queue(queue_dead_lettering)
293+
294+
management.unbind(bind_path)
295+
management.delete_exchange(exchange_dead_lettering)
206296
management.delete_queue(queue_dead_lettering)
207297

208298
management.close()

0 commit comments

Comments
 (0)