diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index ecbbc4a..9960a7b 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -4,11 +4,11 @@ from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, AddressHelper, AMQPMessagingHandler, - BindingSpecification, Connection, Environment, Event, ExchangeSpecification, + ExchangeToQueueBindingSpecification, Message, OutcomeState, QuorumQueueSpecification, @@ -102,7 +102,7 @@ def main() -> None: print("binding queue to exchange") bind_name = management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, binding_key=routing_key, diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 34bfb40..3cf851b 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -8,13 +8,13 @@ from rabbitmq_amqp_python_client import ( AddressHelper, AMQPMessagingHandler, - BindingSpecification, Connection, ConnectionClosed, Consumer, Environment, Event, ExchangeSpecification, + ExchangeToQueueBindingSpecification, Management, Message, Publisher, @@ -160,7 +160,7 @@ def main() -> None: print("binding queue to exchange") bind_name = connection_configuration.management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, binding_key=routing_key, diff --git a/examples/tls/tls_example.py b/examples/tls/tls_example.py index 48ec488..d118c71 100644 --- a/examples/tls/tls_example.py +++ b/examples/tls/tls_example.py @@ -4,12 +4,12 @@ from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, AddressHelper, AMQPMessagingHandler, - BindingSpecification, ClientCert, Connection, Environment, Event, ExchangeSpecification, + ExchangeToQueueBindingSpecification, Message, QuorumQueueSpecification, SslConfigurationContext, @@ -102,7 +102,7 @@ def main() -> None: print("binding queue to exchange") bind_name = management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, binding_key=routing_key, diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 6bf5811..dca93f4 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -6,8 +6,9 @@ from .connection import Connection from .consumer import Consumer from .entities import ( - BindingSpecification, ExchangeSpecification, + ExchangeToExchangeBindingSpecification, + ExchangeToQueueBindingSpecification, OffsetSpecification, StreamOptions, ) @@ -52,7 +53,8 @@ "QuorumQueueSpecification", "ClassicQueueSpecification", "StreamSpecification", - "BindingSpecification", + "ExchangeToQueueBindingSpecification", + "ExchangeToExchangeBindingSpecification", "QueueType", "Publisher", "Message", diff --git a/rabbitmq_amqp_python_client/address_helper.py b/rabbitmq_amqp_python_client/address_helper.py index 4e05470..138f724 100644 --- a/rabbitmq_amqp_python_client/address_helper.py +++ b/rabbitmq_amqp_python_client/address_helper.py @@ -1,4 +1,9 @@ -from .entities import BindingSpecification +from typing import Optional + +from .entities import ( + ExchangeToExchangeBindingSpecification, + ExchangeToQueueBindingSpecification, +) from .qpid.proton._message import Message @@ -7,19 +12,22 @@ def _is_unreserved(char: str) -> bool: return char.isalnum() or char in "-._~" -def encode_path_segment(input_string: str) -> str: +def encode_path_segment(input_string: Optional[str]) -> str: encoded = [] # Iterate over each character in the input string - for char in input_string: - # Check if the character is an unreserved character - if _is_unreserved(char): - encoded.append(char) # Append as is - else: - # Encode character to %HH format - encoded.append(f"%{ord(char):02X}") + if input_string is not None: + for char in input_string: + # Check if the character is an unreserved character + if _is_unreserved(char): + encoded.append(char) # Append as is + else: + # Encode character to %HH format + encoded.append(f"%{ord(char):02X}") + + return "".join(encoded) - return "".join(encoded) + return "" class AddressHelper: @@ -58,8 +66,13 @@ def path_address() -> str: @staticmethod def binding_path_with_exchange_queue( - bind_specification: BindingSpecification, + bind_specification: ExchangeToQueueBindingSpecification, ) -> str: + if bind_specification.binding_key is not None: + key = ";key=" + encode_path_segment(bind_specification.binding_key) + else: + key = ";key=" + binding_path_wth_exchange_queue_key = ( "/bindings" + "/" @@ -68,11 +81,28 @@ def binding_path_with_exchange_queue( + ";" + "dstq=" + encode_path_segment(bind_specification.destination_queue) + + key + + ";args=" + ) + return binding_path_wth_exchange_queue_key + + @staticmethod + def binding_path_with_exchange_exchange( + bind_specification: ExchangeToExchangeBindingSpecification, + ) -> str: + binding_path_wth_exchange_exchange_key = ( + "/bindings" + + "/" + + "src=" + + encode_path_segment(bind_specification.source_exchange) + + ";" + + "dstq=" + + encode_path_segment(bind_specification.destination_exchange) + ";key=" + encode_path_segment(bind_specification.binding_key) + ";args=" ) - return binding_path_wth_exchange_queue_key + return binding_path_wth_exchange_exchange_key @staticmethod def message_to_address_helper(message: Message, address: str) -> Message: diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 54c3e4a..8f15a3c 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -41,10 +41,17 @@ class OffsetSpecification(Enum): @dataclass -class BindingSpecification: +class ExchangeToQueueBindingSpecification: source_exchange: str destination_queue: str - binding_key: str + binding_key: Optional[str] = None + + +@dataclass +class ExchangeToExchangeBindingSpecification: + source_exchange: str + destination_exchange: str + binding_key: Optional[str] = None class StreamOptions: @@ -52,14 +59,14 @@ class StreamOptions: def __init__(self): # type: ignore self._filter_set: Dict[symbol, Described] = {} - def offset(self, offset_spefication: Union[OffsetSpecification, int]) -> None: - if isinstance(offset_spefication, int): + def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: + if isinstance(offset_specification, int): self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described( - symbol(STREAM_OFFSET_SPEC), offset_spefication + symbol(STREAM_OFFSET_SPEC), offset_specification ) else: self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described( - symbol(STREAM_OFFSET_SPEC), offset_spefication.name + symbol(STREAM_OFFSET_SPEC), offset_specification.name ) def filter_values(self, filters: list[str]) -> None: diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 70f9d46..333f0a6 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -5,8 +5,9 @@ from .address_helper import AddressHelper from .common import CommonValues, QueueType from .entities import ( - BindingSpecification, ExchangeSpecification, + ExchangeToExchangeBindingSpecification, + ExchangeToQueueBindingSpecification, QueueInfo, ) from .exceptions import ValidationCodeException @@ -301,12 +302,25 @@ def _validate_reponse_code( "wrong response code received: " + str(response_code) ) - def bind(self, bind_specification: BindingSpecification) -> str: + def bind( + self, + bind_specification: Union[ + ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification + ], + ) -> str: logger.debug("Bind Operation called") + body = {} - body["binding_key"] = bind_specification.binding_key + if bind_specification.binding_key is not None: + body["binding_key"] = bind_specification.binding_key + else: + body["binding_key"] = "" body["source"] = bind_specification.source_exchange - body["destination_queue"] = bind_specification.destination_queue + if isinstance(bind_specification, ExchangeToQueueBindingSpecification): + body["destination_queue"] = bind_specification.destination_queue + elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification): + body["destination_exchange"] = bind_specification.destination_exchange + body["arguments"] = {} # type: ignore path = AddressHelper.path_address() @@ -319,17 +333,43 @@ def bind(self, bind_specification: BindingSpecification) -> str: CommonValues.response_code_204.value, ], ) + binding_path = "" - binding_path_with_queue = AddressHelper.binding_path_with_exchange_queue( - bind_specification - ) - return binding_path_with_queue + if isinstance(bind_specification, ExchangeToQueueBindingSpecification): + binding_path = AddressHelper.binding_path_with_exchange_queue( + bind_specification + ) + elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification): + binding_path = AddressHelper.binding_path_with_exchange_exchange( + bind_specification + ) - def unbind(self, binding_exchange_queue_path: str) -> None: + return binding_path + + def unbind( + self, + bind_specification: Union[ + str, + ExchangeToQueueBindingSpecification, + ExchangeToExchangeBindingSpecification, + ], + ) -> None: logger.debug("UnBind Operation called") + binding_name = "" + if isinstance(bind_specification, str): + binding_name = bind_specification + else: + if isinstance(bind_specification, ExchangeToQueueBindingSpecification): + binding_name = AddressHelper.binding_path_with_exchange_queue( + bind_specification + ) + elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification): + binding_name = AddressHelper.binding_path_with_exchange_exchange( + bind_specification + ) self.request( None, - binding_exchange_queue_path, + binding_name, CommonValues.command_delete.value, [ CommonValues.response_code_204.value, diff --git a/tests/test_management.py b/tests/test_management.py index e91a131..6396780 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -1,9 +1,10 @@ from datetime import timedelta from rabbitmq_amqp_python_client import ( - BindingSpecification, ClassicQueueSpecification, ExchangeSpecification, + ExchangeToExchangeBindingSpecification, + ExchangeToQueueBindingSpecification, ExchangeType, Management, QueueType, @@ -73,7 +74,7 @@ def test_bind_exchange_to_queue(management: Management) -> None: management.declare_queue(QuorumQueueSpecification(name=queue_name)) binding_exchange_queue_path = management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, binding_key=routing_key, @@ -98,6 +99,169 @@ def test_bind_exchange_to_queue(management: Management) -> None: management.unbind(binding_exchange_queue_path) +def test_bind_exchange_to_queue_without_key(management: Management) -> None: + + exchange_name = "test-bind-exchange-to-queue-exchange" + queue_name = "test-bind-exchange-to-queue-queue" + + management.declare_exchange(ExchangeSpecification(name=exchange_name)) + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + binding_exchange_queue_path = management.bind( + ExchangeToQueueBindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + ) + ) + + assert ( + binding_exchange_queue_path + == "/bindings/src=" + exchange_name + ";dstq=" + queue_name + ";key=" + ";args=" + ) + + management.unbind(binding_exchange_queue_path) + + management.delete_exchange(exchange_name) + + management.delete_queue(queue_name) + + +def test_bind_exchange_to_exchange_without_key(management: Management) -> None: + + source_exchange_name = "test-bind-exchange-to-queue-exchange" + destination_exchange_name = "test-bind-exchange-to-queue-queue" + + management.declare_exchange(ExchangeSpecification(name=source_exchange_name)) + + management.declare_exchange(ExchangeSpecification(name=destination_exchange_name)) + + binding_exchange_queue_path = management.bind( + ExchangeToExchangeBindingSpecification( + source_exchange=source_exchange_name, + destination_exchange=destination_exchange_name, + ) + ) + + assert ( + binding_exchange_queue_path + == "/bindings/src=" + + source_exchange_name + + ";dstq=" + + destination_exchange_name + + ";key=" + + ";args=" + ) + + management.unbind(binding_exchange_queue_path) + + management.delete_exchange(source_exchange_name) + + management.delete_exchange(destination_exchange_name) + + +def test_bind_unbind_by_binding_spec(management: Management) -> None: + + exchange_name = "test-bind-exchange-to-queue-exchange" + queue_name = "test-bind-exchange-to-queue-queue" + + management.declare_exchange(ExchangeSpecification(name=exchange_name)) + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + management.bind( + ExchangeToQueueBindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + ) + ) + + management.unbind( + ExchangeToQueueBindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + ) + ) + + management.delete_exchange(exchange_name) + + management.delete_queue(queue_name) + + +def test_bind_unbind_exchange_by_exchange_spec(management: Management) -> None: + + source_exchange_name = "test-bind-exchange-to-queue-exchange" + destination_exchange_name = "test-bind-exchange-to-queue-queue" + + management.declare_exchange(ExchangeSpecification(name=source_exchange_name)) + + management.declare_exchange(ExchangeSpecification(name=destination_exchange_name)) + + binding_exchange_queue_path = management.bind( + ExchangeToExchangeBindingSpecification( + source_exchange=source_exchange_name, + destination_exchange=destination_exchange_name, + ) + ) + + assert ( + binding_exchange_queue_path + == "/bindings/src=" + + source_exchange_name + + ";dstq=" + + destination_exchange_name + + ";key=" + + ";args=" + ) + + management.unbind( + ExchangeToExchangeBindingSpecification( + source_exchange=source_exchange_name, + destination_exchange=destination_exchange_name, + ) + ) + + management.delete_exchange(source_exchange_name) + + management.delete_exchange(destination_exchange_name) + + +def test_bind_exchange_to_exchange(management: Management) -> None: + + source_exchange_name = "source_exchange" + destination_exchange_name = "destination_exchange" + routing_key = "routing-key" + + management.declare_exchange(ExchangeSpecification(name=source_exchange_name)) + + management.declare_exchange(ExchangeSpecification(name=destination_exchange_name)) + + binding_exchange_exchange_path = management.bind( + ExchangeToExchangeBindingSpecification( + source_exchange=source_exchange_name, + destination_exchange=destination_exchange_name, + binding_key=routing_key, + ) + ) + + assert ( + binding_exchange_exchange_path + == "/bindings/src=" + + source_exchange_name + + ";dstq=" + + destination_exchange_name + + ";key=" + + routing_key + + ";args=" + ) + + management.unbind(binding_exchange_exchange_path) + + management.delete_exchange(source_exchange_name) + + management.delete_exchange(destination_exchange_name) + + def test_queue_info_with_validations(management: Management) -> None: queue_name = "test_queue_info_with_validation" diff --git a/tests/utils.py b/tests/utils.py index a70ece0..bc4ecca 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,10 +2,10 @@ from rabbitmq_amqp_python_client import ( AddressHelper, - BindingSpecification, Connection, Delivery, ExchangeSpecification, + ExchangeToQueueBindingSpecification, ExchangeType, Management, Message, @@ -55,7 +55,7 @@ def setup_dead_lettering(management: Management) -> str: ) management.declare_queue(QuorumQueueSpecification(name=queue_dead_lettering)) bind_path = management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_dead_lettering, destination_queue=queue_dead_lettering, binding_key=binding_key, @@ -74,7 +74,7 @@ def create_binding( management.declare_queue(QuorumQueueSpecification(name=queue_name)) bind_name = management.bind( - BindingSpecification( + ExchangeToQueueBindingSpecification( source_exchange=exchange_name, destination_queue=queue_name, binding_key=routing_key,