11from rabbitmq_amqp_python_client import (
2- BindingSpecification ,
32 Connection ,
4- ExchangeSpecification ,
5- ExchangeType ,
6- Message ,
73 QuorumQueueSpecification ,
84 queue_address ,
95)
1713 MyMessageHandlerRequeue ,
1814 MyMessageHandlerRequeueWithAnnotations ,
1915)
20- from .utils import create_connection
16+ from .utils import (
17+ cleanup_dead_lettering ,
18+ create_connection ,
19+ publish_messages ,
20+ setup_dead_lettering ,
21+ )
2122
2223
2324def test_consumer_sync_queue_accept (connection : Connection ) -> None :
@@ -29,17 +30,12 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None:
2930 management .declare_queue (QuorumQueueSpecification (name = queue_name ))
3031
3132 addr_queue = queue_address (queue_name )
32-
33- publisher = connection .publisher ("/queues/" + queue_name )
3433 consumer = connection .consumer (addr_queue )
3534
3635 consumed = 0
3736
3837 # publish messages_to_send messages
39- for i in range (messages_to_send ):
40- publisher .publish (Message (body = "test" + str (i )))
41-
42- publisher .close ()
38+ publish_messages (connection , messages_to_send , queue_name )
4339
4440 # consumer synchronously without handler
4541 for i in range (messages_to_send ):
@@ -67,12 +63,7 @@ def test_consumer_async_queue_accept(connection: Connection) -> None:
6763
6864 addr_queue = queue_address (queue_name )
6965
70- publisher = connection .publisher ("/queues/" + queue_name )
71-
72- # publish messages_to_send messages
73- for i in range (messages_to_send ):
74- publisher .publish (Message (body = "test" + str (i )))
75- publisher .close ()
66+ publish_messages (connection , messages_to_send , queue_name )
7667
7768 # workaround: it looks like when the consumer finish to consume invalidate the connection
7869 # so for the moment we need to use one dedicated
@@ -108,12 +99,7 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None:
10899
109100 addr_queue = queue_address (queue_name )
110101
111- publisher = connection .publisher ("/queues/" + queue_name )
112-
113- # publish messages_to_send messages
114- for i in range (messages_to_send ):
115- publisher .publish (Message (body = "test" + str (i )))
116- publisher .close ()
102+ publish_messages (connection , messages_to_send , queue_name )
117103
118104 # workaround: it looks like when the consumer finish to consume invalidate the connection
119105 # so for the moment we need to use one dedicated
@@ -141,46 +127,16 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None:
141127def test_consumer_async_queue_with_discard (connection : Connection ) -> None :
142128 messages_to_send = 1000
143129
144- exchange_dead_lettering = "exchange-dead-letter"
145130 queue_dead_lettering = "queue-dead-letter"
146131 queue_name = "test-queue_async_discard"
147- binding_key = "key_dead_letter"
148132
149133 management = connection .management ()
150134
151135 # configuring dead lettering
152- management .declare_exchange (
153- ExchangeSpecification (
154- name = exchange_dead_lettering ,
155- exchange_type = ExchangeType .fanout ,
156- arguments = {},
157- )
158- )
159- management .declare_queue (QuorumQueueSpecification (name = queue_dead_lettering ))
160- bind_path = management .bind (
161- BindingSpecification (
162- source_exchange = exchange_dead_lettering ,
163- destination_queue = queue_dead_lettering ,
164- binding_key = binding_key ,
165- )
166- )
167-
168- management .declare_queue (
169- QuorumQueueSpecification (
170- name = queue_name ,
171- dead_letter_exchange = exchange_dead_lettering ,
172- dead_letter_routing_key = binding_key ,
173- )
174- )
175-
136+ bind_path = setup_dead_lettering (management )
176137 addr_queue = queue_address (queue_name )
177138
178- publisher = connection .publisher ("/queues/" + queue_name )
179-
180- # publish messages_to_send messages
181- for i in range (messages_to_send ):
182- publisher .publish (Message (body = "test" + str (i )))
183- publisher .close ()
139+ publish_messages (connection , messages_to_send , queue_name )
184140
185141 # workaround: it looks like when the consumer finish to consume invalidate the connection
186142 # so for the moment we need to use one dedicated
@@ -204,9 +160,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
204160
205161 message_count_dead_lettering = management .purge_queue (queue_dead_lettering )
206162
207- management .unbind (bind_path )
208- management .delete_exchange (exchange_dead_lettering )
209- management .delete_queue (queue_dead_lettering )
163+ cleanup_dead_lettering (management , bind_path )
210164
211165 management .close ()
212166
@@ -220,47 +174,16 @@ def test_consumer_async_queue_with_discard_with_annotations(
220174) -> None :
221175 messages_to_send = 1000
222176
223- exchange_dead_lettering = "exchange-dead-letter"
224177 queue_dead_lettering = "queue-dead-letter"
225178 queue_name = "test-queue_async_discard"
226- binding_key = "key_dead_letter"
227179
228180 management = connection .management ()
229181
230- # configuring dead lettering
231- management .declare_exchange (
232- ExchangeSpecification (
233- name = exchange_dead_lettering ,
234- exchange_type = ExchangeType .fanout ,
235- arguments = {},
236- )
237- )
238- management .declare_queue (QuorumQueueSpecification (name = queue_dead_lettering ))
239- bind_path = management .bind (
240- BindingSpecification (
241- source_exchange = exchange_dead_lettering ,
242- destination_queue = queue_dead_lettering ,
243- binding_key = binding_key ,
244- )
245- )
246-
247- management .declare_queue (
248- QuorumQueueSpecification (
249- name = queue_name ,
250- dead_letter_exchange = exchange_dead_lettering ,
251- dead_letter_routing_key = binding_key ,
252- )
253- )
254-
182+ bind_path = setup_dead_lettering (management )
255183 addr_queue = queue_address (queue_name )
256184 addr_queue_dl = queue_address (queue_dead_lettering )
257185
258- publisher = connection .publisher ("/queues/" + queue_name )
259-
260- # publish messages_to_send messages
261- for i in range (messages_to_send ):
262- publisher .publish (Message (body = "test" + str (i )))
263- publisher .close ()
186+ publish_messages (connection , messages_to_send , queue_name )
264187
265188 # workaround: it looks like when the consumer finish to consume invalidate the connection
266189 # so for the moment we need to use one dedicated
@@ -291,9 +214,7 @@ def test_consumer_async_queue_with_discard_with_annotations(
291214
292215 message_count_dead_lettering = management .purge_queue (queue_dead_lettering )
293216
294- management .unbind (bind_path )
295- management .delete_exchange (exchange_dead_lettering )
296- management .delete_queue (queue_dead_lettering )
217+ cleanup_dead_lettering (management , bind_path )
297218
298219 management .close ()
299220
@@ -313,12 +234,7 @@ def test_consumer_async_queue_with_requeue(connection: Connection) -> None:
313234
314235 addr_queue = queue_address (queue_name )
315236
316- publisher = connection .publisher ("/queues/" + queue_name )
317-
318- # publish messages_to_send messages
319- for i in range (messages_to_send ):
320- publisher .publish (Message (body = "test" + str (i )))
321- publisher .close ()
237+ publish_messages (connection , messages_to_send , queue_name )
322238
323239 # workaround: it looks like when the consumer finish to consume invalidate the connection
324240 # so for the moment we need to use one dedicated
@@ -357,12 +273,7 @@ def test_consumer_async_queue_with_requeue_with_annotations(
357273
358274 addr_queue = queue_address (queue_name )
359275
360- publisher = connection .publisher ("/queues/" + queue_name )
361-
362- # publish messages_to_send messages
363- for i in range (messages_to_send ):
364- publisher .publish (Message (body = "test" + str (i )))
365- publisher .close ()
276+ publish_messages (connection , messages_to_send , queue_name )
366277
367278 # workaround: it looks like when the consumer finish to consume invalidate the connection
368279 # so for the moment we need to use one dedicated
0 commit comments