diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 43938fb..df51e3c 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -1,28 +1,89 @@ +# type: ignore + + from rabbitmq_amqp_python_client import ( + AddressHelper, + AMQPMessagingHandler, BindingSpecification, Connection, + Event, ExchangeSpecification, Message, QuorumQueueSpecification, - exchange_address, ) +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_message(self, event: Event): + print("received message: " + str(event.message.annotations)) + + # accepting + self.delivery_context.accept(event) + + # in case of rejection (+eventually deadlettering) + # self.delivery_context.discard(event) + + # in case of requeuing + # self.delivery_context.requeue(event) + + # annotations = {} + # annotations[symbol('x-opt-string')] = 'x-test1' + # in case of requeuing with annotations added + # self.delivery_context.requeue_with_annotations(event, annotations) + + # in case of rejection with annotations added + # self.delivery_context.discard_with_annotations(event) + + print("count " + str(self._count)) + + self._count = self._count + 1 + + if self._count == 100: + print("closing receiver") + # if you want you can add cleanup operations here + # event.receiver.close() + # event.connection.close() + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection() -> Connection: + connection = Connection("amqp://guest:guest@localhost:5672/") + connection.dial() + + return connection + + def main() -> None: + exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" - connection = Connection("amqp://guest:guest@localhost:5672/") + messages_to_publish = 100 print("connection to amqp server") - connection.dial() + connection = create_connection() management = connection.management() print("declaring exchange and queue") management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) - management.declare_queue(QuorumQueueSpecification(name=queue_name)) + management.declare_queue( + QuorumQueueSpecification(name=queue_name) + # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter") + ) print("binding queue to exchange") bind_name = management.bind( @@ -33,21 +94,44 @@ def main() -> None: ) ) - addr = exchange_address(exchange_name, routing_key) + addr = AddressHelper.exchange_address(exchange_name, routing_key) + + addr_queue = AddressHelper.queue_address(queue_name) print("create a publisher and publish a test message") publisher = connection.publisher(addr) - publisher.publish(Message(body="test")) + print("purging the queue") + messages_purged = management.purge_queue(queue_name) + + print("messages purged: " + str(messages_purged)) + # management.close() + + # publish 10 messages + for i in range(messages_to_publish): + publisher.publish(Message(body="test")) publisher.close() + print( + "create a consumer and consume the test message - press control + c to terminate to consume" + ) + consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + + try: + consumer.run() + except KeyboardInterrupt: + pass + + print("cleanup") + consumer.close() + # once we finish consuming if we close the connection we need to create a new one + # connection = create_connection() + # management = connection.management() + print("unbind") management.unbind(bind_name) - print("purging the queue") - management.purge_queue(queue_name) - print("delete queue") management.delete_queue(queue_name) @@ -56,7 +140,9 @@ def main() -> None: print("closing connections") management.close() + print("after management closing") connection.close() + print("after connection closing") if __name__ == "__main__": diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 91bf819..4d4c179 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -1,15 +1,22 @@ from importlib import metadata -from .address_helper import exchange_address, queue_address -from .common import QueueType +from .address_helper import AddressHelper +from .amqp_consumer_handler import AMQPMessagingHandler +from .common import ExchangeType, QueueType from .connection import Connection +from .consumer import Consumer from .entities import ( BindingSpecification, ExchangeSpecification, ) +from .exceptions import ArgumentOutOfRangeException from .management import Management from .publisher import Publisher +from .qpid.proton._data import symbol # noqa: E402 +from .qpid.proton._delivery import Delivery +from .qpid.proton._events import Event from .qpid.proton._message import Message +from .qpid.proton.handlers import MessagingHandler from .queues import ( ClassicQueueSpecification, QuorumQueueSpecification, @@ -35,7 +42,14 @@ "BindingSpecification", "QueueType", "Publisher", - "exchange_address", - "queue_address", "Message", + "Consumer", + "MessagingHandler", + "Event", + "Delivery", + "symbol", + "ExchangeType", + "AddressHelper", + "AMQPMessagingHandler", + "ArgumentOutOfRangeException", ] diff --git a/rabbitmq_amqp_python_client/address_helper.py b/rabbitmq_amqp_python_client/address_helper.py index 68a0ef6..9abba3e 100644 --- a/rabbitmq_amqp_python_client/address_helper.py +++ b/rabbitmq_amqp_python_client/address_helper.py @@ -1,7 +1,7 @@ from .entities import BindingSpecification -def is_unreserved(char: str) -> bool: +def _is_unreserved(char: str) -> bool: # According to RFC 3986, unreserved characters are A-Z, a-z, 0-9, '-', '.', '_', and '~' return char.isalnum() or char in "-._~" @@ -12,7 +12,7 @@ def encode_path_segment(input_string: str) -> str: # Iterate over each character in the input string for char in input_string: # Check if the character is an unreserved character - if is_unreserved(char): + if _is_unreserved(char): encoded.append(char) # Append as is else: # Encode character to %HH format @@ -21,49 +21,54 @@ def encode_path_segment(input_string: str) -> str: return "".join(encoded) -def exchange_address(exchange_name: str, routing_key: str = "") -> str: - if routing_key == "": - path = "/exchanges/" + encode_path_segment(exchange_name) - else: - path = ( - "/exchanges/" - + encode_path_segment(exchange_name) - + "/" - + encode_path_segment(routing_key) - ) - - return path - +class AddressHelper: -def queue_address(queue_name: str) -> str: - path = "/queues/" + encode_path_segment(queue_name) - - return path + @staticmethod + def exchange_address(exchange_name: str, routing_key: str = "") -> str: + if routing_key == "": + path = "/exchanges/" + encode_path_segment(exchange_name) + else: + path = ( + "/exchanges/" + + encode_path_segment(exchange_name) + + "/" + + encode_path_segment(routing_key) + ) + return path -def purge_queue_address(queue_name: str) -> str: - path = "/queues/" + encode_path_segment(queue_name) + "/messages" + @staticmethod + def queue_address(queue_name: str) -> str: + path = "/queues/" + encode_path_segment(queue_name) - return path + return path + @staticmethod + def purge_queue_address(queue_name: str) -> str: + path = "/queues/" + encode_path_segment(queue_name) + "/messages" -def path_address() -> str: - path = "/bindings" + return path - return path + @staticmethod + def path_address() -> str: + path = "/bindings" + return path -def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str: - binding_path_wth_exchange_queue_key = ( - "/bindings" - + "/" - + "src=" - + encode_path_segment(bind_specification.source_exchange) - + ";" - + "dstq=" - + encode_path_segment(bind_specification.destination_queue) - + ";key=" - + encode_path_segment(bind_specification.binding_key) - + ";args=" - ) - return binding_path_wth_exchange_queue_key + @staticmethod + def binding_path_with_exchange_queue( + bind_specification: BindingSpecification, + ) -> str: + binding_path_wth_exchange_queue_key = ( + "/bindings" + + "/" + + "src=" + + encode_path_segment(bind_specification.source_exchange) + + ";" + + "dstq=" + + encode_path_segment(bind_specification.destination_queue) + + ";key=" + + encode_path_segment(bind_specification.binding_key) + + ";args=" + ) + return binding_path_wth_exchange_queue_key diff --git a/rabbitmq_amqp_python_client/amqp_consumer_handler.py b/rabbitmq_amqp_python_client/amqp_consumer_handler.py new file mode 100644 index 0000000..a1c4612 --- /dev/null +++ b/rabbitmq_amqp_python_client/amqp_consumer_handler.py @@ -0,0 +1,22 @@ +from .delivery_context import DeliveryContext +from .qpid.proton.handlers import MessagingHandler + +""" +AMQPMessagingHandler extends the QPID MessagingHandler. +It is an helper to set the default values needed for manually accepting and settling messages. +self.delivery_context is an instance of DeliveryContext, which is used to accept, reject, +requeue or requeue with annotations a message. +It is not mandatory to use this class, but it is a good practice to use it. +""" + + +class AMQPMessagingHandler(MessagingHandler): # type: ignore + + def __init__(self, auto_accept: bool = False, auto_settle: bool = True): + """ + :param auto_accept: if True, the message is automatically accepted + by default is false, so the user has to manually accept the message and decide with the + different methods of the delivery_context what to do with the message + """ + super().__init__(auto_accept=auto_accept, auto_settle=auto_settle) + self.delivery_context: DeliveryContext = DeliveryContext() diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index b9a9b96..45589dd 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -1,7 +1,10 @@ import logging +from typing import Optional +from .consumer import Consumer from .management import Management from .publisher import Publisher +from .qpid.proton._handlers import MessagingHandler from .qpid.proton.utils import BlockingConnection logger = logging.getLogger(__name__) @@ -34,3 +37,9 @@ def close(self) -> None: def publisher(self, destination: str) -> Publisher: publisher = Publisher(self._conn, destination) return publisher + + def consumer( + self, destination: str, handler: Optional[MessagingHandler] = None + ) -> Consumer: + consumer = Consumer(self._conn, destination, handler) + return consumer diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py new file mode 100644 index 0000000..3ea9b94 --- /dev/null +++ b/rabbitmq_amqp_python_client/consumer.py @@ -0,0 +1,54 @@ +import logging +from typing import Optional + +from .options import ReceiverOptionUnsettled +from .qpid.proton._handlers import MessagingHandler +from .qpid.proton._message import Message +from .qpid.proton.utils import ( + BlockingConnection, + BlockingReceiver, +) + +logger = logging.getLogger(__name__) + + +class Consumer: + def __init__( + self, + conn: BlockingConnection, + addr: str, + handler: Optional[MessagingHandler] = None, + ): + self._receiver: Optional[BlockingReceiver] = None + self._conn = conn + self._addr = addr + self._handler = handler + self._open() + + def _open(self) -> None: + if self._receiver is None: + logger.debug("Creating Sender") + self._receiver = self._create_receiver(self._addr) + + def consume(self) -> Message: + if self._receiver is not None: + return self._receiver.receive() + + def close(self) -> None: + logger.debug("Closing the receiver") + if self._receiver is not None: + self._receiver.close() + + def run(self) -> None: + if self._receiver is not None: + self._receiver.container.run() + + def stop(self) -> None: + if self._receiver is not None: + self._receiver.container.stop_events() + self._receiver.container.stop() + + def _create_receiver(self, addr: str) -> BlockingReceiver: + return self._conn.create_receiver( + addr, options=ReceiverOptionUnsettled(addr), handler=self._handler + ) diff --git a/rabbitmq_amqp_python_client/delivery_context.py b/rabbitmq_amqp_python_client/delivery_context.py new file mode 100644 index 0000000..188ba72 --- /dev/null +++ b/rabbitmq_amqp_python_client/delivery_context.py @@ -0,0 +1,114 @@ +from typing import Dict + +from .exceptions import ArgumentOutOfRangeException +from .qpid.proton._data import PythonAMQPData +from .qpid.proton._delivery import Delivery +from .qpid.proton._events import Event +from .utils import validate_annotations + +""" +DeliveryContext is a class that is used to accept, reject, requeue or requeue with annotations a message. +It is an helper to set the default values needed for manually accepting and settling messages. +""" + + +class DeliveryContext: + """ + Accept the message (AMQP 1.0 accepted outcome). + + This means the message has been processed and the broker can delete it. + """ + + def accept(self, event: Event) -> None: + dlv = event.delivery + dlv.update(Delivery.ACCEPTED) + dlv.settle() + + """ + Reject the message (AMQP 1.0 rejected outcome). + This means the message cannot be processed because it is invalid, the broker can drop it + or dead-letter it if it is configured. + """ + + def discard(self, event: Event) -> None: + dlv = event.delivery + dlv.update(Delivery.REJECTED) + dlv.settle() + + """ + Discard the message with annotations to combine with the existing message annotations. + This means the message cannot be processed because it is invalid, the broker can drop it + or dead-letter it if it is configured. + Application-specific annotation keys must start with the x-opt- prefix. + Annotation keys the broker understands starts with x-, but not with x-opt- + This maps to the AMQP 1.0 + modified{delivery-failed = true, undeliverable-here = true} outcome. + annotations message annotations to combine with existing ones + AMQP + 1.0 modified outcome + The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome + """ + + def discard_with_annotations( + self, event: Event, annotations: Dict[str, "PythonAMQPData"] + ) -> None: + dlv = event.delivery + dlv.local.annotations = annotations + dlv.local.failed = True + dlv.local.undeliverable = True + + validated = validate_annotations(annotations.keys()) + + if validated is False: + raise ArgumentOutOfRangeException( + "Message annotation key must start with 'x-'" + ) + + dlv.update(Delivery.MODIFIED) + dlv.settle() + + """ + Requeue the message (AMQP 1.0 released outcome). + This means the message has not been processed and the broker can requeue it and deliver it + to the same or a different consumer. + """ + + def requeue(self, event: Event) -> None: + dlv = event.delivery + dlv.update(Delivery.RELEASED) + dlv.settle() + + """ + Requeue the message with annotations to combine with the existing message annotations. + This means the message has not been processed and the broker can requeue it and deliver it + to the same or a different consumer. + Application-specific annotation keys must start with the x-opt- prefix. + Annotation keys the broker understands starts with x-, but not with x-opt- + . + This maps to the AMQP 1.0 + modified{delivery-failed = false, undeliverable-here = false} outcome. + annotations message annotations to combine with existing ones + AMQP + 1.0 modified outcome + The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome + """ + + def requeue_with_annotations( + self, event: Event, annotations: Dict[str, "PythonAMQPData"] + ) -> None: + dlv = event.delivery + dlv.local.annotations = annotations + dlv.local.failed = False + dlv.local.undeliverable = False + + validated = validate_annotations(annotations.keys()) + + if validated is False: + raise ArgumentOutOfRangeException( + "Message annotation key must start with 'x-'" + ) + + dlv.update(Delivery.MODIFIED) + dlv.settle() diff --git a/rabbitmq_amqp_python_client/exceptions.py b/rabbitmq_amqp_python_client/exceptions.py index 141e73f..9544dcc 100644 --- a/rabbitmq_amqp_python_client/exceptions.py +++ b/rabbitmq_amqp_python_client/exceptions.py @@ -5,3 +5,12 @@ def __init__(self, msg: str): def __str__(self) -> str: return repr(self.msg) + + +class ArgumentOutOfRangeException(BaseException): + # Constructor or Initializer + def __init__(self, msg: str): + self.msg = msg + + def __str__(self) -> str: + return repr(self.msg) diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 23d3067..8259688 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -2,13 +2,7 @@ import uuid from typing import Any, Optional, Union -from .address_helper import ( - binding_path_with_exchange_queue, - exchange_address, - path_address, - purge_queue_address, - queue_address, -) +from .address_helper import AddressHelper from .common import CommonValues, QueueType from .entities import ( BindingSpecification, @@ -113,7 +107,7 @@ def declare_exchange( body["internal"] = exchange_specification.is_internal body["arguments"] = exchange_specification.arguments # type: ignore - path = exchange_address(exchange_specification.name) + path = AddressHelper.exchange_address(exchange_specification.name) self.request( body, @@ -146,7 +140,7 @@ def declare_queue( elif isinstance(queue_specification, StreamSpecification): body = self._declare_stream(queue_specification) - path = queue_address(queue_specification.name) + path = AddressHelper.queue_address(queue_specification.name) self.request( body, @@ -255,7 +249,7 @@ def _declare_stream( def delete_exchange(self, exchange_name: str) -> None: logger.debug("delete_exchange operation called") - path = exchange_address(exchange_name) + path = AddressHelper.exchange_address(exchange_name) self.request( None, @@ -268,7 +262,7 @@ def delete_exchange(self, exchange_name: str) -> None: def delete_queue(self, queue_name: str) -> None: logger.debug("delete_queue operation called") - path = queue_address(queue_name) + path = AddressHelper.queue_address(queue_name) self.request( None, @@ -302,7 +296,7 @@ def bind(self, bind_specification: BindingSpecification) -> str: body["destination_queue"] = bind_specification.destination_queue body["arguments"] = {} # type: ignore - path = path_address() + path = AddressHelper.path_address() self.request( body, @@ -313,7 +307,9 @@ def bind(self, bind_specification: BindingSpecification) -> str: ], ) - binding_path_with_queue = binding_path_with_exchange_queue(bind_specification) + binding_path_with_queue = AddressHelper.binding_path_with_exchange_queue( + bind_specification + ) return binding_path_with_queue def unbind(self, binding_exchange_queue_path: str) -> None: @@ -329,7 +325,7 @@ def unbind(self, binding_exchange_queue_path: str) -> None: def purge_queue(self, queue_name: str) -> int: logger.debug("purge_queue operation called") - path = purge_queue_address(queue_name) + path = AddressHelper.purge_queue_address(queue_name) response = self.request( None, @@ -344,7 +340,7 @@ def purge_queue(self, queue_name: str) -> int: def queue_info(self, queue_name: str) -> QueueInfo: logger.debug("queue_info operation called") - path = queue_address(queue_name) + path = AddressHelper.queue_address(queue_name) message = self.request( None, diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index 17398c8..4a2a7df 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -32,5 +32,17 @@ def apply(self, link: Link) -> None: link.properties = PropertyDict({symbol("paired"): True}) link.source.dynamic = False + +class ReceiverOptionUnsettled(LinkOption): # type: ignore + def __init__(self, addr: str): + self._addr = addr + + def apply(self, link: Link) -> None: + link.target.address = self._addr + link.snd_settle_mode = Link.SND_UNSETTLED + link.rcv_settle_mode = Link.RCV_FIRST + link.properties = PropertyDict({symbol("paired"): True}) + link.source.dynamic = False + def test(self, link: Link) -> bool: return bool(link.is_receiver) diff --git a/rabbitmq_amqp_python_client/qpid/proton/_handlers.py b/rabbitmq_amqp_python_client/qpid/proton/_handlers.py index 6bf5904..b27e002 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_handlers.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_handlers.py @@ -711,7 +711,7 @@ class MessagingHandler(Handler, Acking): def __init__( self, prefetch: int = 10, - auto_accept: bool = True, + auto_accept: bool = False, auto_settle: bool = True, peer_close_is_error: bool = False, ) -> None: diff --git a/rabbitmq_amqp_python_client/qpid/proton/_utils.py b/rabbitmq_amqp_python_client/qpid/proton/_utils.py index 38da406..29d0464 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_utils.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_utils.py @@ -272,6 +272,7 @@ def __init__( ) if credit: receiver.flow(credit) + self.fetcher = fetcher self.container = connection.container @@ -526,7 +527,7 @@ def create_receiver( handler=handler or fetcher, options=options, ), - fetcher, + handler or fetcher, credit=prefetch, ) @@ -552,7 +553,7 @@ def close(self) -> None: finally: self.conn.free() # Nothing left to block on. Allow reactor to clean up. - self.run() + # self.run() # why is this necessary here? if self.conn: self.conn.handler = None # break cyclical reference self.conn = None diff --git a/rabbitmq_amqp_python_client/utils.py b/rabbitmq_amqp_python_client/utils.py new file mode 100644 index 0000000..1804eb3 --- /dev/null +++ b/rabbitmq_amqp_python_client/utils.py @@ -0,0 +1,9 @@ +def validate_annotations(annotations: []) -> bool: # type: ignore + validated = True + for annotation in annotations: + if len(annotation) > 0 and annotation[:2] == "x-": + pass + else: + validated = False + return validated + return validated diff --git a/tests/conftest.py b/tests/conftest.py index 0929ca7..eb266cd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,12 @@ import pytest -from rabbitmq_amqp_python_client import Connection +from rabbitmq_amqp_python_client import ( + AddressHelper, + AMQPMessagingHandler, + Connection, + Event, + symbol, +) @pytest.fixture() @@ -25,3 +31,132 @@ def management(pytestconfig): finally: management.close() connection.close() + + +@pytest.fixture() +def consumer(pytestconfig): + connection = Connection("amqp://guest:guest@localhost:5672/") + connection.dial() + try: + queue_name = "test-queue" + addr_queue = AddressHelper.queue_address(queue_name) + consumer = connection.consumer(addr_queue) + yield consumer + + finally: + consumer.close() + connection.close() + + +class ConsumerTestException(BaseException): + # Constructor or Initializer + def __init__(self, msg: str): + self.msg = msg + + def __str__(self) -> str: + return repr(self.msg) + + +class MyMessageHandlerAccept(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._received = 0 + + def on_message(self, event: Event): + self.delivery_context.accept(event) + self._received = self._received + 1 + if self._received == 1000: + event.connection.close() + raise ConsumerTestException("consumed") + + +class MyMessageHandlerNoack(AMQPMessagingHandler): + + def __init__(self): + super().__init__(auto_settle=False) + self._received = 0 + + def on_message(self, event: Event): + self._received = self._received + 1 + if self._received == 1000: + event.receiver.close() + event.connection.close() + # Workaround to terminate the Consumer and notify the test when all messages are consumed + raise ConsumerTestException("consumed") + + +class MyMessageHandlerDiscard(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._received = 0 + + def on_message(self, event: Event): + self.delivery_context.discard(event) + self._received = self._received + 1 + if self._received == 1000: + event.connection.close() + raise ConsumerTestException("consumed") + + +class MyMessageHandlerDiscardWithAnnotations(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._received = 0 + + def on_message(self, event: Event): + annotations = {} + annotations[symbol("x-opt-string")] = "x-test1" + self.delivery_context.discard_with_annotations(event, annotations) + self._received = self._received + 1 + if self._received == 1000: + event.connection.close() + raise ConsumerTestException("consumed") + + +class MyMessageHandlerRequeue(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._received = 0 + + def on_message(self, event: Event): + self.delivery_context.requeue(event) + self._received = self._received + 1 + if self._received == 1000: + event.connection.close() + raise ConsumerTestException("consumed") + + +class MyMessageHandlerRequeueWithAnnotations(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._received = 0 + + def on_message(self, event: Event): + annotations = {} + annotations[symbol("x-opt-string")] = "x-test1" + self.delivery_context.requeue_with_annotations(event, annotations) + self._received = self._received + 1 + if self._received == 1000: + event.connection.close() + raise ConsumerTestException("consumed") + + +class MyMessageHandlerRequeueWithInvalidAnnotations(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._received = 0 + + def on_message(self, event: Event): + annotations = {} + annotations[symbol("invalid")] = "x-test1" + self.delivery_context.requeue_with_annotations(event, annotations) + self._received = self._received + 1 + if self._received == 1000: + event.connection.close() + raise ConsumerTestException("consumed") diff --git a/tests/test_address_helper.py b/tests/test_address_helper.py index 5ef5c70..6a6a044 100644 --- a/tests/test_address_helper.py +++ b/tests/test_address_helper.py @@ -1,13 +1,10 @@ -from rabbitmq_amqp_python_client import ( - exchange_address, - queue_address, -) +from rabbitmq_amqp_python_client import AddressHelper def test_encoding_queue_simple() -> None: queue = "my_queue" - address = queue_address(queue) + address = AddressHelper.queue_address(queue) assert address == "/queues/my_queue" @@ -15,7 +12,7 @@ def test_encoding_queue_simple() -> None: def test_encoding_queue_hex() -> None: queue = "my_queue>" - address = queue_address(queue) + address = AddressHelper.queue_address(queue) assert address == "/queues/my_queue%3E" @@ -23,6 +20,6 @@ def test_encoding_queue_hex() -> None: def test_encoding_exchange_hex() -> None: queue = "my_exchange/()" - address = exchange_address(queue) + address = AddressHelper.exchange_address(queue) assert address == "/exchanges/my_exchange%2F%28%29" diff --git a/tests/test_consumer.py b/tests/test_consumer.py new file mode 100644 index 0000000..ff276a7 --- /dev/null +++ b/tests/test_consumer.py @@ -0,0 +1,364 @@ +from rabbitmq_amqp_python_client import ( + AddressHelper, + ArgumentOutOfRangeException, + Connection, + QuorumQueueSpecification, +) + +from .conftest import ( + ConsumerTestException, + MyMessageHandlerAccept, + MyMessageHandlerDiscard, + MyMessageHandlerDiscardWithAnnotations, + MyMessageHandlerNoack, + MyMessageHandlerRequeue, + MyMessageHandlerRequeueWithAnnotations, + MyMessageHandlerRequeueWithInvalidAnnotations, +) +from .utils import ( + cleanup_dead_lettering, + create_connection, + publish_messages, + setup_dead_lettering, +) + + +def test_consumer_sync_queue_accept(connection: Connection) -> None: + + queue_name = "test-queue-sync-accept" + messages_to_send = 100 + management = connection.management() + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + consumer = connection.consumer(addr_queue) + + consumed = 0 + + # publish messages_to_send messages + publish_messages(connection, messages_to_send, queue_name) + + # consumer synchronously without handler + for i in range(messages_to_send): + message = consumer.consume() + if message.body == "test" + str(i): + consumed = consumed + 1 + + consumer.close() + + management.delete_queue(queue_name) + management.close() + + assert consumed > 0 + + +def test_consumer_async_queue_accept(connection: Connection) -> None: + + messages_to_send = 1000 + + queue_name = "test-queue-async-accept" + + management = connection.management() + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + + publish_messages(connection, messages_to_send, queue_name) + + # we closed the connection so we need to open a new one + connection_consumer = create_connection() + consumer = connection_consumer.consumer( + addr_queue, handler=MyMessageHandlerAccept() + ) + + try: + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + message_count = management.purge_queue(queue_name) + + management.delete_queue(queue_name) + + management.close() + + assert message_count == 0 + + +def test_consumer_async_queue_no_ack(connection: Connection) -> None: + + messages_to_send = 1000 + + queue_name = "test-queue-async-no-ack" + + management = connection.management() + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + + publish_messages(connection, messages_to_send, queue_name) + + # we closed the connection so we need to open a new one + connection_consumer = create_connection() + + consumer = connection_consumer.consumer(addr_queue, handler=MyMessageHandlerNoack()) + + try: + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + message_count = management.purge_queue(queue_name) + + management.delete_queue(queue_name) + + management.close() + + assert message_count > 0 + + +def test_consumer_async_queue_with_discard(connection: Connection) -> None: + messages_to_send = 1000 + + queue_dead_lettering = "queue-dead-letter" + queue_name = "test-queue-async-discard" + exchange_dead_lettering = "exchange-dead-letter" + binding_key = "key-dead-letter" + + management = connection.management() + + # configuring dead lettering + bind_path = setup_dead_lettering(management) + addr_queue = AddressHelper.queue_address(queue_name) + + management.declare_queue( + QuorumQueueSpecification( + name=queue_name, + dead_letter_exchange=exchange_dead_lettering, + dead_letter_routing_key=binding_key, + ) + ) + + publish_messages(connection, messages_to_send, queue_name) + + # we closed the connection so we need to open a new one + connection_consumer = create_connection() + + consumer = connection_consumer.consumer( + addr_queue, handler=MyMessageHandlerDiscard() + ) + + try: + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + message_count = management.purge_queue(queue_name) + + management.delete_queue(queue_name) + + message_count_dead_lettering = management.purge_queue(queue_dead_lettering) + + cleanup_dead_lettering(management, bind_path) + + management.close() + + assert message_count == 0 + # check dead letter queue + assert message_count_dead_lettering == messages_to_send + + +def test_consumer_async_queue_with_discard_with_annotations( + connection: Connection, +) -> None: + messages_to_send = 1000 + + queue_dead_lettering = "queue-dead-letter" + queue_name = "test-queue-async-discard" + exchange_dead_lettering = "exchange-dead-letter" + binding_key = "key-dead-letter" + + management = connection.management() + + management.declare_queue( + QuorumQueueSpecification( + name=queue_name, + dead_letter_exchange=exchange_dead_lettering, + dead_letter_routing_key=binding_key, + ) + ) + + publish_messages(connection, messages_to_send, queue_name) + + bind_path = setup_dead_lettering(management) + addr_queue = AddressHelper.queue_address(queue_name) + addr_queue_dl = AddressHelper.queue_address(queue_dead_lettering) + + # we closed the connection so we need to open a new one + connection_consumer = create_connection() + + consumer = connection_consumer.consumer( + addr_queue, handler=MyMessageHandlerDiscardWithAnnotations() + ) + + try: + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + # check for added annotation + new_consumer = connection.consumer(addr_queue_dl) + message = new_consumer.consume() + new_consumer.close() + + message_count = management.purge_queue(queue_name) + + management.delete_queue(queue_name) + + message_count_dead_lettering = management.purge_queue(queue_dead_lettering) + + cleanup_dead_lettering(management, bind_path) + + management.close() + + assert "x-opt-string" in message.annotations + + assert message_count == 0 + # check dead letter queue + assert message_count_dead_lettering == messages_to_send + + +def test_consumer_async_queue_with_requeue(connection: Connection) -> None: + messages_to_send = 1000 + + queue_name = "test-queue-async-requeue" + + management = connection.management() + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + + publish_messages(connection, messages_to_send, queue_name) + + # we closed the connection so we need to open a new one + connection_consumer = create_connection() + + consumer = connection_consumer.consumer( + addr_queue, handler=MyMessageHandlerRequeue() + ) + + try: + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + message_count = management.purge_queue(queue_name) + + management.delete_queue(queue_name) + management.close() + + assert message_count > 0 + + +def test_consumer_async_queue_with_requeue_with_annotations( + connection: Connection, +) -> None: + messages_to_send = 1000 + + queue_name = "test-queue-async-requeue" + + management = connection.management() + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + + publish_messages(connection, messages_to_send, queue_name) + + # we closed the connection so we need to open a new one + connection_consumer = create_connection() + + consumer = connection_consumer.consumer( + addr_queue, handler=MyMessageHandlerRequeueWithAnnotations() + ) + + try: + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + consumer.close() + + # check for added annotation + new_consumer = connection.consumer(addr_queue) + message = new_consumer.consume() + new_consumer.close() + + message_count = management.purge_queue(queue_name) + + management.delete_queue(queue_name) + management.close() + + assert "x-opt-string" in message.annotations + + assert message_count > 0 + + +def test_consumer_async_queue_with_requeue_with_invalid_annotations( + connection: Connection, +) -> None: + messages_to_send = 1000 + test_failure = True + + queue_name = "test-queue-async-requeue" + + management = connection.management() + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + + publish_messages(connection, messages_to_send, queue_name) + + # we closed the connection so we need to open a new one + connection_consumer = create_connection() + + try: + consumer = connection_consumer.consumer( + addr_queue, handler=MyMessageHandlerRequeueWithInvalidAnnotations() + ) + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + except ArgumentOutOfRangeException: + test_failure = False + + consumer.close() + + management.delete_queue(queue_name) + management.close() + + assert test_failure is False diff --git a/tests/test_management.py b/tests/test_management.py index a810368..6c5de51 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -55,8 +55,6 @@ def test_bind_exchange_to_queue(management: Management) -> None: ) ) - print(binding_exchange_queue_path) - assert ( binding_exchange_queue_path == "/bindings/src=" diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 048d7a9..0cc2a57 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -1,12 +1,10 @@ -import time - from rabbitmq_amqp_python_client import ( + AddressHelper, BindingSpecification, Connection, ExchangeSpecification, Message, QuorumQueueSpecification, - exchange_address, ) @@ -25,13 +23,13 @@ def test_publish_queue(connection: Connection) -> None: except Exception: raised = True - assert raised is False - publisher.close() management.delete_queue(queue_name) management.close() + assert raised is False + def test_publish_exchange(connection: Connection) -> None: @@ -52,7 +50,7 @@ def test_publish_exchange(connection: Connection) -> None: ) ) - addr = exchange_address(exchange_name, routing_key) + addr = AddressHelper.exchange_address(exchange_name, routing_key) raised = False @@ -62,18 +60,17 @@ def test_publish_exchange(connection: Connection) -> None: except Exception: raised = True - assert raised is False - publisher.close() management.delete_exchange(exchange_name) management.delete_queue(queue_name) management.close() + assert raised is False + def test_publish_purge(connection: Connection) -> None: - connection = Connection("amqp://guest:guest@localhost:5672/") - connection.dial() + messages_to_publish = 20 queue_name = "test-queue" management = connection.management() @@ -84,19 +81,17 @@ def test_publish_purge(connection: Connection) -> None: try: publisher = connection.publisher("/queues/" + queue_name) - for i in range(20): + for i in range(messages_to_publish): publisher.publish(Message(body="test")) except Exception: raised = True - time.sleep(4) + publisher.close() message_purged = management.purge_queue(queue_name) - assert raised is False - assert message_purged == 20 - - publisher.close() - management.delete_queue(queue_name) management.close() + + assert raised is False + assert message_purged == 20 diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..b9b3b29 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,60 @@ +from rabbitmq_amqp_python_client import ( + BindingSpecification, + Connection, + ExchangeSpecification, + ExchangeType, + Management, + Message, + QuorumQueueSpecification, +) + + +def create_connection() -> Connection: + connection_consumer = Connection("amqp://guest:guest@localhost:5672/") + connection_consumer.dial() + + return connection_consumer + + +def publish_messages(connection: Connection, messages_to_send: int, queue_name) -> None: + publisher = connection.publisher("/queues/" + queue_name) + # publish messages_to_send messages + for i in range(messages_to_send): + publisher.publish(Message(body="test" + str(i))) + publisher.close() + + +def setup_dead_lettering(management: Management) -> str: + + exchange_dead_lettering = "exchange-dead-letter" + queue_dead_lettering = "queue-dead-letter" + binding_key = "key_dead_letter" + + # configuring dead lettering + management.declare_exchange( + ExchangeSpecification( + name=exchange_dead_lettering, + exchange_type=ExchangeType.fanout, + arguments={}, + ) + ) + management.declare_queue(QuorumQueueSpecification(name=queue_dead_lettering)) + bind_path = management.bind( + BindingSpecification( + source_exchange=exchange_dead_lettering, + destination_queue=queue_dead_lettering, + binding_key=binding_key, + ) + ) + + return bind_path + + +def cleanup_dead_lettering(management: Management, bind_path: str) -> None: + + exchange_dead_lettering = "exchange-dead-letter" + queue_dead_lettering = "queue-dead-letter" + + management.unbind(bind_path) + management.delete_exchange(exchange_dead_lettering) + management.delete_queue(queue_dead_lettering)