Skip to content

Commit 7ee38ca

Browse files
author
DanielePalaia
committed
improving AddressHelper utility functions
1 parent 71c532d commit 7ee38ca

File tree

8 files changed

+77
-81
lines changed

8 files changed

+77
-81
lines changed

examples/getting_started/main.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# type: ignore
22
from rabbitmq_amqp_python_client import (
3+
AddressHelper,
34
BindingSpecification,
45
Connection,
56
Event,
@@ -8,8 +9,6 @@
89
MessageAck,
910
MessagingHandler,
1011
QuorumQueueSpecification,
11-
exchange_address,
12-
queue_address,
1312
)
1413

1514

@@ -91,9 +90,9 @@ def main() -> None:
9190
)
9291
)
9392

94-
addr = exchange_address(exchange_name, routing_key)
93+
addr = AddressHelper.exchange_address(exchange_name, routing_key)
9594

96-
addr_queue = queue_address(queue_name)
95+
addr_queue = AddressHelper.queue_address(queue_name)
9796

9897
print("create a publisher and publish a test message")
9998
publisher = connection.publisher(addr)

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from importlib import metadata
22

3-
from .address_helper import exchange_address, queue_address
3+
from .address_helper import AddressHelper
44
from .common import ExchangeType, QueueType
55
from .connection import Connection
66
from .consumer import Consumer
@@ -41,8 +41,6 @@
4141
"BindingSpecification",
4242
"QueueType",
4343
"Publisher",
44-
"exchange_address",
45-
"queue_address",
4644
"Message",
4745
"Consumer",
4846
"MessagingHandler",
@@ -51,4 +49,5 @@
5149
"MessageAck",
5250
"symbol",
5351
"ExchangeType",
52+
"AddressHelper",
5453
]
Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .entities import BindingSpecification
22

33

4-
def is_unreserved(char: str) -> bool:
4+
def _is_unreserved(char: str) -> bool:
55
# According to RFC 3986, unreserved characters are A-Z, a-z, 0-9, '-', '.', '_', and '~'
66
return char.isalnum() or char in "-._~"
77

@@ -12,7 +12,7 @@ def encode_path_segment(input_string: str) -> str:
1212
# Iterate over each character in the input string
1313
for char in input_string:
1414
# Check if the character is an unreserved character
15-
if is_unreserved(char):
15+
if _is_unreserved(char):
1616
encoded.append(char) # Append as is
1717
else:
1818
# Encode character to %HH format
@@ -21,49 +21,54 @@ def encode_path_segment(input_string: str) -> str:
2121
return "".join(encoded)
2222

2323

24-
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
25-
if routing_key == "":
26-
path = "/exchanges/" + encode_path_segment(exchange_name)
27-
else:
28-
path = (
29-
"/exchanges/"
30-
+ encode_path_segment(exchange_name)
31-
+ "/"
32-
+ encode_path_segment(routing_key)
33-
)
34-
35-
return path
36-
24+
class AddressHelper:
3725

38-
def queue_address(queue_name: str) -> str:
39-
path = "/queues/" + encode_path_segment(queue_name)
40-
41-
return path
26+
@staticmethod
27+
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
28+
if routing_key == "":
29+
path = "/exchanges/" + encode_path_segment(exchange_name)
30+
else:
31+
path = (
32+
"/exchanges/"
33+
+ encode_path_segment(exchange_name)
34+
+ "/"
35+
+ encode_path_segment(routing_key)
36+
)
4237

38+
return path
4339

44-
def purge_queue_address(queue_name: str) -> str:
45-
path = "/queues/" + encode_path_segment(queue_name) + "/messages"
40+
@staticmethod
41+
def queue_address(queue_name: str) -> str:
42+
path = "/queues/" + encode_path_segment(queue_name)
4643

47-
return path
44+
return path
4845

46+
@staticmethod
47+
def purge_queue_address(queue_name: str) -> str:
48+
path = "/queues/" + encode_path_segment(queue_name) + "/messages"
4949

50-
def path_address() -> str:
51-
path = "/bindings"
50+
return path
5251

53-
return path
52+
@staticmethod
53+
def path_address() -> str:
54+
path = "/bindings"
5455

56+
return path
5557

56-
def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
57-
binding_path_wth_exchange_queue_key = (
58-
"/bindings"
59-
+ "/"
60-
+ "src="
61-
+ encode_path_segment(bind_specification.source_exchange)
62-
+ ";"
63-
+ "dstq="
64-
+ encode_path_segment(bind_specification.destination_queue)
65-
+ ";key="
66-
+ encode_path_segment(bind_specification.binding_key)
67-
+ ";args="
68-
)
69-
return binding_path_wth_exchange_queue_key
58+
@staticmethod
59+
def binding_path_with_exchange_queue(
60+
bind_specification: BindingSpecification,
61+
) -> str:
62+
binding_path_wth_exchange_queue_key = (
63+
"/bindings"
64+
+ "/"
65+
+ "src="
66+
+ encode_path_segment(bind_specification.source_exchange)
67+
+ ";"
68+
+ "dstq="
69+
+ encode_path_segment(bind_specification.destination_queue)
70+
+ ";key="
71+
+ encode_path_segment(bind_specification.binding_key)
72+
+ ";args="
73+
)
74+
return binding_path_wth_exchange_queue_key

rabbitmq_amqp_python_client/management.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,7 @@
22
import uuid
33
from typing import Any, Optional, Union
44

5-
from .address_helper import (
6-
binding_path_with_exchange_queue,
7-
exchange_address,
8-
path_address,
9-
purge_queue_address,
10-
queue_address,
11-
)
5+
from .address_helper import AddressHelper
126
from .common import CommonValues, QueueType
137
from .entities import (
148
BindingSpecification,
@@ -113,7 +107,7 @@ def declare_exchange(
113107
body["internal"] = exchange_specification.is_internal
114108
body["arguments"] = exchange_specification.arguments # type: ignore
115109

116-
path = exchange_address(exchange_specification.name)
110+
path = AddressHelper.exchange_address(exchange_specification.name)
117111

118112
self.request(
119113
body,
@@ -146,7 +140,7 @@ def declare_queue(
146140
elif isinstance(queue_specification, StreamSpecification):
147141
body = self._declare_stream(queue_specification)
148142

149-
path = queue_address(queue_specification.name)
143+
path = AddressHelper.queue_address(queue_specification.name)
150144

151145
self.request(
152146
body,
@@ -255,7 +249,7 @@ def _declare_stream(
255249

256250
def delete_exchange(self, exchange_name: str) -> None:
257251
logger.debug("delete_exchange operation called")
258-
path = exchange_address(exchange_name)
252+
path = AddressHelper.exchange_address(exchange_name)
259253

260254
self.request(
261255
None,
@@ -268,7 +262,7 @@ def delete_exchange(self, exchange_name: str) -> None:
268262

269263
def delete_queue(self, queue_name: str) -> None:
270264
logger.debug("delete_queue operation called")
271-
path = queue_address(queue_name)
265+
path = AddressHelper.queue_address(queue_name)
272266

273267
self.request(
274268
None,
@@ -302,7 +296,7 @@ def bind(self, bind_specification: BindingSpecification) -> str:
302296
body["destination_queue"] = bind_specification.destination_queue
303297
body["arguments"] = {} # type: ignore
304298

305-
path = path_address()
299+
path = AddressHelper.path_address()
306300

307301
self.request(
308302
body,
@@ -313,7 +307,9 @@ def bind(self, bind_specification: BindingSpecification) -> str:
313307
],
314308
)
315309

316-
binding_path_with_queue = binding_path_with_exchange_queue(bind_specification)
310+
binding_path_with_queue = AddressHelper.binding_path_with_exchange_queue(
311+
bind_specification
312+
)
317313
return binding_path_with_queue
318314

319315
def unbind(self, binding_exchange_queue_path: str) -> None:
@@ -329,7 +325,7 @@ def unbind(self, binding_exchange_queue_path: str) -> None:
329325

330326
def purge_queue(self, queue_name: str) -> int:
331327
logger.debug("purge_queue operation called")
332-
path = purge_queue_address(queue_name)
328+
path = AddressHelper.purge_queue_address(queue_name)
333329

334330
response = self.request(
335331
None,
@@ -344,7 +340,7 @@ def purge_queue(self, queue_name: str) -> int:
344340

345341
def queue_info(self, queue_name: str) -> QueueInfo:
346342
logger.debug("queue_info operation called")
347-
path = queue_address(queue_name)
343+
path = AddressHelper.queue_address(queue_name)
348344

349345
message = self.request(
350346
None,

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import pytest
22

33
from rabbitmq_amqp_python_client import (
4+
AddressHelper,
45
Connection,
56
Event,
67
MessageAck,
78
MessagingHandler,
8-
queue_address,
99
symbol,
1010
)
1111

@@ -40,7 +40,7 @@ def consumer(pytestconfig):
4040
connection.dial()
4141
try:
4242
queue_name = "test-queue"
43-
addr_queue = queue_address(queue_name)
43+
addr_queue = AddressHelper.queue_address(queue_name)
4444
consumer = connection.consumer(addr_queue)
4545
yield consumer
4646

tests/test_address_helper.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,25 @@
1-
from rabbitmq_amqp_python_client import (
2-
exchange_address,
3-
queue_address,
4-
)
1+
from rabbitmq_amqp_python_client import AddressHelper
52

63

74
def test_encoding_queue_simple() -> None:
85
queue = "my_queue"
96

10-
address = queue_address(queue)
7+
address = AddressHelper.queue_address(queue)
118

129
assert address == "/queues/my_queue"
1310

1411

1512
def test_encoding_queue_hex() -> None:
1613
queue = "my_queue>"
1714

18-
address = queue_address(queue)
15+
address = AddressHelper.queue_address(queue)
1916

2017
assert address == "/queues/my_queue%3E"
2118

2219

2320
def test_encoding_exchange_hex() -> None:
2421
queue = "my_exchange/()"
2522

26-
address = exchange_address(queue)
23+
address = AddressHelper.exchange_address(queue)
2724

2825
assert address == "/exchanges/my_exchange%2F%28%29"

tests/test_consumer.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from rabbitmq_amqp_python_client import (
2+
AddressHelper,
23
Connection,
34
QuorumQueueSpecification,
4-
queue_address,
55
)
66

77
from .conftest import (
@@ -29,7 +29,7 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None:
2929

3030
management.declare_queue(QuorumQueueSpecification(name=queue_name))
3131

32-
addr_queue = queue_address(queue_name)
32+
addr_queue = AddressHelper.queue_address(queue_name)
3333
consumer = connection.consumer(addr_queue)
3434

3535
consumed = 0
@@ -61,7 +61,7 @@ def test_consumer_async_queue_accept(connection: Connection) -> None:
6161

6262
management.declare_queue(QuorumQueueSpecification(name=queue_name))
6363

64-
addr_queue = queue_address(queue_name)
64+
addr_queue = AddressHelper.queue_address(queue_name)
6565

6666
publish_messages(connection, messages_to_send, queue_name)
6767

@@ -99,7 +99,7 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None:
9999

100100
management.declare_queue(QuorumQueueSpecification(name=queue_name))
101101

102-
addr_queue = queue_address(queue_name)
102+
addr_queue = AddressHelper.queue_address(queue_name)
103103

104104
publish_messages(connection, messages_to_send, queue_name)
105105

@@ -138,7 +138,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None:
138138

139139
# configuring dead lettering
140140
bind_path = setup_dead_lettering(management)
141-
addr_queue = queue_address(queue_name)
141+
addr_queue = AddressHelper.queue_address(queue_name)
142142

143143
management.declare_queue(
144144
QuorumQueueSpecification(
@@ -204,8 +204,8 @@ def test_consumer_async_queue_with_discard_with_annotations(
204204
publish_messages(connection, messages_to_send, queue_name)
205205

206206
bind_path = setup_dead_lettering(management)
207-
addr_queue = queue_address(queue_name)
208-
addr_queue_dl = queue_address(queue_dead_lettering)
207+
addr_queue = AddressHelper.queue_address(queue_name)
208+
addr_queue_dl = AddressHelper.queue_address(queue_dead_lettering)
209209

210210
# workaround: it looks like when the consumer finish to consume invalidate the connection
211211
# so for the moment we need to use one dedicated
@@ -254,7 +254,7 @@ def test_consumer_async_queue_with_requeue(connection: Connection) -> None:
254254

255255
management.declare_queue(QuorumQueueSpecification(name=queue_name))
256256

257-
addr_queue = queue_address(queue_name)
257+
addr_queue = AddressHelper.queue_address(queue_name)
258258

259259
publish_messages(connection, messages_to_send, queue_name)
260260

@@ -293,7 +293,7 @@ def test_consumer_async_queue_with_requeue_with_annotations(
293293

294294
management.declare_queue(QuorumQueueSpecification(name=queue_name))
295295

296-
addr_queue = queue_address(queue_name)
296+
addr_queue = AddressHelper.queue_address(queue_name)
297297

298298
publish_messages(connection, messages_to_send, queue_name)
299299

0 commit comments

Comments
 (0)