|
11 | 11 | MyMessageHandlerDiscard, |
12 | 12 | MyMessageHandlerNoack, |
13 | 13 | MyMessageHandlerRequeue, |
14 | | - create_connection, |
| 14 | + MyMessageHandlerRequeueWithAnnotations, |
15 | 15 | ) |
| 16 | +from .utils import create_connection |
16 | 17 |
|
17 | 18 |
|
18 | 19 | def test_consumer_sync_queue_ack(connection: Connection) -> None: |
@@ -81,9 +82,9 @@ def test_consumer_async_queue_accept(connection: Connection) -> None: |
81 | 82 | except ConsumerTestException: |
82 | 83 | pass |
83 | 84 |
|
84 | | - message_count = management.purge_queue(queue_name) |
| 85 | + consumer.close() |
85 | 86 |
|
86 | | - management.delete_queue(queue_name) |
| 87 | + message_count = management.purge_queue(queue_name) |
87 | 88 |
|
88 | 89 | management.close() |
89 | 90 |
|
@@ -215,3 +216,54 @@ def test_consumer_async_queue_with_requeue(connection: Connection) -> None: |
215 | 216 | management.close() |
216 | 217 |
|
217 | 218 | assert message_count > 0 |
| 219 | + |
| 220 | + |
| 221 | +def test_consumer_async_queue_with_requeue_with_annotations( |
| 222 | + connection: Connection, |
| 223 | +) -> None: |
| 224 | + messages_to_send = 1000 |
| 225 | + |
| 226 | + queue_name = "test-queue_async_requeue" |
| 227 | + |
| 228 | + management = connection.management() |
| 229 | + |
| 230 | + management.declare_queue(QuorumQueueSpecification(name=queue_name)) |
| 231 | + |
| 232 | + addr_queue = queue_address(queue_name) |
| 233 | + |
| 234 | + publisher = connection.publisher("/queues/" + queue_name) |
| 235 | + |
| 236 | + # publish messages_to_send messages |
| 237 | + for i in range(messages_to_send): |
| 238 | + publisher.publish(Message(body="test" + str(i))) |
| 239 | + publisher.close() |
| 240 | + |
| 241 | + # workaround: it looks like when the consumer finish to consume invalidate the connection |
| 242 | + # so for the moment we need to use one dedicated |
| 243 | + connection_consumer = create_connection() |
| 244 | + |
| 245 | + consumer = connection_consumer.consumer( |
| 246 | + addr_queue, handler=MyMessageHandlerRequeueWithAnnotations() |
| 247 | + ) |
| 248 | + |
| 249 | + try: |
| 250 | + consumer.run() |
| 251 | + # ack to terminate the consumer |
| 252 | + except ConsumerTestException: |
| 253 | + pass |
| 254 | + |
| 255 | + consumer.close() |
| 256 | + |
| 257 | + # check for added annotation |
| 258 | + new_consumer = connection.consumer(addr_queue) |
| 259 | + message = new_consumer.consume() |
| 260 | + new_consumer.close() |
| 261 | + |
| 262 | + assert "x-opt-string" in message.annotations |
| 263 | + |
| 264 | + message_count = management.purge_queue(queue_name) |
| 265 | + |
| 266 | + management.delete_queue(queue_name) |
| 267 | + management.close() |
| 268 | + |
| 269 | + assert message_count > 0 |
0 commit comments