11from rabbitmq_amqp_python_client import (
2- BindingSpecification ,
32 Connection ,
4- ExchangeSpecification ,
5- ExchangeType ,
6- Message ,
73 QuorumQueueSpecification ,
84 queue_address ,
95)
1713 MyMessageHandlerRequeue ,
1814 MyMessageHandlerRequeueWithAnnotations ,
1915)
20- from .utils import create_connection
16+ from .utils import (
17+ cleanup_dead_lettering ,
18+ create_connection ,
19+ publish_messages ,
20+ setup_dead_lettering ,
21+ )
2122
2223
2324def test_consumer_sync_queue_accept (connection : Connection ) -> None :
@@ -29,17 +30,12 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None:
2930 management .declare_queue (QuorumQueueSpecification (name = queue_name ))
3031
3132 addr_queue = queue_address (queue_name )
32-
33- publisher = connection .publisher ("/queues/" + queue_name )
3433 consumer = connection .consumer (addr_queue )
3534
3635 consumed = 0
3736
3837 # publish messages_to_send messages
39- for i in range (messages_to_send ):
40- publisher .publish (Message (body = "test" + str (i )))
41-
42- publisher .close ()
38+ publish_messages (connection , messages_to_send , queue_name )
4339
4440 # consumer synchronously without handler
4541 for i in range (messages_to_send ):
@@ -49,30 +45,25 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None:
4945
5046 consumer .close ()
5147
52- assert consumed > 0
53-
5448 management .delete_queue (queue_name )
5549 management .close ()
5650
51+ assert consumed > 0
52+
5753
5854def test_consumer_async_queue_accept (connection : Connection ) -> None :
5955
6056 messages_to_send = 1000
6157
62- queue_name = "test-queue_async_accept "
58+ queue_name = "test-queue-async-accept "
6359
6460 management = connection .management ()
6561
6662 management .declare_queue (QuorumQueueSpecification (name = queue_name ))
6763
6864 addr_queue = queue_address (queue_name )
6965
70- publisher = connection .publisher ("/queues/" + queue_name )
71-
72- # publish messages_to_send messages
73- for i in range (messages_to_send ):
74- publisher .publish (Message (body = "test" + str (i )))
75- publisher .close ()
66+ publish_messages (connection , messages_to_send , queue_name )
7667
7768 # workaround: it looks like when the consumer finish to consume invalidate the connection
7869 # so for the moment we need to use one dedicated
@@ -91,6 +82,8 @@ def test_consumer_async_queue_accept(connection: Connection) -> None:
9182
9283 message_count = management .purge_queue (queue_name )
9384
85+ management .delete_queue (queue_name )
86+
9487 management .close ()
9588
9689 assert message_count == 0
@@ -100,20 +93,15 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None:
10093
10194 messages_to_send = 1000
10295
103- queue_name = "test-queue_async_no_ack "
96+ queue_name = "test-queue-async-no-ack "
10497
10598 management = connection .management ()
10699
107100 management .declare_queue (QuorumQueueSpecification (name = queue_name ))
108101
109102 addr_queue = queue_address (queue_name )
110103
111- publisher = connection .publisher ("/queues/" + queue_name )
112-
113- # publish messages_to_send messages
114- for i in range (messages_to_send ):
115- publisher .publish (Message (body = "test" + str (i )))
116- publisher .close ()
104+ publish_messages (connection , messages_to_send , queue_name )
117105
118106 # workaround: it looks like when the consumer finish to consume invalidate the connection
119107 # so for the moment we need to use one dedicated
@@ -141,29 +129,16 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None:
141129def test_consumer_async_queue_with_discard (connection : Connection ) -> None :
142130 messages_to_send = 1000
143131
144- exchange_dead_lettering = "exchange-dead-letter"
145132 queue_dead_lettering = "queue-dead-letter"
146- queue_name = "test-queue_async_discard"
147- binding_key = "key_dead_letter"
133+ queue_name = "test-queue-async-discard"
134+ exchange_dead_lettering = "exchange-dead-letter"
135+ binding_key = "key-dead-letter"
148136
149137 management = connection .management ()
150138
151139 # configuring dead lettering
152- management .declare_exchange (
153- ExchangeSpecification (
154- name = exchange_dead_lettering ,
155- exchange_type = ExchangeType .fanout ,
156- arguments = {},
157- )
158- )
159- management .declare_queue (QuorumQueueSpecification (name = queue_dead_lettering ))
160- bind_path = management .bind (
161- BindingSpecification (
162- source_exchange = exchange_dead_lettering ,
163- destination_queue = queue_dead_lettering ,
164- binding_key = binding_key ,
165- )
166- )
140+ bind_path = setup_dead_lettering (management )
141+ addr_queue = queue_address (queue_name )
167142
168143 management .declare_queue (
169144 QuorumQueueSpecification (
@@ -173,14 +148,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
173148 )
174149 )
175150
176- addr_queue = queue_address (queue_name )
177-
178- publisher = connection .publisher ("/queues/" + queue_name )
179-
180- # publish messages_to_send messages
181- for i in range (messages_to_send ):
182- publisher .publish (Message (body = "test" + str (i )))
183- publisher .close ()
151+ publish_messages (connection , messages_to_send , queue_name )
184152
185153 # workaround: it looks like when the consumer finish to consume invalidate the connection
186154 # so for the moment we need to use one dedicated
@@ -204,9 +172,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
204172
205173 message_count_dead_lettering = management .purge_queue (queue_dead_lettering )
206174
207- management .unbind (bind_path )
208- management .delete_exchange (exchange_dead_lettering )
209- management .delete_queue (queue_dead_lettering )
175+ cleanup_dead_lettering (management , bind_path )
210176
211177 management .close ()
212178
@@ -220,30 +186,13 @@ def test_consumer_async_queue_with_discard_with_annotations(
220186) -> None :
221187 messages_to_send = 1000
222188
223- exchange_dead_lettering = "exchange-dead-letter"
224189 queue_dead_lettering = "queue-dead-letter"
225- queue_name = "test-queue_async_discard"
226- binding_key = "key_dead_letter"
190+ queue_name = "test-queue-async-discard"
191+ exchange_dead_lettering = "exchange-dead-letter"
192+ binding_key = "key-dead-letter"
227193
228194 management = connection .management ()
229195
230- # configuring dead lettering
231- management .declare_exchange (
232- ExchangeSpecification (
233- name = exchange_dead_lettering ,
234- exchange_type = ExchangeType .fanout ,
235- arguments = {},
236- )
237- )
238- management .declare_queue (QuorumQueueSpecification (name = queue_dead_lettering ))
239- bind_path = management .bind (
240- BindingSpecification (
241- source_exchange = exchange_dead_lettering ,
242- destination_queue = queue_dead_lettering ,
243- binding_key = binding_key ,
244- )
245- )
246-
247196 management .declare_queue (
248197 QuorumQueueSpecification (
249198 name = queue_name ,
@@ -252,16 +201,12 @@ def test_consumer_async_queue_with_discard_with_annotations(
252201 )
253202 )
254203
204+ publish_messages (connection , messages_to_send , queue_name )
205+
206+ bind_path = setup_dead_lettering (management )
255207 addr_queue = queue_address (queue_name )
256208 addr_queue_dl = queue_address (queue_dead_lettering )
257209
258- publisher = connection .publisher ("/queues/" + queue_name )
259-
260- # publish messages_to_send messages
261- for i in range (messages_to_send ):
262- publisher .publish (Message (body = "test" + str (i )))
263- publisher .close ()
264-
265210 # workaround: it looks like when the consumer finish to consume invalidate the connection
266211 # so for the moment we need to use one dedicated
267212 connection_consumer = create_connection ()
@@ -283,20 +228,18 @@ def test_consumer_async_queue_with_discard_with_annotations(
283228 message = new_consumer .consume ()
284229 new_consumer .close ()
285230
286- assert "x-opt-string" in message .annotations
287-
288231 message_count = management .purge_queue (queue_name )
289232
290233 management .delete_queue (queue_name )
291234
292235 message_count_dead_lettering = management .purge_queue (queue_dead_lettering )
293236
294- management .unbind (bind_path )
295- management .delete_exchange (exchange_dead_lettering )
296- management .delete_queue (queue_dead_lettering )
237+ cleanup_dead_lettering (management , bind_path )
297238
298239 management .close ()
299240
241+ assert "x-opt-string" in message .annotations
242+
300243 assert message_count == 0
301244 # check dead letter queue
302245 assert message_count_dead_lettering == messages_to_send
@@ -305,20 +248,15 @@ def test_consumer_async_queue_with_discard_with_annotations(
305248def test_consumer_async_queue_with_requeue (connection : Connection ) -> None :
306249 messages_to_send = 1000
307250
308- queue_name = "test-queue_async_requeue "
251+ queue_name = "test-queue-async-requeue "
309252
310253 management = connection .management ()
311254
312255 management .declare_queue (QuorumQueueSpecification (name = queue_name ))
313256
314257 addr_queue = queue_address (queue_name )
315258
316- publisher = connection .publisher ("/queues/" + queue_name )
317-
318- # publish messages_to_send messages
319- for i in range (messages_to_send ):
320- publisher .publish (Message (body = "test" + str (i )))
321- publisher .close ()
259+ publish_messages (connection , messages_to_send , queue_name )
322260
323261 # workaround: it looks like when the consumer finish to consume invalidate the connection
324262 # so for the moment we need to use one dedicated
@@ -349,20 +287,15 @@ def test_consumer_async_queue_with_requeue_with_annotations(
349287) -> None :
350288 messages_to_send = 1000
351289
352- queue_name = "test-queue_async_requeue "
290+ queue_name = "test-queue-async-requeue "
353291
354292 management = connection .management ()
355293
356294 management .declare_queue (QuorumQueueSpecification (name = queue_name ))
357295
358296 addr_queue = queue_address (queue_name )
359297
360- publisher = connection .publisher ("/queues/" + queue_name )
361-
362- # publish messages_to_send messages
363- for i in range (messages_to_send ):
364- publisher .publish (Message (body = "test" + str (i )))
365- publisher .close ()
298+ publish_messages (connection , messages_to_send , queue_name )
366299
367300 # workaround: it looks like when the consumer finish to consume invalidate the connection
368301 # so for the moment we need to use one dedicated
@@ -385,11 +318,11 @@ def test_consumer_async_queue_with_requeue_with_annotations(
385318 message = new_consumer .consume ()
386319 new_consumer .close ()
387320
388- assert "x-opt-string" in message .annotations
389-
390321 message_count = management .purge_queue (queue_name )
391322
392323 management .delete_queue (queue_name )
393324 management .close ()
394325
326+ assert "x-opt-string" in message .annotations
327+
395328 assert message_count > 0
0 commit comments