|
1 | 1 | import asyncio |
| 2 | +import time |
2 | 3 |
|
3 | 4 | import pytest |
4 | 5 |
|
|
7 | 8 | ArgumentOutOfRangeException, |
8 | 9 | AsyncConnection, |
9 | 10 | AsyncEnvironment, |
| 11 | + ConnectionClosed, |
10 | 12 | Message, |
11 | 13 | OutcomeState, |
12 | 14 | QuorumQueueSpecification, |
| 15 | + RecoveryConfiguration, |
13 | 16 | StreamSpecification, |
14 | 17 | ValidationCodeException, |
15 | 18 | ) |
|
18 | 21 | ) |
19 | 22 | from rabbitmq_amqp_python_client.utils import Converter |
20 | 23 |
|
| 24 | +from ..http_requests import delete_all_connections |
21 | 25 | from ..utils import create_binding |
22 | 26 | from .fixtures import * # noqa: F401, F403 |
23 | 27 | from .utils import async_publish_per_message |
@@ -136,33 +140,31 @@ async def test_publish_per_message_async(async_connection: AsyncConnection) -> N |
136 | 140 | assert raised is False |
137 | 141 |
|
138 | 142 |
|
139 | | -# @pytest.mark.asyncio |
140 | | -# async def test_publish_ssl(async_connection_ssl: AsyncConnection) -> None: |
141 | | -# queue_name = "test-queue" |
142 | | -# management = await async_connection_ssl.management() |
| 143 | +@pytest.mark.asyncio |
| 144 | +async def test_publish_ssl(async_connection_ssl: AsyncConnection) -> None: |
| 145 | + queue_name = "test-queue" |
| 146 | + management = await async_connection_ssl.management() |
143 | 147 |
|
144 | | -# await management.declare_queue(QuorumQueueSpecification(name=queue_name)) |
| 148 | + await management.declare_queue(QuorumQueueSpecification(name=queue_name)) |
145 | 149 |
|
146 | | -# raised = False |
147 | | -# publisher = None |
| 150 | + raised = False |
| 151 | + publisher = None |
148 | 152 |
|
149 | | -# try: |
150 | | -# publisher = await async_connection_ssl.publisher( |
151 | | -# destination=AddressHelper.queue_address(queue_name) |
152 | | -# ) |
153 | | -# await publisher.publish( |
154 | | -# Message(body=Converter.string_to_bytes("test")) |
155 | | -# ) |
156 | | -# except Exception: |
157 | | -# raised = True |
| 153 | + try: |
| 154 | + publisher = await async_connection_ssl.publisher( |
| 155 | + destination=AddressHelper.queue_address(queue_name) |
| 156 | + ) |
| 157 | + await publisher.publish(Message(body=Converter.string_to_bytes("test"))) |
| 158 | + except Exception: |
| 159 | + raised = True |
158 | 160 |
|
159 | | -# if publisher is not None: |
160 | | -# await publisher.close() |
| 161 | + if publisher is not None: |
| 162 | + await publisher.close() |
161 | 163 |
|
162 | | -# await management.delete_queue(queue_name) |
163 | | -# await management.close() |
| 164 | + await management.delete_queue(queue_name) |
| 165 | + await management.close() |
164 | 166 |
|
165 | | -# assert raised is False |
| 167 | + assert raised is False |
166 | 168 |
|
167 | 169 |
|
168 | 170 | @pytest.mark.asyncio |
@@ -319,62 +321,63 @@ async def test_publish_purge_async(async_connection: AsyncConnection) -> None: |
319 | 321 | assert message_purged == 20 |
320 | 322 |
|
321 | 323 |
|
322 | | -# @pytest.mark.asyncio |
323 | | -# async def test_disconnection_reconnection_async(async_connection: AsyncConnection) -> None: |
324 | | -# disconnected = False |
325 | | -# generic_exception_raised = False |
326 | | - |
327 | | -# environment = AsyncEnvironment( |
328 | | -# uri="amqp://guest:guest@localhost:5672/", |
329 | | -# recovery_configuration=RecoveryConfiguration(active_recovery=True) |
330 | | -# ) |
331 | | - |
332 | | -# connection_test = await environment.connection() |
333 | | - |
334 | | -# await connection_test.dial() |
335 | | -# # delay |
336 | | -# time.sleep(5) |
337 | | -# messages_to_publish = 10000 |
338 | | -# queue_name = "test-queue-reconnection" |
339 | | -# management = await connection_test.management() |
340 | | - |
341 | | -# await management.declare_queue(QuorumQueueSpecification(name=queue_name)) |
342 | | - |
343 | | -# publisher = await connection_test.publisher( |
344 | | -# destination=AddressHelper.queue_address(queue_name) |
345 | | -# ) |
346 | | -# while True: |
347 | | -# for i in range(messages_to_publish): |
348 | | -# if i == 5: |
349 | | -# # simulate a disconnection |
350 | | -# delete_all_connections() |
351 | | -# try: |
352 | | -# await publisher.publish( |
353 | | -# Message(body=Converter.string_to_bytes("test")) |
354 | | -# ) |
355 | | -# except ConnectionClosed: |
356 | | -# disconnected = True |
357 | | -# # TODO: check if this behavior is correct |
358 | | -# # The underlying sync Connection handles all recovery automatically, |
359 | | -# # hence the async wrapper transparently benefits from it. |
360 | | -# # so the exception should is not raised |
361 | | -# continue |
362 | | -# except Exception: |
363 | | -# generic_exception_raised = True |
364 | | - |
365 | | -# break |
366 | | - |
367 | | -# await publisher.close() |
368 | | - |
369 | | -# # purge the queue and check number of published messages |
370 | | -# message_purged = await management.purge_queue(queue_name) |
371 | | - |
372 | | -# await management.delete_queue(queue_name) |
373 | | -# await management.close() |
374 | | - |
375 | | -# assert generic_exception_raised is False |
376 | | -# # assert disconnected is True |
377 | | -# assert message_purged == messages_to_publish - 1 |
| 324 | +@pytest.mark.asyncio |
| 325 | +async def test_disconnection_reconnection_async( |
| 326 | + async_connection: AsyncConnection, |
| 327 | +) -> None: |
| 328 | + # disconnected = False |
| 329 | + generic_exception_raised = False |
| 330 | + |
| 331 | + environment = AsyncEnvironment( |
| 332 | + uri="amqp://guest:guest@localhost:5672/", |
| 333 | + recovery_configuration=RecoveryConfiguration(active_recovery=True), |
| 334 | + ) |
| 335 | + |
| 336 | + connection_test = await environment.connection() |
| 337 | + |
| 338 | + await connection_test.dial() |
| 339 | + # delay |
| 340 | + time.sleep(5) |
| 341 | + messages_to_publish = 10000 |
| 342 | + queue_name = "test-queue-reconnection" |
| 343 | + management = await connection_test.management() |
| 344 | + |
| 345 | + await management.declare_queue(QuorumQueueSpecification(name=queue_name)) |
| 346 | + |
| 347 | + publisher = await connection_test.publisher( |
| 348 | + destination=AddressHelper.queue_address(queue_name) |
| 349 | + ) |
| 350 | + while True: |
| 351 | + for i in range(messages_to_publish): |
| 352 | + if i == 5: |
| 353 | + # simulate a disconnection |
| 354 | + delete_all_connections() |
| 355 | + try: |
| 356 | + await publisher.publish(Message(body=Converter.string_to_bytes("test"))) |
| 357 | + except ConnectionClosed: |
| 358 | + # disconnected = True |
| 359 | + |
| 360 | + # TODO: check if this behavior is correct |
| 361 | + # The underlying sync Connection handles all recovery automatically, |
| 362 | + # hence the async wrapper transparently benefits from it. |
| 363 | + # so the exception should is not raised |
| 364 | + continue |
| 365 | + except Exception: |
| 366 | + generic_exception_raised = True |
| 367 | + |
| 368 | + break |
| 369 | + |
| 370 | + await publisher.close() |
| 371 | + |
| 372 | + # purge the queue and check number of published messages |
| 373 | + message_purged = await management.purge_queue(queue_name) |
| 374 | + |
| 375 | + await management.delete_queue(queue_name) |
| 376 | + await management.close() |
| 377 | + |
| 378 | + assert generic_exception_raised is False |
| 379 | + # assert disconnected is True |
| 380 | + assert message_purged == messages_to_publish - 1 |
378 | 381 |
|
379 | 382 |
|
380 | 383 | @pytest.mark.asyncio |
|
0 commit comments