Skip to content
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9d16acd
connection layer implementation
Dec 6, 2024
5a83c7f
WIP
Dec 6, 2024
d89b75e
implementing Declare Exchange/Queue
Dec 9, 2024
f6e103b
test for gabriele
Dec 10, 2024
2365b53
body encodiding
Gsantomaggio Dec 10, 2024
34fccee
adding basic tests
Dec 10, 2024
d4e4ccf
implementing bind
Dec 11, 2024
2097a63
adding user-defined exception
Dec 12, 2024
79a73cd
Adding debugging info
Dec 12, 2024
e0be74f
publisher implementation
Dec 12, 2024
1d28d64
adding publisher basic test
Dec 13, 2024
32ded40
improve help_address utility functions
Dec 16, 2024
9043c94
modify example
Jan 7, 2025
078ca17
integrate qpid-proton lib
Jan 7, 2025
b01834d
implementing purge operation
Jan 8, 2025
4a953c7
improving url helper
Jan 8, 2025
6c86328
cleaning up qpid unecessary folders and files
Jan 8, 2025
f8fd4c2
some improvements
Jan 9, 2025
a031ce4
implementing queue_info
Jan 9, 2025
078b70d
fixing queue arguments management
Jan 9, 2025
ca3c9e6
better management of arguments
Jan 9, 2025
43efb26
improved arguments management during declare_queue
Jan 10, 2025
12b73d7
adding purge test
Jan 10, 2025
ecbc423
adding fixtures in tests
Jan 10, 2025
4d1c5d5
adding a publisher test
Jan 13, 2025
056d455
removing useless queue_type parameter
Jan 13, 2025
84a224e
removing receiver from publisher
Jan 13, 2025
4de3ece
consumer implementation
Jan 14, 2025
8465bf3
Implementing Connection, Management and Publisher modules (#10)
DanielePalaia Jan 14, 2025
1be8172
Merge branch 'consumer_implementation' into consumer_imp
Jan 14, 2025
0a3e6d9
consumer implementation
Jan 14, 2025
b1975af
modification for testing purpose
Jan 15, 2025
da56b4c
change option
Gsantomaggio Jan 15, 2025
03faa76
adding tests and improvementes
Jan 17, 2025
4335862
improving ack implementation
Jan 17, 2025
4512408
adding requeue with annotations test
Jan 18, 2025
2eb1a96
adding dead lettering test
Jan 18, 2025
148c436
adding discard with annotations test
Jan 18, 2025
71c532d
refactoring tests
Jan 18, 2025
2983576
improving AddressHelper utility functions
Jan 19, 2025
0db39d3
Change the class name for the consumer
Gsantomaggio Jan 20, 2025
088eefd
adding validations for annotations
Jan 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 95 additions & 9 deletions examples/getting_started/main.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,89 @@
# type: ignore


from rabbitmq_amqp_python_client import (
AddressHelper,
BindingSpecification,
Connection,
DeliveryConsumerHandler,
Event,
ExchangeSpecification,
Message,
QuorumQueueSpecification,
exchange_address,
)


class MyMessageHandler(DeliveryConsumerHandler):

def __init__(self):
super().__init__()
self._count = 0

def on_message(self, event: Event):
print("received message: " + str(event.message.annotations))

# accepting
self.delivery_context.accept(event)

# in case of rejection (+eventually deadlettering)
# self.delivery_context.discard(event)

# in case of requeuing
# self.delivery_context.requeue(event)

# annotations = {}
# annotations[symbol('x-opt-string')] = 'x-test1'
# in case of requeuing with annotations added
# self.delivery_context.requeue_with_annotations(event, annotations)

# in case of rejection with annotations added
# self.delivery_context.discard_with_annotations(event)

print("count " + str(self._count))

self._count = self._count + 1

if self._count == 100:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
# event.connection.close()

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
print("connection closed")

def on_link_closed(self, event: Event) -> None:
# if you want you can add cleanup operations here
print("link closed")


def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()

return connection


def main() -> None:

exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
connection = Connection("amqp://guest:guest@localhost:5672/")
messages_to_publish = 100

print("connection to amqp server")
connection.dial()
connection = create_connection()

management = connection.management()

print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))

management.declare_queue(QuorumQueueSpecification(name=queue_name))
management.declare_queue(
QuorumQueueSpecification(name=queue_name)
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
)

print("binding queue to exchange")
bind_name = management.bind(
Expand All @@ -33,21 +94,44 @@ def main() -> None:
)
)

addr = exchange_address(exchange_name, routing_key)
addr = AddressHelper.exchange_address(exchange_name, routing_key)

addr_queue = AddressHelper.queue_address(queue_name)

print("create a publisher and publish a test message")
publisher = connection.publisher(addr)

publisher.publish(Message(body="test"))
print("purging the queue")
messages_purged = management.purge_queue(queue_name)

print("messages purged: " + str(messages_purged))
# management.close()

# publish 10 messages
for i in range(messages_to_publish):
publisher.publish(Message(body="test"))

publisher.close()

print(
"create a consumer and consume the test message - press control + c to terminate to consume"
)
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())

try:
consumer.run()
except KeyboardInterrupt:
pass

print("cleanup")
consumer.close()
# once we finish consuming if we close the connection we need to create a new one
# connection = create_connection()
# management = connection.management()

print("unbind")
management.unbind(bind_name)

print("purging the queue")
management.purge_queue(queue_name)

print("delete queue")
management.delete_queue(queue_name)

Expand All @@ -56,7 +140,9 @@ def main() -> None:

print("closing connections")
management.close()
print("after management closing")
connection.close()
print("after connection closing")


if __name__ == "__main__":
Expand Down
22 changes: 18 additions & 4 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
from importlib import metadata

from .address_helper import exchange_address, queue_address
from .common import QueueType
from .address_helper import AddressHelper
from .common import ExchangeType, QueueType
from .connection import Connection
from .consumer import Consumer
from .delivery_consumer_handler import (
DeliveryConsumerHandler,
)
from .entities import (
BindingSpecification,
ExchangeSpecification,
)
from .management import Management
from .publisher import Publisher
from .qpid.proton._data import symbol # noqa: E402
from .qpid.proton._delivery import Delivery
from .qpid.proton._events import Event
from .qpid.proton._message import Message
from .qpid.proton.handlers import MessagingHandler
from .queues import (
ClassicQueueSpecification,
QuorumQueueSpecification,
Expand All @@ -35,7 +43,13 @@
"BindingSpecification",
"QueueType",
"Publisher",
"exchange_address",
"queue_address",
"Message",
"Consumer",
"MessagingHandler",
"Event",
"Delivery",
"symbol",
"ExchangeType",
"AddressHelper",
"DeliveryConsumerHandler",
]
83 changes: 44 additions & 39 deletions rabbitmq_amqp_python_client/address_helper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .entities import BindingSpecification


def is_unreserved(char: str) -> bool:
def _is_unreserved(char: str) -> bool:
# According to RFC 3986, unreserved characters are A-Z, a-z, 0-9, '-', '.', '_', and '~'
return char.isalnum() or char in "-._~"

Expand All @@ -12,7 +12,7 @@ def encode_path_segment(input_string: str) -> str:
# Iterate over each character in the input string
for char in input_string:
# Check if the character is an unreserved character
if is_unreserved(char):
if _is_unreserved(char):
encoded.append(char) # Append as is
else:
# Encode character to %HH format
Expand All @@ -21,49 +21,54 @@ def encode_path_segment(input_string: str) -> str:
return "".join(encoded)


def exchange_address(exchange_name: str, routing_key: str = "") -> str:
if routing_key == "":
path = "/exchanges/" + encode_path_segment(exchange_name)
else:
path = (
"/exchanges/"
+ encode_path_segment(exchange_name)
+ "/"
+ encode_path_segment(routing_key)
)

return path

class AddressHelper:

def queue_address(queue_name: str) -> str:
path = "/queues/" + encode_path_segment(queue_name)

return path
@staticmethod
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
if routing_key == "":
path = "/exchanges/" + encode_path_segment(exchange_name)
else:
path = (
"/exchanges/"
+ encode_path_segment(exchange_name)
+ "/"
+ encode_path_segment(routing_key)
)

return path

def purge_queue_address(queue_name: str) -> str:
path = "/queues/" + encode_path_segment(queue_name) + "/messages"
@staticmethod
def queue_address(queue_name: str) -> str:
path = "/queues/" + encode_path_segment(queue_name)

return path
return path

@staticmethod
def purge_queue_address(queue_name: str) -> str:
path = "/queues/" + encode_path_segment(queue_name) + "/messages"

def path_address() -> str:
path = "/bindings"
return path

return path
@staticmethod
def path_address() -> str:
path = "/bindings"

return path

def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
binding_path_wth_exchange_queue_key = (
"/bindings"
+ "/"
+ "src="
+ encode_path_segment(bind_specification.source_exchange)
+ ";"
+ "dstq="
+ encode_path_segment(bind_specification.destination_queue)
+ ";key="
+ encode_path_segment(bind_specification.binding_key)
+ ";args="
)
return binding_path_wth_exchange_queue_key
@staticmethod
def binding_path_with_exchange_queue(
bind_specification: BindingSpecification,
) -> str:
binding_path_wth_exchange_queue_key = (
"/bindings"
+ "/"
+ "src="
+ encode_path_segment(bind_specification.source_exchange)
+ ";"
+ "dstq="
+ encode_path_segment(bind_specification.destination_queue)
+ ";key="
+ encode_path_segment(bind_specification.binding_key)
+ ";args="
)
return binding_path_wth_exchange_queue_key
11 changes: 11 additions & 0 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
from typing import Optional

from .consumer import Consumer
from .management import Management
from .publisher import Publisher
from .qpid.proton._handlers import MessagingHandler
from .qpid.proton.utils import BlockingConnection

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -29,8 +32,16 @@ def management(self) -> Management:
# closes the connection to the AMQP 1.0 server.
def close(self) -> None:
logger.debug("Closing connection")
print("closing connection")
self._conn.close()
print("after closing connection")

def publisher(self, destination: str) -> Publisher:
publisher = Publisher(self._conn, destination)
return publisher

def consumer(
self, destination: str, handler: Optional[MessagingHandler] = None
) -> Consumer:
consumer = Consumer(self._conn, destination, handler)
return consumer
54 changes: 54 additions & 0 deletions rabbitmq_amqp_python_client/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging
from typing import Optional

from .options import ReceiverOptionUnsettled
from .qpid.proton._handlers import MessagingHandler
from .qpid.proton._message import Message
from .qpid.proton.utils import (
BlockingConnection,
BlockingReceiver,
)

logger = logging.getLogger(__name__)


class Consumer:
def __init__(
self,
conn: BlockingConnection,
addr: str,
handler: Optional[MessagingHandler] = None,
):
self._receiver: Optional[BlockingReceiver] = None
self._conn = conn
self._addr = addr
self._handler = handler
self._open()

def _open(self) -> None:
if self._receiver is None:
logger.debug("Creating Sender")
self._receiver = self._create_receiver(self._addr)

def consume(self) -> Message:
if self._receiver is not None:
return self._receiver.receive()

def close(self) -> None:
logger.debug("Closing the receiver")
if self._receiver is not None:
self._receiver.close()

def run(self) -> None:
if self._receiver is not None:
self._receiver.container.run()

def stop(self) -> None:
if self._receiver is not None:
self._receiver.container.stop_events()
self._receiver.container.stop()

def _create_receiver(self, addr: str) -> BlockingReceiver:
return self._conn.create_receiver(
addr, options=ReceiverOptionUnsettled(addr), handler=self._handler
)
10 changes: 10 additions & 0 deletions rabbitmq_amqp_python_client/delivery_consumer_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .delivery_context import DeliveryContext
from .qpid.proton.handlers import MessagingHandler


class DeliveryConsumerHandler(MessagingHandler): # type: ignore

def __init__(self, auto_accept: bool = False, auto_settle: bool = True):
super().__init__(auto_accept=auto_accept, auto_settle=auto_settle)
self._count = 0
self.delivery_context: DeliveryContext = DeliveryContext()
Loading