Skip to content

Commit 9510cf1

Browse files
committed
add tests for async consumer
1 parent 7c45ec0 commit 9510cf1

File tree

2 files changed

+450
-0
lines changed

2 files changed

+450
-0
lines changed

tests/asyncio/test_consumer.py

Lines changed: 383 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,383 @@
1+
import pytest
2+
3+
from rabbitmq_amqp_python_client import (
4+
AddressHelper,
5+
ArgumentOutOfRangeException,
6+
AsyncConnection,
7+
AsyncEnvironment,
8+
QuorumQueueSpecification,
9+
)
10+
from rabbitmq_amqp_python_client.utils import Converter
11+
12+
from ..conftest import (
13+
ConsumerTestException,
14+
MyMessageHandlerAccept,
15+
MyMessageHandlerDiscard,
16+
MyMessageHandlerDiscardWithAnnotations,
17+
MyMessageHandlerNoack,
18+
MyMessageHandlerRequeue,
19+
MyMessageHandlerRequeueWithAnnotations,
20+
MyMessageHandlerRequeueWithInvalidAnnotations,
21+
)
22+
from .fixtures import * # noqa: F401, F403
23+
from .utils import (
24+
async_cleanup_dead_lettering,
25+
async_publish_messages,
26+
async_setup_dead_lettering,
27+
)
28+
29+
30+
@pytest.mark.asyncio
31+
async def test_async_consumer_sync_queue_accept(
32+
async_connection: AsyncConnection,
33+
) -> None:
34+
queue_name = "test-queue-sync-accept"
35+
messages_to_send = 100
36+
management = await async_connection.management()
37+
38+
await management.declare_queue(QuorumQueueSpecification(name=queue_name))
39+
40+
consumer = await async_connection.consumer(
41+
destination=AddressHelper.queue_address(queue_name)
42+
)
43+
44+
consumed = 0
45+
46+
# publish messages_to_send messages
47+
await async_publish_messages(async_connection, messages_to_send, queue_name)
48+
49+
# consumer synchronously without handler
50+
for i in range(messages_to_send):
51+
message = await consumer.consume()
52+
if Converter.bytes_to_string(message.body) == "test{}".format(i): # type: ignore
53+
consumed += 1
54+
55+
await consumer.close()
56+
await management.delete_queue(queue_name)
57+
await management.close()
58+
59+
assert consumed == messages_to_send
60+
61+
62+
@pytest.mark.asyncio
63+
async def test_async_consumer_invalid_destination(
64+
async_connection: AsyncConnection,
65+
) -> None:
66+
queue_name = "test-queue-sync-invalid-accept"
67+
consumer = None
68+
69+
with pytest.raises(ArgumentOutOfRangeException):
70+
consumer = await async_connection.consumer(destination="/invalid/" + queue_name)
71+
72+
if consumer is not None:
73+
await consumer.close()
74+
75+
76+
@pytest.mark.asyncio
77+
async def test_async_consumer_async_queue_accept(
78+
async_connection: AsyncConnection, async_environment: AsyncEnvironment
79+
) -> None:
80+
messages_to_send = 1000
81+
queue_name = "test-queue-async-accept"
82+
83+
management = await async_connection.management()
84+
await management.declare_queue(QuorumQueueSpecification(name=queue_name))
85+
86+
await async_publish_messages(async_connection, messages_to_send, queue_name)
87+
88+
# we closed the connection so we need to open a new one
89+
connection_consumer = await async_environment.connection()
90+
await connection_consumer.dial()
91+
consumer = await connection_consumer.consumer(
92+
destination=AddressHelper.queue_address(queue_name),
93+
message_handler=MyMessageHandlerAccept(),
94+
)
95+
96+
try:
97+
await consumer.run()
98+
# ack to terminate the consumer
99+
except ConsumerTestException:
100+
pass
101+
102+
await consumer.close()
103+
104+
message_count = await management.purge_queue(queue_name)
105+
await management.delete_queue(queue_name)
106+
await management.close()
107+
108+
assert message_count == 0
109+
110+
111+
@pytest.mark.asyncio
112+
async def test_async_consumer_async_queue_no_ack(
113+
async_connection: AsyncConnection, async_environment: AsyncEnvironment
114+
) -> None:
115+
messages_to_send = 1000
116+
queue_name = "test-queue-async-no-ack"
117+
118+
management = await async_connection.management()
119+
await management.declare_queue(QuorumQueueSpecification(name=queue_name))
120+
121+
await async_publish_messages(async_connection, messages_to_send, queue_name)
122+
123+
# we closed the connection so we need to open a new one
124+
connection_consumer = await async_environment.connection()
125+
await connection_consumer.dial()
126+
consumer = await connection_consumer.consumer(
127+
destination=AddressHelper.queue_address(queue_name),
128+
message_handler=MyMessageHandlerNoack(),
129+
)
130+
131+
try:
132+
await consumer.run()
133+
# ack to terminate the consumer
134+
except ConsumerTestException:
135+
pass
136+
137+
await consumer.close()
138+
message_count = await management.purge_queue(queue_name)
139+
await management.delete_queue(queue_name)
140+
await management.close()
141+
142+
assert message_count == messages_to_send
143+
144+
145+
@pytest.mark.asyncio
146+
async def test_async_consumer_async_queue_with_discard(
147+
async_connection: AsyncConnection, async_environment: AsyncEnvironment
148+
) -> None:
149+
messages_to_send = 1000
150+
151+
queue_dead_lettering = "queue-dead-letter"
152+
queue_name = "test-queue-async-discard"
153+
exchange_dead_lettering = "exchange-dead-letter"
154+
binding_key = "key-dead-letter"
155+
156+
management = await async_connection.management()
157+
158+
# configuring dead lettering
159+
bind_path = await async_setup_dead_lettering(management)
160+
addr_queue = AddressHelper.queue_address(queue_name)
161+
162+
await management.declare_queue(
163+
QuorumQueueSpecification(
164+
name=queue_name,
165+
dead_letter_exchange=exchange_dead_lettering,
166+
dead_letter_routing_key=binding_key,
167+
)
168+
)
169+
170+
await async_publish_messages(async_connection, messages_to_send, queue_name)
171+
172+
# we closed the connection so we need to open a new one
173+
connection_consumer = await async_environment.connection()
174+
await connection_consumer.dial()
175+
consumer = await connection_consumer.consumer(
176+
destination=addr_queue,
177+
message_handler=MyMessageHandlerDiscard(),
178+
)
179+
180+
try:
181+
await consumer.run()
182+
# ack to terminate the consumer
183+
except ConsumerTestException:
184+
pass
185+
186+
await consumer.close()
187+
188+
message_count = await management.purge_queue(queue_name)
189+
await management.delete_queue(queue_name)
190+
191+
message_count_dead_lettering = await management.purge_queue(queue_dead_lettering)
192+
await async_cleanup_dead_lettering(management, bind_path)
193+
194+
await management.close()
195+
196+
assert message_count == 0
197+
# check dead letter queue
198+
assert message_count_dead_lettering == messages_to_send
199+
200+
201+
@pytest.mark.asyncio
202+
async def test__async_consumer_async_queue_with_discard_with_annotations(
203+
async_connection: AsyncConnection, async_environment: AsyncEnvironment
204+
) -> None:
205+
messages_to_send = 1000
206+
207+
queue_dead_lettering = "queue-dead-letter"
208+
queue_name = "test-queue-async-discard-annotations"
209+
exchange_dead_lettering = "exchange-dead-letter"
210+
binding_key = "key-dead-letter"
211+
212+
management = await async_connection.management()
213+
214+
await management.declare_queue(
215+
QuorumQueueSpecification(
216+
name=queue_name,
217+
dead_letter_exchange=exchange_dead_lettering,
218+
dead_letter_routing_key=binding_key,
219+
)
220+
)
221+
222+
await async_publish_messages(async_connection, messages_to_send, queue_name)
223+
224+
bind_path = await async_setup_dead_lettering(management)
225+
addr_queue = AddressHelper.queue_address(queue_name)
226+
addr_queue_dl = AddressHelper.queue_address(queue_dead_lettering)
227+
228+
# we closed the connection so we need to open a new one
229+
connection_consumer = await async_environment.connection()
230+
await connection_consumer.dial()
231+
consumer = await connection_consumer.consumer(
232+
destination=addr_queue,
233+
message_handler=MyMessageHandlerDiscardWithAnnotations(),
234+
)
235+
236+
try:
237+
await consumer.run()
238+
# ack to terminate the consumer
239+
except ConsumerTestException:
240+
pass
241+
242+
await consumer.close()
243+
244+
# check for added annotation
245+
new_consumer = await async_connection.consumer(addr_queue_dl)
246+
message = await new_consumer.consume()
247+
await new_consumer.close()
248+
249+
message_count = await management.purge_queue(queue_name)
250+
await management.delete_queue(queue_name)
251+
252+
message_count_dead_lettering = await management.purge_queue(queue_dead_lettering)
253+
await async_cleanup_dead_lettering(management, bind_path)
254+
255+
await management.close()
256+
257+
assert "x-opt-string" in message.annotations # type: ignore
258+
assert message_count == 0
259+
# check dead letter queue
260+
assert message_count_dead_lettering == messages_to_send
261+
262+
263+
@pytest.mark.asyncio
264+
async def test_async_consumer_async_queue_with_requeue(
265+
async_connection: AsyncConnection, async_environment: AsyncEnvironment
266+
) -> None:
267+
messages_to_send = 1000
268+
queue_name = "test-queue-async-requeue"
269+
270+
management = await async_connection.management()
271+
await management.declare_queue(QuorumQueueSpecification(name=queue_name))
272+
273+
addr_queue = AddressHelper.queue_address(queue_name)
274+
await async_publish_messages(async_connection, messages_to_send, queue_name)
275+
276+
# we closed the connection so we need to open a new one
277+
connection_consumer = await async_environment.connection()
278+
await connection_consumer.dial()
279+
280+
consumer = await connection_consumer.consumer(
281+
destination=addr_queue,
282+
message_handler=MyMessageHandlerRequeue(),
283+
)
284+
285+
try:
286+
await consumer.run()
287+
# ack to terminate the consumer
288+
except ConsumerTestException:
289+
pass
290+
291+
await consumer.close()
292+
293+
message_count = await management.purge_queue(queue_name)
294+
295+
await management.delete_queue(queue_name)
296+
await management.close()
297+
298+
assert message_count > 0
299+
300+
301+
@pytest.mark.asyncio
302+
async def test_async_consumer_async_queue_with_requeue_with_annotations(
303+
async_connection: AsyncConnection, async_environment: AsyncEnvironment
304+
) -> None:
305+
messages_to_send = 1000
306+
queue_name = "test-queue-async-requeue"
307+
308+
management = await async_connection.management()
309+
await management.declare_queue(QuorumQueueSpecification(name=queue_name))
310+
311+
addr_queue = AddressHelper.queue_address(queue_name)
312+
await async_publish_messages(async_connection, messages_to_send, queue_name)
313+
314+
# we closed the connection so we need to open a new one
315+
connection_consumer = await async_environment.connection()
316+
await connection_consumer.dial()
317+
318+
consumer = await connection_consumer.consumer(
319+
destination=addr_queue,
320+
message_handler=MyMessageHandlerRequeueWithAnnotations(),
321+
)
322+
323+
try:
324+
await consumer.run()
325+
# ack to terminate the consumer
326+
except ConsumerTestException:
327+
pass
328+
329+
await consumer.close()
330+
331+
# check for added annotation
332+
new_consumer = await async_connection.consumer(addr_queue)
333+
message = await new_consumer.consume()
334+
await new_consumer.close()
335+
336+
message_count = await management.purge_queue(queue_name)
337+
338+
await management.delete_queue(queue_name)
339+
await management.close()
340+
341+
assert "x-opt-string" in message.annotations # type: ignore
342+
assert message_count > 0
343+
344+
345+
@pytest.mark.asyncio
346+
async def test_async_consumer_async_queue_with_requeue_with_invalid_annotations(
347+
async_connection: AsyncConnection, async_environment: AsyncEnvironment
348+
) -> None:
349+
messages_to_send = 1000
350+
test_failure = True
351+
queue_name = "test-queue-async-requeue"
352+
353+
management = await async_connection.management()
354+
await management.declare_queue(QuorumQueueSpecification(name=queue_name))
355+
356+
addr_queue = AddressHelper.queue_address(queue_name)
357+
await async_publish_messages(async_connection, messages_to_send, queue_name)
358+
359+
# we closed the connection so we need to open a new one
360+
connection_consumer = await async_environment.connection()
361+
await connection_consumer.dial()
362+
363+
consumer = None
364+
try:
365+
consumer = await connection_consumer.consumer(
366+
destination=addr_queue,
367+
message_handler=MyMessageHandlerRequeueWithInvalidAnnotations(),
368+
)
369+
370+
await consumer.run()
371+
# ack to terminate the consumer
372+
except ConsumerTestException:
373+
pass
374+
except ArgumentOutOfRangeException:
375+
test_failure = False
376+
377+
if consumer is not None:
378+
await consumer.close()
379+
380+
await management.delete_queue(queue_name)
381+
await management.close()
382+
383+
assert test_failure is False

0 commit comments

Comments
 (0)