Skip to content

Commit aec72f6

Browse files
committed
add tests for amqp 0.9.1
1 parent 2835843 commit aec72f6

File tree

1 file changed

+72
-0
lines changed

1 file changed

+72
-0
lines changed

tests/asyncio/test_amqp_091.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import functools
2+
3+
import pika
4+
import pytest
5+
6+
from rabbitmq_amqp_python_client import (
7+
AddressHelper,
8+
AsyncConnection,
9+
Converter,
10+
OutcomeState,
11+
QuorumQueueSpecification,
12+
)
13+
from rabbitmq_amqp_python_client.qpid.proton import Message
14+
15+
from .fixtures import * # noqa: F401, F403
16+
17+
18+
@pytest.mark.asyncio
19+
async def test_publish_queue(async_connection: AsyncConnection) -> None:
20+
queue_name = "amqp091-queue"
21+
management = await async_connection.management()
22+
23+
await management.declare_queue(QuorumQueueSpecification(name=queue_name))
24+
25+
raised = False
26+
27+
publisher = None
28+
accepted = False
29+
30+
try:
31+
publisher = await async_connection.publisher(
32+
destination=AddressHelper.queue_address(queue_name)
33+
)
34+
status = await publisher.publish(
35+
Message(body=Converter.string_to_bytes("my_test_string_for_amqp"))
36+
)
37+
if status.remote_state == OutcomeState.ACCEPTED:
38+
accepted = True
39+
except Exception:
40+
raised = True
41+
42+
if publisher is not None:
43+
await publisher.close()
44+
45+
assert accepted is True
46+
assert raised is False
47+
48+
credentials = pika.PlainCredentials("guest", "guest")
49+
parameters = pika.ConnectionParameters("localhost", credentials=credentials)
50+
connection = pika.BlockingConnection(parameters)
51+
channel = connection.channel()
52+
53+
def on_message(chan, method_frame, header_frame, body, userdata=None):
54+
"""Called when a message is received. Log message and ack it."""
55+
chan.basic_ack(delivery_tag=method_frame.delivery_tag)
56+
assert body is not None
57+
body_text = Converter.bytes_to_string(body)
58+
assert body_text is not None
59+
assert body_text == "my_test_string_for_amqp"
60+
channel.stop_consuming()
61+
62+
on_message_callback = functools.partial(on_message, userdata="on_message_userdata")
63+
channel.basic_qos(
64+
prefetch_count=1,
65+
)
66+
channel.basic_consume(queue_name, on_message_callback)
67+
68+
channel.start_consuming()
69+
connection.close()
70+
71+
await management.delete_queue(queue_name)
72+
await management.close()

0 commit comments

Comments
 (0)