Skip to content

Commit 2454544

Browse files
author
DanielePalaia
committed
refactoring tests
1 parent 148c436 commit 2454544

File tree

3 files changed

+89
-106
lines changed

3 files changed

+89
-106
lines changed

tests/test_consumer.py

Lines changed: 31 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
from rabbitmq_amqp_python_client import (
2-
BindingSpecification,
32
Connection,
4-
ExchangeSpecification,
5-
ExchangeType,
6-
Message,
73
QuorumQueueSpecification,
84
queue_address,
95
)
@@ -17,7 +13,12 @@
1713
MyMessageHandlerRequeue,
1814
MyMessageHandlerRequeueWithAnnotations,
1915
)
20-
from .utils import create_connection
16+
from .utils import (
17+
cleanup_dead_lettering,
18+
create_connection,
19+
publish_messages,
20+
setup_dead_lettering,
21+
)
2122

2223

2324
def test_consumer_sync_queue_accept(connection: Connection) -> None:
@@ -29,17 +30,12 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None:
2930
management.declare_queue(QuorumQueueSpecification(name=queue_name))
3031

3132
addr_queue = queue_address(queue_name)
32-
33-
publisher = connection.publisher("/queues/" + queue_name)
3433
consumer = connection.consumer(addr_queue)
3534

3635
consumed = 0
3736

3837
# publish messages_to_send messages
39-
for i in range(messages_to_send):
40-
publisher.publish(Message(body="test" + str(i)))
41-
42-
publisher.close()
38+
publish_messages(connection, messages_to_send, queue_name)
4339

4440
# consumer synchronously without handler
4541
for i in range(messages_to_send):
@@ -59,20 +55,15 @@ def test_consumer_async_queue_accept(connection: Connection) -> None:
5955

6056
messages_to_send = 1000
6157

62-
queue_name = "test-queue_async_accept"
58+
queue_name = "test-queue-async-accept"
6359

6460
management = connection.management()
6561

6662
management.declare_queue(QuorumQueueSpecification(name=queue_name))
6763

6864
addr_queue = queue_address(queue_name)
6965

70-
publisher = connection.publisher("/queues/" + queue_name)
71-
72-
# publish messages_to_send messages
73-
for i in range(messages_to_send):
74-
publisher.publish(Message(body="test" + str(i)))
75-
publisher.close()
66+
publish_messages(connection, messages_to_send, queue_name)
7667

7768
# workaround: it looks like when the consumer finish to consume invalidate the connection
7869
# so for the moment we need to use one dedicated
@@ -91,6 +82,8 @@ def test_consumer_async_queue_accept(connection: Connection) -> None:
9182

9283
message_count = management.purge_queue(queue_name)
9384

85+
management.delete_queue(queue_name)
86+
9487
management.close()
9588

9689
assert message_count == 0
@@ -100,20 +93,15 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None:
10093

10194
messages_to_send = 1000
10295

103-
queue_name = "test-queue_async_no_ack"
96+
queue_name = "test-queue-async-no-ack"
10497

10598
management = connection.management()
10699

107100
management.declare_queue(QuorumQueueSpecification(name=queue_name))
108101

109102
addr_queue = queue_address(queue_name)
110103

111-
publisher = connection.publisher("/queues/" + queue_name)
112-
113-
# publish messages_to_send messages
114-
for i in range(messages_to_send):
115-
publisher.publish(Message(body="test" + str(i)))
116-
publisher.close()
104+
publish_messages(connection, messages_to_send, queue_name)
117105

118106
# workaround: it looks like when the consumer finish to consume invalidate the connection
119107
# so for the moment we need to use one dedicated
@@ -141,29 +129,16 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None:
141129
def test_consumer_async_queue_with_discard(connection: Connection) -> None:
142130
messages_to_send = 1000
143131

144-
exchange_dead_lettering = "exchange-dead-letter"
145132
queue_dead_lettering = "queue-dead-letter"
146-
queue_name = "test-queue_async_discard"
147-
binding_key = "key_dead_letter"
133+
queue_name = "test-queue-async-discard"
134+
exchange_dead_lettering = "exchange-dead-letter"
135+
binding_key = "key-dead-letter"
148136

149137
management = connection.management()
150138

151139
# configuring dead lettering
152-
management.declare_exchange(
153-
ExchangeSpecification(
154-
name=exchange_dead_lettering,
155-
exchange_type=ExchangeType.fanout,
156-
arguments={},
157-
)
158-
)
159-
management.declare_queue(QuorumQueueSpecification(name=queue_dead_lettering))
160-
bind_path = management.bind(
161-
BindingSpecification(
162-
source_exchange=exchange_dead_lettering,
163-
destination_queue=queue_dead_lettering,
164-
binding_key=binding_key,
165-
)
166-
)
140+
bind_path = setup_dead_lettering(management)
141+
addr_queue = queue_address(queue_name)
167142

168143
management.declare_queue(
169144
QuorumQueueSpecification(
@@ -173,14 +148,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
173148
)
174149
)
175150

176-
addr_queue = queue_address(queue_name)
177-
178-
publisher = connection.publisher("/queues/" + queue_name)
179-
180-
# publish messages_to_send messages
181-
for i in range(messages_to_send):
182-
publisher.publish(Message(body="test" + str(i)))
183-
publisher.close()
151+
publish_messages(connection, messages_to_send, queue_name)
184152

185153
# workaround: it looks like when the consumer finish to consume invalidate the connection
186154
# so for the moment we need to use one dedicated
@@ -204,9 +172,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
204172

205173
message_count_dead_lettering = management.purge_queue(queue_dead_lettering)
206174

207-
management.unbind(bind_path)
208-
management.delete_exchange(exchange_dead_lettering)
209-
management.delete_queue(queue_dead_lettering)
175+
cleanup_dead_lettering(management, bind_path)
210176

211177
management.close()
212178

@@ -220,30 +186,13 @@ def test_consumer_async_queue_with_discard_with_annotations(
220186
) -> None:
221187
messages_to_send = 1000
222188

223-
exchange_dead_lettering = "exchange-dead-letter"
224189
queue_dead_lettering = "queue-dead-letter"
225-
queue_name = "test-queue_async_discard"
226-
binding_key = "key_dead_letter"
190+
queue_name = "test-queue-async-discard"
191+
exchange_dead_lettering = "exchange-dead-letter"
192+
binding_key = "key-dead-letter"
227193

228194
management = connection.management()
229195

230-
# configuring dead lettering
231-
management.declare_exchange(
232-
ExchangeSpecification(
233-
name=exchange_dead_lettering,
234-
exchange_type=ExchangeType.fanout,
235-
arguments={},
236-
)
237-
)
238-
management.declare_queue(QuorumQueueSpecification(name=queue_dead_lettering))
239-
bind_path = management.bind(
240-
BindingSpecification(
241-
source_exchange=exchange_dead_lettering,
242-
destination_queue=queue_dead_lettering,
243-
binding_key=binding_key,
244-
)
245-
)
246-
247196
management.declare_queue(
248197
QuorumQueueSpecification(
249198
name=queue_name,
@@ -252,16 +201,12 @@ def test_consumer_async_queue_with_discard_with_annotations(
252201
)
253202
)
254203

204+
publish_messages(connection, messages_to_send, queue_name)
205+
206+
bind_path = setup_dead_lettering(management)
255207
addr_queue = queue_address(queue_name)
256208
addr_queue_dl = queue_address(queue_dead_lettering)
257209

258-
publisher = connection.publisher("/queues/" + queue_name)
259-
260-
# publish messages_to_send messages
261-
for i in range(messages_to_send):
262-
publisher.publish(Message(body="test" + str(i)))
263-
publisher.close()
264-
265210
# workaround: it looks like when the consumer finish to consume invalidate the connection
266211
# so for the moment we need to use one dedicated
267212
connection_consumer = create_connection()
@@ -291,9 +236,7 @@ def test_consumer_async_queue_with_discard_with_annotations(
291236

292237
message_count_dead_lettering = management.purge_queue(queue_dead_lettering)
293238

294-
management.unbind(bind_path)
295-
management.delete_exchange(exchange_dead_lettering)
296-
management.delete_queue(queue_dead_lettering)
239+
cleanup_dead_lettering(management, bind_path)
297240

298241
management.close()
299242

@@ -305,20 +248,15 @@ def test_consumer_async_queue_with_discard_with_annotations(
305248
def test_consumer_async_queue_with_requeue(connection: Connection) -> None:
306249
messages_to_send = 1000
307250

308-
queue_name = "test-queue_async_requeue"
251+
queue_name = "test-queue-async-requeue"
309252

310253
management = connection.management()
311254

312255
management.declare_queue(QuorumQueueSpecification(name=queue_name))
313256

314257
addr_queue = queue_address(queue_name)
315258

316-
publisher = connection.publisher("/queues/" + queue_name)
317-
318-
# publish messages_to_send messages
319-
for i in range(messages_to_send):
320-
publisher.publish(Message(body="test" + str(i)))
321-
publisher.close()
259+
publish_messages(connection, messages_to_send, queue_name)
322260

323261
# workaround: it looks like when the consumer finish to consume invalidate the connection
324262
# so for the moment we need to use one dedicated
@@ -349,20 +287,15 @@ def test_consumer_async_queue_with_requeue_with_annotations(
349287
) -> None:
350288
messages_to_send = 1000
351289

352-
queue_name = "test-queue_async_requeue"
290+
queue_name = "test-queue-async-requeue"
353291

354292
management = connection.management()
355293

356294
management.declare_queue(QuorumQueueSpecification(name=queue_name))
357295

358296
addr_queue = queue_address(queue_name)
359297

360-
publisher = connection.publisher("/queues/" + queue_name)
361-
362-
# publish messages_to_send messages
363-
for i in range(messages_to_send):
364-
publisher.publish(Message(body="test" + str(i)))
365-
publisher.close()
298+
publish_messages(connection, messages_to_send, queue_name)
366299

367300
# workaround: it looks like when the consumer finish to consume invalidate the connection
368301
# so for the moment we need to use one dedicated

tests/test_publisher.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ def test_publish_exchange(connection: Connection) -> None:
7373

7474
def test_publish_purge(connection: Connection) -> None:
7575
messages_to_publish = 20
76-
connection = Connection("amqp://guest:guest@localhost:5672/")
77-
connection.dial()
7876

7977
queue_name = "test-queue"
8078
management = connection.management()
@@ -92,12 +90,12 @@ def test_publish_purge(connection: Connection) -> None:
9290

9391
time.sleep(4)
9492

95-
message_purged = management.purge_queue(queue_name)
96-
9793
attempt = 0
98-
purged_messages = 0
99-
while purged_messages != messages_to_publish:
100-
purged_messages = management.purge_queue(queue_name)
94+
message_purged = 0
95+
# check right number of messages purged
96+
# publish may delay so we loop several times till the condition is met
97+
while message_purged != messages_to_publish:
98+
message_purged = management.purge_queue(queue_name)
10199
time.sleep(1)
102100
attempt = attempt + 1
103101
if attempt > 60:

tests/utils.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,60 @@
1-
from rabbitmq_amqp_python_client import Connection
1+
from rabbitmq_amqp_python_client import (
2+
BindingSpecification,
3+
Connection,
4+
ExchangeSpecification,
5+
ExchangeType,
6+
Management,
7+
Message,
8+
QuorumQueueSpecification,
9+
)
210

311

412
def create_connection() -> Connection:
513
connection_consumer = Connection("amqp://guest:guest@localhost:5672/")
614
connection_consumer.dial()
715

816
return connection_consumer
17+
18+
19+
def publish_messages(connection: Connection, messages_to_send: int, queue_name) -> None:
20+
publisher = connection.publisher("/queues/" + queue_name)
21+
# publish messages_to_send messages
22+
for i in range(messages_to_send):
23+
publisher.publish(Message(body="test" + str(i)))
24+
publisher.close()
25+
26+
27+
def setup_dead_lettering(management: Management) -> str:
28+
29+
exchange_dead_lettering = "exchange-dead-letter"
30+
queue_dead_lettering = "queue-dead-letter"
31+
binding_key = "key_dead_letter"
32+
33+
# configuring dead lettering
34+
management.declare_exchange(
35+
ExchangeSpecification(
36+
name=exchange_dead_lettering,
37+
exchange_type=ExchangeType.fanout,
38+
arguments={},
39+
)
40+
)
41+
management.declare_queue(QuorumQueueSpecification(name=queue_dead_lettering))
42+
bind_path = management.bind(
43+
BindingSpecification(
44+
source_exchange=exchange_dead_lettering,
45+
destination_queue=queue_dead_lettering,
46+
binding_key=binding_key,
47+
)
48+
)
49+
50+
return bind_path
51+
52+
53+
def cleanup_dead_lettering(management: Management, bind_path: str) -> None:
54+
55+
exchange_dead_lettering = "exchange-dead-letter"
56+
queue_dead_lettering = "queue-dead-letter"
57+
58+
management.unbind(bind_path)
59+
management.delete_exchange(exchange_dead_lettering)
60+
management.delete_queue(queue_dead_lettering)

0 commit comments

Comments
 (0)