Skip to content

Commit 2983576

Browse files
author
DanielePalaia
committed
improving AddressHelper utility functions
1 parent 71c532d commit 2983576

File tree

13 files changed

+148
-159
lines changed

13 files changed

+148
-159
lines changed

examples/getting_started/main.py

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,60 @@
11
# type: ignore
2+
3+
24
from rabbitmq_amqp_python_client import (
5+
AddressHelper,
36
BindingSpecification,
47
Connection,
8+
DeliveryConsumerHandler,
59
Event,
610
ExchangeSpecification,
711
Message,
8-
MessageAck,
9-
MessagingHandler,
1012
QuorumQueueSpecification,
11-
exchange_address,
12-
queue_address,
1313
)
1414

1515

16-
class MyMessageHandler(MessagingHandler):
16+
class MyMessageHandler(DeliveryConsumerHandler):
1717

1818
def __init__(self):
19-
super().__init__(auto_accept=False, auto_settle=False)
19+
super().__init__()
2020
self._count = 0
2121

2222
def on_message(self, event: Event):
2323
print("received message: " + str(event.message.annotations))
2424

2525
# accepting
26-
MessageAck.accept(event)
26+
self.delivery_context.accept(event)
2727

2828
# in case of rejection (+eventually deadlettering)
29-
# MessageAck.discard(event)
29+
# self.delivery_context.discard(event)
3030

3131
# in case of requeuing
32-
# MessageAck.requeue(event)
32+
# self.delivery_context.requeue(event)
3333

3434
# annotations = {}
3535
# annotations[symbol('x-opt-string')] = 'x-test1'
3636
# in case of requeuing with annotations added
37-
# MessageAck.requeue_with_annotations(event, annotations)
37+
# self.delivery_context.requeue_with_annotations(event, annotations)
3838

3939
# in case of rejection with annotations added
40-
# MessageAck.discard_with_annotations(event)
40+
# self.delivery_context.discard_with_annotations(event)
4141

4242
print("count " + str(self._count))
4343

4444
self._count = self._count + 1
4545

4646
if self._count == 100:
4747
print("closing receiver")
48-
event.receiver.close()
49-
event.connection.close()
48+
# if you want you can add cleanup operations here
49+
# event.receiver.close()
50+
# event.connection.close()
5051

5152
def on_connection_closed(self, event: Event):
53+
# if you want you can add cleanup operations here
5254
print("connection closed")
5355

5456
def on_link_closed(self, event: Event) -> None:
57+
# if you want you can add cleanup operations here
5558
print("link closed")
5659

5760

@@ -91,9 +94,9 @@ def main() -> None:
9194
)
9295
)
9396

94-
addr = exchange_address(exchange_name, routing_key)
97+
addr = AddressHelper.exchange_address(exchange_name, routing_key)
9598

96-
addr_queue = queue_address(queue_name)
99+
addr_queue = AddressHelper.queue_address(queue_name)
97100

98101
print("create a publisher and publish a test message")
99102
publisher = connection.publisher(addr)
@@ -102,7 +105,7 @@ def main() -> None:
102105
messages_purged = management.purge_queue(queue_name)
103106

104107
print("messages purged: " + str(messages_purged))
105-
management.close()
108+
# management.close()
106109

107110
# publish 10 messages
108111
for i in range(messages_to_publish):
@@ -121,22 +124,22 @@ def main() -> None:
121124
pass
122125

123126
print("cleanup")
124-
# once we finish consuming we close the connection so we need to create a new one
125-
connection = create_connection()
127+
consumer.close()
128+
# once we finish consuming if we close the connection we need to create a new one
129+
# connection = create_connection()
130+
# management = connection.management()
126131

127-
management = connection.management()
128132
print("unbind")
129133
management.unbind(bind_name)
130134

131135
print("delete queue")
132-
# management.delete_queue(queue_name)
136+
management.delete_queue(queue_name)
133137

134138
print("delete exchange")
135139
management.delete_exchange(exchange_name)
136140

137141
print("closing connections")
138142
management.close()
139-
# consumer.close()
140143
print("after management closing")
141144
connection.close()
142145
print("after connection closing")

rabbitmq_amqp_python_client/__init__.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
from importlib import metadata
22

3-
from .address_helper import exchange_address, queue_address
3+
from .address_helper import AddressHelper
44
from .common import ExchangeType, QueueType
55
from .connection import Connection
66
from .consumer import Consumer
7+
from .delivery_consumer_handler import (
8+
DeliveryConsumerHandler,
9+
)
710
from .entities import (
811
BindingSpecification,
912
ExchangeSpecification,
1013
)
1114
from .management import Management
12-
from .message_ack import MessageAck
1315
from .publisher import Publisher
1416
from .qpid.proton._data import symbol # noqa: E402
1517
from .qpid.proton._delivery import Delivery
@@ -41,14 +43,13 @@
4143
"BindingSpecification",
4244
"QueueType",
4345
"Publisher",
44-
"exchange_address",
45-
"queue_address",
4646
"Message",
4747
"Consumer",
4848
"MessagingHandler",
4949
"Event",
5050
"Delivery",
51-
"MessageAck",
5251
"symbol",
5352
"ExchangeType",
53+
"AddressHelper",
54+
"DeliveryConsumerHandler",
5455
]
Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .entities import BindingSpecification
22

33

4-
def is_unreserved(char: str) -> bool:
4+
def _is_unreserved(char: str) -> bool:
55
# According to RFC 3986, unreserved characters are A-Z, a-z, 0-9, '-', '.', '_', and '~'
66
return char.isalnum() or char in "-._~"
77

@@ -12,7 +12,7 @@ def encode_path_segment(input_string: str) -> str:
1212
# Iterate over each character in the input string
1313
for char in input_string:
1414
# Check if the character is an unreserved character
15-
if is_unreserved(char):
15+
if _is_unreserved(char):
1616
encoded.append(char) # Append as is
1717
else:
1818
# Encode character to %HH format
@@ -21,49 +21,54 @@ def encode_path_segment(input_string: str) -> str:
2121
return "".join(encoded)
2222

2323

24-
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
25-
if routing_key == "":
26-
path = "/exchanges/" + encode_path_segment(exchange_name)
27-
else:
28-
path = (
29-
"/exchanges/"
30-
+ encode_path_segment(exchange_name)
31-
+ "/"
32-
+ encode_path_segment(routing_key)
33-
)
34-
35-
return path
36-
24+
class AddressHelper:
3725

38-
def queue_address(queue_name: str) -> str:
39-
path = "/queues/" + encode_path_segment(queue_name)
40-
41-
return path
26+
@staticmethod
27+
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
28+
if routing_key == "":
29+
path = "/exchanges/" + encode_path_segment(exchange_name)
30+
else:
31+
path = (
32+
"/exchanges/"
33+
+ encode_path_segment(exchange_name)
34+
+ "/"
35+
+ encode_path_segment(routing_key)
36+
)
4237

38+
return path
4339

44-
def purge_queue_address(queue_name: str) -> str:
45-
path = "/queues/" + encode_path_segment(queue_name) + "/messages"
40+
@staticmethod
41+
def queue_address(queue_name: str) -> str:
42+
path = "/queues/" + encode_path_segment(queue_name)
4643

47-
return path
44+
return path
4845

46+
@staticmethod
47+
def purge_queue_address(queue_name: str) -> str:
48+
path = "/queues/" + encode_path_segment(queue_name) + "/messages"
4949

50-
def path_address() -> str:
51-
path = "/bindings"
50+
return path
5251

53-
return path
52+
@staticmethod
53+
def path_address() -> str:
54+
path = "/bindings"
5455

56+
return path
5557

56-
def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
57-
binding_path_wth_exchange_queue_key = (
58-
"/bindings"
59-
+ "/"
60-
+ "src="
61-
+ encode_path_segment(bind_specification.source_exchange)
62-
+ ";"
63-
+ "dstq="
64-
+ encode_path_segment(bind_specification.destination_queue)
65-
+ ";key="
66-
+ encode_path_segment(bind_specification.binding_key)
67-
+ ";args="
68-
)
69-
return binding_path_wth_exchange_queue_key
58+
@staticmethod
59+
def binding_path_with_exchange_queue(
60+
bind_specification: BindingSpecification,
61+
) -> str:
62+
binding_path_wth_exchange_queue_key = (
63+
"/bindings"
64+
+ "/"
65+
+ "src="
66+
+ encode_path_segment(bind_specification.source_exchange)
67+
+ ";"
68+
+ "dstq="
69+
+ encode_path_segment(bind_specification.destination_queue)
70+
+ ";key="
71+
+ encode_path_segment(bind_specification.binding_key)
72+
+ ";args="
73+
)
74+
return binding_path_wth_exchange_queue_key

rabbitmq_amqp_python_client/connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ def management(self) -> Management:
3232
# closes the connection to the AMQP 1.0 server.
3333
def close(self) -> None:
3434
logger.debug("Closing connection")
35+
print("closing connection")
3536
self._conn.close()
37+
print("after closing connection")
3638

3739
def publisher(self, destination: str) -> Publisher:
3840
publisher = Publisher(self._conn, destination)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from .delivery_context import DeliveryContext
2+
from .qpid.proton.handlers import MessagingHandler
3+
4+
5+
class DeliveryConsumerHandler(MessagingHandler): # type: ignore
6+
7+
def __init__(self, auto_accept: bool = False, auto_settle: bool = True):
8+
super().__init__(auto_accept=auto_accept, auto_settle=auto_settle)
9+
self._count = 0
10+
self.delivery_context: DeliveryContext = DeliveryContext()

rabbitmq_amqp_python_client/message_ack.py renamed to rabbitmq_amqp_python_client/delivery_context.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,20 @@
55
from .qpid.proton._events import Event
66

77

8-
class MessageAck:
8+
class DeliveryContext:
99

10-
@staticmethod
11-
def accept(event: Event) -> None:
10+
def accept(self, event: Event) -> None:
1211
dlv = event.delivery
1312
dlv.update(Delivery.ACCEPTED)
1413
dlv.settle()
1514

16-
@staticmethod
17-
def discard(event: Event) -> None:
15+
def discard(self, event: Event) -> None:
1816
dlv = event.delivery
1917
dlv.update(Delivery.REJECTED)
2018
dlv.settle()
2119

22-
@staticmethod
2320
def discard_with_annotations(
24-
event: Event, annotations: Dict[str, "PythonAMQPData"]
21+
self, event: Event, annotations: Dict[str, "PythonAMQPData"]
2522
) -> None:
2623
dlv = event.delivery
2724
dlv.local.annotations = annotations
@@ -31,15 +28,13 @@ def discard_with_annotations(
3128
dlv.update(Delivery.MODIFIED)
3229
dlv.settle()
3330

34-
@staticmethod
35-
def requeue(event: Event) -> None:
31+
def requeue(self, event: Event) -> None:
3632
dlv = event.delivery
3733
dlv.update(Delivery.RELEASED)
3834
dlv.settle()
3935

40-
@staticmethod
4136
def requeue_with_annotations(
42-
event: Event, annotations: Dict[str, "PythonAMQPData"]
37+
self, event: Event, annotations: Dict[str, "PythonAMQPData"]
4338
) -> None:
4439
dlv = event.delivery
4540
dlv.local.annotations = annotations

0 commit comments

Comments
 (0)