Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/getting_started/getting_started.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
AddressHelper,
AMQPMessagingHandler,
BindingSpecification,
Connection,
Environment,
Event,
ExchangeSpecification,
ExchangeToQueueBindingSpecification,
Message,
OutcomeState,
QuorumQueueSpecification,
Expand Down Expand Up @@ -102,7 +102,7 @@ def main() -> None:

print("binding queue to exchange")
bind_name = management.bind(
BindingSpecification(
ExchangeToQueueBindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
binding_key=routing_key,
Expand Down
4 changes: 2 additions & 2 deletions examples/reconnection/reconnection_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
BindingSpecification,
Connection,
ConnectionClosed,
Consumer,
Environment,
Event,
ExchangeSpecification,
ExchangeToQueueBindingSpecification,
Management,
Message,
Publisher,
Expand Down Expand Up @@ -160,7 +160,7 @@ def main() -> None:

print("binding queue to exchange")
bind_name = connection_configuration.management.bind(
BindingSpecification(
ExchangeToQueueBindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
binding_key=routing_key,
Expand Down
4 changes: 2 additions & 2 deletions examples/tls/tls_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
AddressHelper,
AMQPMessagingHandler,
BindingSpecification,
ClientCert,
Connection,
Environment,
Event,
ExchangeSpecification,
ExchangeToQueueBindingSpecification,
Message,
QuorumQueueSpecification,
SslConfigurationContext,
Expand Down Expand Up @@ -102,7 +102,7 @@ def main() -> None:

print("binding queue to exchange")
bind_name = management.bind(
BindingSpecification(
ExchangeToQueueBindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
binding_key=routing_key,
Expand Down
6 changes: 4 additions & 2 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from .connection import Connection
from .consumer import Consumer
from .entities import (
BindingSpecification,
ExchangeSpecification,
ExchangeToExchangeBindingSpecification,
ExchangeToQueueBindingSpecification,
OffsetSpecification,
StreamOptions,
)
Expand Down Expand Up @@ -52,7 +53,8 @@
"QuorumQueueSpecification",
"ClassicQueueSpecification",
"StreamSpecification",
"BindingSpecification",
"ExchangeToQueueBindingSpecification",
"ExchangeToExchangeBindingSpecification",
"QueueType",
"Publisher",
"Message",
Expand Down
54 changes: 42 additions & 12 deletions rabbitmq_amqp_python_client/address_helper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from .entities import BindingSpecification
from typing import Optional

from .entities import (
ExchangeToExchangeBindingSpecification,
ExchangeToQueueBindingSpecification,
)
from .qpid.proton._message import Message


Expand All @@ -7,19 +12,22 @@ def _is_unreserved(char: str) -> bool:
return char.isalnum() or char in "-._~"


def encode_path_segment(input_string: str) -> str:
def encode_path_segment(input_string: Optional[str]) -> str:
encoded = []

# Iterate over each character in the input string
for char in input_string:
# Check if the character is an unreserved character
if _is_unreserved(char):
encoded.append(char) # Append as is
else:
# Encode character to %HH format
encoded.append(f"%{ord(char):02X}")
if input_string is not None:
for char in input_string:
# Check if the character is an unreserved character
if _is_unreserved(char):
encoded.append(char) # Append as is
else:
# Encode character to %HH format
encoded.append(f"%{ord(char):02X}")

return "".join(encoded)

return "".join(encoded)
return ""


class AddressHelper:
Expand Down Expand Up @@ -58,8 +66,13 @@ def path_address() -> str:

@staticmethod
def binding_path_with_exchange_queue(
bind_specification: BindingSpecification,
bind_specification: ExchangeToQueueBindingSpecification,
) -> str:
if bind_specification.binding_key is not None:
key = ";key=" + encode_path_segment(bind_specification.binding_key)
else:
key = ";key="

binding_path_wth_exchange_queue_key = (
"/bindings"
+ "/"
Expand All @@ -68,11 +81,28 @@ def binding_path_with_exchange_queue(
+ ";"
+ "dstq="
+ encode_path_segment(bind_specification.destination_queue)
+ key
+ ";args="
)
return binding_path_wth_exchange_queue_key

@staticmethod
def binding_path_with_exchange_exchange(
bind_specification: ExchangeToExchangeBindingSpecification,
) -> str:
binding_path_wth_exchange_exchange_key = (
"/bindings"
+ "/"
+ "src="
+ encode_path_segment(bind_specification.source_exchange)
+ ";"
+ "dstq="
+ encode_path_segment(bind_specification.destination_exchange)
+ ";key="
+ encode_path_segment(bind_specification.binding_key)
+ ";args="
)
return binding_path_wth_exchange_queue_key
return binding_path_wth_exchange_exchange_key

@staticmethod
def message_to_address_helper(message: Message, address: str) -> Message:
Expand Down
19 changes: 13 additions & 6 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,32 @@ class OffsetSpecification(Enum):


@dataclass
class BindingSpecification:
class ExchangeToQueueBindingSpecification:
source_exchange: str
destination_queue: str
binding_key: str
binding_key: Optional[str] = None


@dataclass
class ExchangeToExchangeBindingSpecification:
source_exchange: str
destination_exchange: str
binding_key: Optional[str] = None


class StreamOptions:

def __init__(self): # type: ignore
self._filter_set: Dict[symbol, Described] = {}

def offset(self, offset_spefication: Union[OffsetSpecification, int]) -> None:
if isinstance(offset_spefication, int):
def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
if isinstance(offset_specification, int):
self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described(
symbol(STREAM_OFFSET_SPEC), offset_spefication
symbol(STREAM_OFFSET_SPEC), offset_specification
)
else:
self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described(
symbol(STREAM_OFFSET_SPEC), offset_spefication.name
symbol(STREAM_OFFSET_SPEC), offset_specification.name
)

def filter_values(self, filters: list[str]) -> None:
Expand Down
60 changes: 50 additions & 10 deletions rabbitmq_amqp_python_client/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from .address_helper import AddressHelper
from .common import CommonValues, QueueType
from .entities import (
BindingSpecification,
ExchangeSpecification,
ExchangeToExchangeBindingSpecification,
ExchangeToQueueBindingSpecification,
QueueInfo,
)
from .exceptions import ValidationCodeException
Expand Down Expand Up @@ -301,12 +302,25 @@ def _validate_reponse_code(
"wrong response code received: " + str(response_code)
)

def bind(self, bind_specification: BindingSpecification) -> str:
def bind(
self,
bind_specification: Union[
ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification
],
) -> str:
logger.debug("Bind Operation called")

body = {}
body["binding_key"] = bind_specification.binding_key
if bind_specification.binding_key is not None:
body["binding_key"] = bind_specification.binding_key
else:
body["binding_key"] = ""
body["source"] = bind_specification.source_exchange
body["destination_queue"] = bind_specification.destination_queue
if isinstance(bind_specification, ExchangeToQueueBindingSpecification):
body["destination_queue"] = bind_specification.destination_queue
elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification):
body["destination_exchange"] = bind_specification.destination_exchange

body["arguments"] = {} # type: ignore

path = AddressHelper.path_address()
Expand All @@ -319,17 +333,43 @@ def bind(self, bind_specification: BindingSpecification) -> str:
CommonValues.response_code_204.value,
],
)
binding_path = ""

binding_path_with_queue = AddressHelper.binding_path_with_exchange_queue(
bind_specification
)
return binding_path_with_queue
if isinstance(bind_specification, ExchangeToQueueBindingSpecification):
binding_path = AddressHelper.binding_path_with_exchange_queue(
bind_specification
)
elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification):
binding_path = AddressHelper.binding_path_with_exchange_exchange(
bind_specification
)

def unbind(self, binding_exchange_queue_path: str) -> None:
return binding_path

def unbind(
self,
bind_specification: Union[
str,
ExchangeToQueueBindingSpecification,
ExchangeToExchangeBindingSpecification,
],
) -> None:
logger.debug("UnBind Operation called")
binding_name = ""
if isinstance(bind_specification, str):
binding_name = bind_specification
else:
if isinstance(bind_specification, ExchangeToQueueBindingSpecification):
binding_name = AddressHelper.binding_path_with_exchange_queue(
bind_specification
)
elif isinstance(bind_specification, ExchangeToExchangeBindingSpecification):
binding_name = AddressHelper.binding_path_with_exchange_exchange(
bind_specification
)
self.request(
None,
binding_exchange_queue_path,
binding_name,
CommonValues.command_delete.value,
[
CommonValues.response_code_204.value,
Expand Down
Loading