Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ python-qpid-proton = "^0.39.0"
flake8 = "^7.1.1"
isort = "^5.9.3"
mypy = "^0.910"
pytest = "^7.4.0"
pytest = "^8.3.4"
black = "^24.3.0"
python-qpid-proton = "^0.39.0"
requests = "^2.31.0"
Expand Down
62 changes: 62 additions & 0 deletions rabbitmq_amqp_python_client/address_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,25 @@ def encode_path_segment(input_string: Optional[str]) -> str:


class AddressHelper:
"""
Helper class for constructing and managing AMQP addresses.
This class provides static methods for creating properly formatted addresses
for various AMQP operations including exchanges, queues, and bindings.
"""

@staticmethod
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
"""
Create an address for an exchange, optionally with a routing key.
Args:
exchange_name: The name of the exchange
routing_key: Optional routing key
Returns:
str: The formatted exchange address
"""
if routing_key == "":
path = "/exchanges/" + encode_path_segment(exchange_name)
else:
Expand All @@ -48,12 +64,30 @@ def exchange_address(exchange_name: str, routing_key: str = "") -> str:

@staticmethod
def queue_address(name: str) -> str:
"""
Create an address for a queue.
Args:
name: The name of the queue
Returns:
str: The formatted queue address
"""
path = "/queues/" + encode_path_segment(name)

return path

@staticmethod
def purge_queue_address(name: str) -> str:
"""
Create an address for purging a queue.
Args:
name: The name of the queue to purge
Returns:
str: The formatted purge queue address
"""
path = "/queues/" + encode_path_segment(name) + "/messages"

return path
Expand All @@ -68,6 +102,15 @@ def path_address() -> str:
def binding_path_with_exchange_queue(
bind_specification: ExchangeToQueueBindingSpecification,
) -> str:
"""
Create a binding path for an exchange-to-queue binding.
Args:
bind_specification: The specification for the binding
Returns:
str: The formatted binding path
"""
if bind_specification.binding_key is not None:
key = ";key=" + encode_path_segment(bind_specification.binding_key)
else:
Expand All @@ -90,6 +133,15 @@ def binding_path_with_exchange_queue(
def binding_path_with_exchange_exchange(
bind_specification: ExchangeToExchangeBindingSpecification,
) -> str:
"""
Create a binding path for an exchange-to-exchange binding.
Args:
bind_specification: The specification for the binding
Returns:
str: The formatted binding path
"""
binding_path_wth_exchange_exchange_key = (
"/bindings"
+ "/"
Expand All @@ -106,6 +158,16 @@ def binding_path_with_exchange_exchange(

@staticmethod
def message_to_address_helper(message: Message, address: str) -> Message:
"""
Set the address on a message.
Args:
message: The message to modify
address: The address to set
Returns:
Message: The modified message with the new address
"""
message.address = address
return message

Expand Down
65 changes: 65 additions & 0 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@


class Connection:
"""
Main connection class for interacting with RabbitMQ via AMQP 1.0 protocol.

This class manages the connection to RabbitMQ and provides factory methods for
creating publishers, consumers, and management interfaces. It supports both
single-node and multi-node configurations, as well as SSL/TLS connections.

"""

def __init__(
self,
# single-node mode
Expand All @@ -28,6 +37,18 @@ def __init__(
ssl_context: Optional[SslConfigurationContext] = None,
on_disconnection_handler: Optional[CB] = None, # type: ignore
):
"""
Initialize a new Connection instance.

Args:
uri: Single node connection URI
uris: List of URIs for multi-node setup
ssl_context: SSL configuration for secure connections
on_disconnection_handler: Callback for handling disconnection events

Raises:
ValueError: If neither uri nor uris is provided
"""
if uri is None and uris is None:
raise ValueError("You need to specify at least an addr or a list of addr")
self._addr: Optional[str] = uri
Expand All @@ -44,6 +65,12 @@ def _set_environment_connection_list(self, connections: []): # type: ignore
self._connections = connections

def dial(self) -> None:
"""
Establish a connection to the AMQP server.

Configures SSL if specified and establishes the connection using the
provided URI(s). Also initializes the management interface.
"""
logger.debug("Establishing a connection to the amqp server")
if self._conf_ssl_context is not None:
logger.debug("Enabling SSL")
Expand Down Expand Up @@ -74,15 +101,38 @@ def _open(self) -> None:
self._management.open()

def management(self) -> Management:
"""
Get the management interface for this connection.

Returns:
Management: The management interface for performing administrative tasks
"""
return self._management

# closes the connection to the AMQP 1.0 server.
def close(self) -> None:
"""
Close the connection to the AMQP 1.0 server.

Closes the underlying connection and removes it from the connection list.
"""
logger.debug("Closing connection")
self._conn.close()
self._connections.remove(self)

def publisher(self, destination: str = "") -> Publisher:
"""
Create a new publisher instance.

Args:
destination: Optional default destination for published messages

Returns:
Publisher: A new publisher instance

Raises:
ArgumentOutOfRangeException: If destination address format is invalid
"""
if destination != "":
if validate_address(destination) is False:
raise ArgumentOutOfRangeException(
Expand All @@ -98,6 +148,21 @@ def consumer(
stream_filter_options: Optional[StreamOptions] = None,
credit: Optional[int] = None,
) -> Consumer:
"""
Create a new consumer instance.

Args:
destination: The address to consume from
message_handler: Optional handler for processing messages
stream_filter_options: Optional configuration for stream consumption
credit: Optional credit value for flow control

Returns:
Consumer: A new consumer instance

Raises:
ArgumentOutOfRangeException: If destination address format is invalid
"""
if validate_address(destination) is False:
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
Expand Down
57 changes: 57 additions & 0 deletions rabbitmq_amqp_python_client/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@


class Consumer:
"""
A consumer class for receiving messages from RabbitMQ via AMQP 1.0 protocol.

This class handles the consumption of messages from a specified address in RabbitMQ.
It supports both standard message consumption and stream-based consumption with
optional filtering capabilities.

Attributes:
_receiver (Optional[BlockingReceiver]): The receiver for consuming messages
_conn (BlockingConnection): The underlying connection to RabbitMQ
_addr (str): The address to consume from
_handler (Optional[MessagingHandler]): Optional message handling callback
_stream_options (Optional[StreamOptions]): Configuration for stream consumption
_credit (Optional[int]): Flow control credit value
"""

def __init__(
self,
conn: BlockingConnection,
Expand All @@ -25,6 +41,16 @@ def __init__(
stream_options: Optional[StreamOptions] = None,
credit: Optional[int] = None,
):
"""
Initialize a new Consumer instance.

Args:
conn: The blocking connection to use for consuming
addr: The address to consume from
handler: Optional message handler for processing received messages
stream_options: Optional configuration for stream-based consumption
credit: Optional credit value for flow control
"""
self._receiver: Optional[BlockingReceiver] = None
self._conn = conn
self._addr = addr
Expand All @@ -39,21 +65,52 @@ def _open(self) -> None:
self._receiver = self._create_receiver(self._addr)

def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message:
"""
Consume a message from the queue.

Args:
timeout: The time to wait for a message.
None: Defaults to 60s
float: Wait for specified number of seconds

Returns:
Message: The received message

Note:
The return type might be None if no message is available and timeout occurs,
but this is handled by the cast to Message.
"""
if self._receiver is not None:
message = self._receiver.receive(timeout=timeout)
return cast(Message, message)

def close(self) -> None:
"""
Close the consumer connection.

Closes the receiver if it exists and cleans up resources.
"""
logger.debug("Closing the receiver")
if self._receiver is not None:
self._receiver.close()

def run(self) -> None:
"""
Run the consumer in continuous mode.

Starts the consumer's container to process messages continuously.
"""
logger.debug("Running the consumer: starting to consume")
if self._receiver is not None:
self._receiver.container.run()

def stop(self) -> None:
"""
Stop the consumer's continuous processing.

Stops the consumer's container, halting message processing.
This should be called to cleanly stop a consumer that was started with run().
"""
logger.debug("Stopping the consumer: starting to consume")
if self._receiver is not None:
self._receiver.container.stop_events()
Expand Down
Loading