Skip to content

Commit 515e7fe

Browse files
author
DanielePalaia
committed
improve binding/unbinding operarations
1 parent d6722a0 commit 515e7fe

File tree

6 files changed

+230
-21
lines changed

6 files changed

+230
-21
lines changed

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
)
1414
from .environment import Environment
1515
from .exceptions import (
16+
AmqpValidationException,
1617
ArgumentOutOfRangeException,
1718
ValidationCodeException,
1819
)
@@ -73,4 +74,5 @@
7374
"OffsetSpecification",
7475
"OutcomeState",
7576
"Environment",
77+
"AmqpValidationException",
7678
]

rabbitmq_amqp_python_client/address_helper.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Optional
2+
13
from .entities import BindingSpecification
24
from .qpid.proton._message import Message
35

@@ -7,19 +9,22 @@ def _is_unreserved(char: str) -> bool:
79
return char.isalnum() or char in "-._~"
810

911

10-
def encode_path_segment(input_string: str) -> str:
12+
def encode_path_segment(input_string: Optional[str]) -> str:
1113
encoded = []
1214

1315
# Iterate over each character in the input string
14-
for char in input_string:
15-
# Check if the character is an unreserved character
16-
if _is_unreserved(char):
17-
encoded.append(char) # Append as is
18-
else:
19-
# Encode character to %HH format
20-
encoded.append(f"%{ord(char):02X}")
16+
if input_string is not None:
17+
for char in input_string:
18+
# Check if the character is an unreserved character
19+
if _is_unreserved(char):
20+
encoded.append(char) # Append as is
21+
else:
22+
# Encode character to %HH format
23+
encoded.append(f"%{ord(char):02X}")
24+
25+
return "".join(encoded)
2126

22-
return "".join(encoded)
27+
return ""
2328

2429

2530
class AddressHelper:
@@ -60,6 +65,11 @@ def path_address() -> str:
6065
def binding_path_with_exchange_queue(
6166
bind_specification: BindingSpecification,
6267
) -> str:
68+
if bind_specification.binding_key is not None:
69+
key = ";key=" + encode_path_segment(bind_specification.binding_key)
70+
else:
71+
key = ";key="
72+
6373
binding_path_wth_exchange_queue_key = (
6474
"/bindings"
6575
+ "/"
@@ -68,11 +78,28 @@ def binding_path_with_exchange_queue(
6878
+ ";"
6979
+ "dstq="
7080
+ encode_path_segment(bind_specification.destination_queue)
81+
+ key
82+
+ ";args="
83+
)
84+
return binding_path_wth_exchange_queue_key
85+
86+
@staticmethod
87+
def binding_path_with_exchange_exchange(
88+
bind_specification: BindingSpecification,
89+
) -> str:
90+
binding_path_wth_exchange_exchange_key = (
91+
"/bindings"
92+
+ "/"
93+
+ "src="
94+
+ encode_path_segment(bind_specification.source_exchange)
95+
+ ";"
96+
+ "dstq="
97+
+ encode_path_segment(bind_specification.destination_exchange)
7198
+ ";key="
7299
+ encode_path_segment(bind_specification.binding_key)
73100
+ ";args="
74101
)
75-
return binding_path_wth_exchange_queue_key
102+
return binding_path_wth_exchange_exchange_key
76103

77104
@staticmethod
78105
def message_to_address_helper(message: Message, address: str) -> Message:

rabbitmq_amqp_python_client/entities.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ class OffsetSpecification(Enum):
4343
@dataclass
4444
class BindingSpecification:
4545
source_exchange: str
46-
destination_queue: str
47-
binding_key: str
46+
binding_key: Optional[str] = None
47+
destination_exchange: Optional[str] = None
48+
destination_queue: Optional[str] = None
4849

4950

5051
class StreamOptions:

rabbitmq_amqp_python_client/exceptions.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,12 @@ def __init__(self, msg: str):
1414

1515
def __str__(self) -> str:
1616
return repr(self.msg)
17+
18+
19+
class AmqpValidationException(BaseException):
20+
# Constructor or Initializer
21+
def __init__(self, msg: str):
22+
self.msg = msg
23+
24+
def __str__(self) -> str:
25+
return repr(self.msg)

rabbitmq_amqp_python_client/management.py

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
ExchangeSpecification,
1010
QueueInfo,
1111
)
12-
from .exceptions import ValidationCodeException
12+
from .exceptions import (
13+
AmqpValidationException,
14+
ValidationCodeException,
15+
)
1316
from .options import ReceiverOption, SenderOption
1417
from .qpid.proton._message import Message
1518
from .qpid.proton.utils import (
@@ -303,10 +306,19 @@ def _validate_reponse_code(
303306

304307
def bind(self, bind_specification: BindingSpecification) -> str:
305308
logger.debug("Bind Operation called")
309+
self._validate_binding(bind_specification)
310+
306311
body = {}
307-
body["binding_key"] = bind_specification.binding_key
312+
if bind_specification.binding_key is not None:
313+
body["binding_key"] = bind_specification.binding_key
314+
else:
315+
body["binding_key"] = ""
308316
body["source"] = bind_specification.source_exchange
309-
body["destination_queue"] = bind_specification.destination_queue
317+
if bind_specification.destination_queue is not None:
318+
body["destination_queue"] = bind_specification.destination_queue
319+
elif bind_specification.destination_exchange is not None:
320+
body["destination_exchange"] = bind_specification.destination_exchange
321+
310322
body["arguments"] = {} # type: ignore
311323

312324
path = AddressHelper.path_address()
@@ -320,16 +332,53 @@ def bind(self, bind_specification: BindingSpecification) -> str:
320332
],
321333
)
322334

323-
binding_path_with_queue = AddressHelper.binding_path_with_exchange_queue(
324-
bind_specification
325-
)
326-
return binding_path_with_queue
335+
if bind_specification.destination_queue is not None:
336+
binding_path = AddressHelper.binding_path_with_exchange_queue(
337+
bind_specification
338+
)
339+
elif bind_specification.destination_exchange is not None:
340+
binding_path = AddressHelper.binding_path_with_exchange_exchange(
341+
bind_specification
342+
)
327343

328-
def unbind(self, binding_exchange_queue_path: str) -> None:
344+
return binding_path
345+
346+
def _validate_binding(self, bind_specification: BindingSpecification) -> None:
347+
if (
348+
bind_specification.destination_queue is not None
349+
and bind_specification.destination_exchange is not None
350+
):
351+
raise AmqpValidationException(
352+
"just one of destination_queue and destination_exchange of BindingSpecification must be set "
353+
"for a binding operation"
354+
)
355+
356+
if (
357+
bind_specification.destination_queue is None
358+
and bind_specification.destination_exchange is None
359+
):
360+
raise AmqpValidationException(
361+
"at least one of destination_queue and destination_exchange of BindingSpecification must be set "
362+
"for a binding operation"
363+
)
364+
365+
def unbind(self, bind_specification: Union[BindingSpecification, str]) -> None:
329366
logger.debug("UnBind Operation called")
367+
if isinstance(bind_specification, str):
368+
binding_name = bind_specification
369+
else:
370+
self._validate_binding(bind_specification)
371+
if bind_specification.destination_queue is not None:
372+
binding_name = AddressHelper.binding_path_with_exchange_queue(
373+
bind_specification
374+
)
375+
elif bind_specification.destination_exchange is not None:
376+
binding_name = AddressHelper.binding_path_with_exchange_exchange(
377+
bind_specification
378+
)
330379
self.request(
331380
None,
332-
binding_exchange_queue_path,
381+
binding_name,
333382
CommonValues.command_delete.value,
334383
[
335384
CommonValues.response_code_204.value,

tests/test_management.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from datetime import timedelta
22

33
from rabbitmq_amqp_python_client import (
4+
AmqpValidationException,
45
BindingSpecification,
56
ClassicQueueSpecification,
67
ExchangeSpecification,
@@ -98,6 +99,126 @@ def test_bind_exchange_to_queue(management: Management) -> None:
9899
management.unbind(binding_exchange_queue_path)
99100

100101

102+
def test_bind_no_destination(management: Management) -> None:
103+
104+
exchange_name = "test-bind-exchange-to-queue-exchange"
105+
queue_name = "test-bind-exchange-to-queue-queue"
106+
routing_key = "routing-key"
107+
raised = False
108+
109+
management.declare_exchange(ExchangeSpecification(name=exchange_name))
110+
111+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
112+
113+
try:
114+
management.bind(
115+
BindingSpecification(
116+
source_exchange=exchange_name,
117+
binding_key=routing_key,
118+
)
119+
)
120+
except AmqpValidationException:
121+
raised = True
122+
123+
assert raised is True
124+
125+
management.delete_exchange(exchange_name)
126+
127+
management.delete_queue(queue_name)
128+
129+
130+
def test_bind_exchange_to_queue_without_key(management: Management) -> None:
131+
132+
exchange_name = "test-bind-exchange-to-queue-exchange"
133+
queue_name = "test-bind-exchange-to-queue-queue"
134+
135+
management.declare_exchange(ExchangeSpecification(name=exchange_name))
136+
137+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
138+
139+
binding_exchange_queue_path = management.bind(
140+
BindingSpecification(
141+
source_exchange=exchange_name,
142+
destination_queue=queue_name,
143+
)
144+
)
145+
146+
assert (
147+
binding_exchange_queue_path
148+
== "/bindings/src=" + exchange_name + ";dstq=" + queue_name + ";key=" + ";args="
149+
)
150+
151+
management.unbind(binding_exchange_queue_path)
152+
153+
management.delete_exchange(exchange_name)
154+
155+
management.delete_queue(queue_name)
156+
157+
158+
def test_bind_unbind_by_binding_spec(management: Management) -> None:
159+
160+
exchange_name = "test-bind-exchange-to-queue-exchange"
161+
queue_name = "test-bind-exchange-to-queue-queue"
162+
163+
management.declare_exchange(ExchangeSpecification(name=exchange_name))
164+
165+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
166+
167+
management.bind(
168+
BindingSpecification(
169+
source_exchange=exchange_name,
170+
destination_queue=queue_name,
171+
)
172+
)
173+
174+
management.unbind(
175+
BindingSpecification(
176+
source_exchange=exchange_name,
177+
destination_queue=queue_name,
178+
)
179+
)
180+
181+
management.delete_exchange(exchange_name)
182+
183+
management.delete_queue(queue_name)
184+
185+
186+
def test_bind_exchange_to_exchange(management: Management) -> None:
187+
188+
source_exchange_name = "source_exchange"
189+
destination_exchange_name = "destination_exchange"
190+
routing_key = "routing-key"
191+
192+
management.declare_exchange(ExchangeSpecification(name=source_exchange_name))
193+
194+
management.declare_exchange(ExchangeSpecification(name=destination_exchange_name))
195+
196+
binding_exchange_exchange_path = management.bind(
197+
BindingSpecification(
198+
source_exchange=source_exchange_name,
199+
destination_exchange=destination_exchange_name,
200+
binding_key=routing_key,
201+
)
202+
)
203+
204+
assert (
205+
binding_exchange_exchange_path
206+
== "/bindings/src="
207+
+ source_exchange_name
208+
+ ";dstq="
209+
+ destination_exchange_name
210+
+ ";key="
211+
+ routing_key
212+
+ ";args="
213+
)
214+
215+
management.unbind(binding_exchange_exchange_path)
216+
217+
management.delete_exchange(source_exchange_name)
218+
219+
management.delete_exchange(destination_exchange_name)
220+
221+
101222
def test_queue_info_with_validations(management: Management) -> None:
102223

103224
queue_name = "test_queue_info_with_validation"

0 commit comments

Comments
 (0)