Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/reconnection/reconnection_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def create_connection() -> Connection:
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)

connection = environment.connection(
url="amqp://guest:guest@localhost:5672/",
uri="amqp://guest:guest@localhost:5672/",
on_disconnection_handler=on_disconnection,
)
connection.dial()
Expand Down
6 changes: 5 additions & 1 deletion rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
StreamOptions,
)
from .environment import Environment
from .exceptions import ArgumentOutOfRangeException
from .exceptions import (
ArgumentOutOfRangeException,
ValidationCodeException,
)
from .management import Management
from .publisher import Publisher
from .qpid.proton._data import symbol # noqa: E402
Expand Down Expand Up @@ -62,6 +65,7 @@
"AddressHelper",
"AMQPMessagingHandler",
"ArgumentOutOfRangeException",
"ValidationCodeException",
"SslConfigurationContext",
"ClientCert",
"ConnectionClosed",
Expand Down
6 changes: 6 additions & 0 deletions rabbitmq_amqp_python_client/address_helper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .entities import BindingSpecification
from .qpid.proton._message import Message


def _is_unreserved(char: str) -> bool:
Expand Down Expand Up @@ -73,6 +74,11 @@ def binding_path_with_exchange_queue(
)
return binding_path_wth_exchange_queue_key

@staticmethod
def message_to_address_helper(message: Message, address: str) -> Message:
message.address = address
return message


def validate_address(address: str) -> bool:
if address.startswith("/queues") or address.startswith("/exchanges"):
Expand Down
11 changes: 6 additions & 5 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ def close(self) -> None:
self._conn.close()
self._connections.remove(self)

def publisher(self, destination: str) -> Publisher:
if validate_address(destination) is False:
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
def publisher(self, destination: str = "") -> Publisher:
if destination != "":
if validate_address(destination) is False:
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
publisher = Publisher(self._conn, destination)
return publisher

Expand Down
5 changes: 3 additions & 2 deletions rabbitmq_amqp_python_client/consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Literal, Optional, Union
from typing import Literal, Optional, Union, cast

from .entities import StreamOptions
from .options import (
Expand Down Expand Up @@ -40,7 +40,8 @@ def _open(self) -> None:

def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message:
if self._receiver is not None:
return self._receiver.receive(timeout=timeout)
message = self._receiver.receive(timeout=timeout)
return cast(Message, message)

def close(self) -> None:
logger.debug("Closing the receiver")
Expand Down
26 changes: 23 additions & 3 deletions rabbitmq_amqp_python_client/publisher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import logging
from typing import Optional

from .address_helper import validate_address
from .exceptions import (
ArgumentOutOfRangeException,
ValidationCodeException,
)
from .options import SenderOptionUnseattle
from .qpid.proton._delivery import Delivery
from .qpid.proton._message import Message
Expand All @@ -13,7 +18,7 @@


class Publisher:
def __init__(self, conn: BlockingConnection, addr: str):
def __init__(self, conn: BlockingConnection, addr: str = ""):
self._sender: Optional[BlockingSender] = None
self._conn = conn
self._addr = addr
Expand All @@ -25,8 +30,23 @@ def _open(self) -> None:
self._sender = self._create_sender(self._addr)

def publish(self, message: Message) -> Delivery:
if self._sender is not None:
return self._sender.send(message)
if (self._addr != "") and (message.address is not None):
raise ValidationCodeException(
"address specified in both message and publisher"
)

if self._addr != "":
if self._sender is not None:
return self._sender.send(message)
else:
if message.address != "":
if validate_address(message.address) is False:
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
if self._sender is not None:
delivery = self._sender.send(message)
return delivery

def close(self) -> None:
logger.debug("Closing Sender")
Expand Down
1 change: 0 additions & 1 deletion rabbitmq_amqp_python_client/qpid/proton/_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def _check(self, err: int) -> int:
def _check_property_keys(self) -> None:
"""
AMQP allows only string keys for properties. This function checks that this requirement is met
and raises a MessageException if not. However, in certain cases, conversions to string are
automatically performed:

1. When a key is a user-defined (non-AMQP) subclass of str.
Expand Down
162 changes: 148 additions & 14 deletions tests/test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
from rabbitmq_amqp_python_client import (
AddressHelper,
ArgumentOutOfRangeException,
BindingSpecification,
Connection,
ConnectionClosed,
Environment,
ExchangeSpecification,
Message,
OutcomeState,
QuorumQueueSpecification,
StreamSpecification,
ValidationCodeException,
)

from .http_requests import delete_all_connections
from .utils import create_binding, publish_per_message


def test_publish_queue(connection: Connection) -> None:
Expand All @@ -31,18 +32,64 @@ def test_publish_queue(connection: Connection) -> None:
try:
publisher = connection.publisher("/queues/" + queue_name)
status = publisher.publish(Message(body="test"))
if status.ACCEPTED:
if status.remote_state == OutcomeState.ACCEPTED:
accepted = True
except Exception:
raised = True

if publisher is not None:
publisher.close()

management.delete_queue(queue_name)
management.close()

assert accepted is True
assert raised is False


def test_publish_per_message(connection: Connection) -> None:

queue_name = "test-queue-1"
queue_name_2 = "test-queue-2"
management = connection.management()

management.declare_queue(QuorumQueueSpecification(name=queue_name))
management.declare_queue(QuorumQueueSpecification(name=queue_name_2))

raised = False

publisher = None
accepted = False
accepted_2 = True

try:
publisher = connection.publisher()
status = publish_per_message(
publisher, addr=AddressHelper.queue_address(queue_name)
)
if status.remote_state == OutcomeState.ACCEPTED:
accepted = True
status = publish_per_message(
publisher, addr=AddressHelper.queue_address(queue_name_2)
)
if status.remote_state == OutcomeState.ACCEPTED:
accepted_2 = True
except Exception:
raised = True

if publisher is not None:
publisher.close()

purged_messages_queue_1 = management.purge_queue(queue_name)
purged_messages_queue_2 = management.purge_queue(queue_name_2)
management.delete_queue(queue_name)
management.delete_queue(queue_name_2)
management.close()

assert accepted is True
assert accepted_2 is True
assert purged_messages_queue_1 == 1
assert purged_messages_queue_2 == 1
assert raised is False


Expand Down Expand Up @@ -90,24 +137,64 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
assert raised is True


def test_publish_per_message_to_invalid_destination(connection: Connection) -> None:

queue_name = "test-queue-1"
raised = False

message = Message(body="test")
message = AddressHelper.message_to_address_helper(
message, "/invalid_destination/" + queue_name
)
publisher = connection.publisher()

try:
publisher.publish(message)
except ArgumentOutOfRangeException:
raised = True
except Exception:
raised = False

if publisher is not None:
publisher.close()

assert raised is True


def test_publish_per_message_both_address(connection: Connection) -> None:

queue_name = "test-queue-1"
raised = False

management = connection.management()
management.declare_queue(QuorumQueueSpecification(name=queue_name))

message = Message(body="test")
message = AddressHelper.message_to_address_helper(message, "/queues/" + queue_name)
publisher = connection.publisher("/queues/" + queue_name)

try:
publisher.publish(message)
except ValidationCodeException:
raised = True

if publisher is not None:
publisher.close()

management.delete_queue(queue_name)
management.close()

assert raised is True


def test_publish_exchange(connection: Connection) -> None:

exchange_name = "test-exchange"
queue_name = "test-queue"
management = connection.management()
routing_key = "routing-key"

management.declare_exchange(ExchangeSpecification(name=exchange_name))

management.declare_queue(QuorumQueueSpecification(name=queue_name))

management.bind(
BindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
binding_key=routing_key,
)
)
bind_name = create_binding(management, exchange_name, queue_name, routing_key)

addr = AddressHelper.exchange_address(exchange_name, routing_key)

Expand All @@ -124,6 +211,7 @@ def test_publish_exchange(connection: Connection) -> None:

publisher.close()

management.unbind(bind_name)
management.delete_exchange(exchange_name)
management.delete_queue(queue_name)
management.close()
Expand Down Expand Up @@ -265,3 +353,49 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
for i in range(messages_to_send):

publisher.publish(Message(body="test"))


def test_publish_per_message_exchange(connection: Connection) -> None:

exchange_name = "test-exchange-per-message"
queue_name = "test-queue-per-message"
management = connection.management()
routing_key = "routing-key-per-message"

bind_name = create_binding(management, exchange_name, queue_name, routing_key)

raised = False

publisher = None
accepted = False
accepted_2 = False

try:
publisher = connection.publisher()
status = publish_per_message(
publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
)
if status.remote_state == OutcomeState.ACCEPTED:
accepted = True
status = publish_per_message(
publisher, addr=AddressHelper.queue_address(queue_name)
)
if status.remote_state == OutcomeState.ACCEPTED:
accepted_2 = True
except Exception:
raised = True

# if publisher is not None:
publisher.close()

purged_messages_queue = management.purge_queue(queue_name)
management.unbind(bind_name)
management.delete_exchange(exchange_name)
management.delete_queue(queue_name)

management.close()

assert accepted is True
assert accepted_2 is True
assert purged_messages_queue == 2
assert raised is False
Loading