Skip to content

Commit 2744c14

Browse files
author
DanielePalaia
committed
removing amqpmessage wrapper layer
1 parent fb62cc3 commit 2744c14

File tree

7 files changed

+44
-67
lines changed

7 files changed

+44
-67
lines changed

rabbitmq_amqp_python_client/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from .address_helper import AddressHelper
44
from .amqp_consumer_handler import AMQPMessagingHandler
5-
from .amqp_message import AmqpMessage
65
from .common import ExchangeType, QueueType
76
from .connection import Connection
87
from .consumer import Consumer
@@ -19,6 +18,7 @@
1918
from .qpid.proton._data import symbol # noqa: E402
2019
from .qpid.proton._delivery import Delivery, Disposition
2120
from .qpid.proton._events import Event
21+
from .qpid.proton._message import Message
2222
from .qpid.proton._utils import ConnectionClosed
2323
from .qpid.proton.handlers import MessagingHandler
2424
from .queues import (
@@ -52,7 +52,7 @@
5252
"BindingSpecification",
5353
"QueueType",
5454
"Publisher",
55-
"AmqpMessage",
55+
"Message",
5656
"Consumer",
5757
"MessagingHandler",
5858
"Event",
@@ -69,4 +69,5 @@
6969
"OffsetSpecification",
7070
"OutcomeState",
7171
"Environment",
72+
"message_to_address_helper",
7273
]

rabbitmq_amqp_python_client/address_helper.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from .entities import BindingSpecification
2+
from .qpid.proton._message import Message
23

34

45
def _is_unreserved(char: str) -> bool:
@@ -73,6 +74,11 @@ def binding_path_with_exchange_queue(
7374
)
7475
return binding_path_wth_exchange_queue_key
7576

77+
@staticmethod
78+
def message_to_address_helper(message: Message, address: str) -> Message:
79+
message.address = address
80+
return message
81+
7682

7783
def validate_address(address: str) -> bool:
7884
if address.startswith("/queues") or address.startswith("/exchanges"):

rabbitmq_amqp_python_client/amqp_message.py

Lines changed: 0 additions & 30 deletions
This file was deleted.

rabbitmq_amqp_python_client/consumer.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import logging
22
from typing import Literal, Optional, Union, cast
33

4-
from .amqp_message import AmqpMessage
54
from .entities import StreamOptions
65
from .options import (
76
ReceiverOptionUnsettled,
87
ReceiverOptionUnsettledWithFilters,
98
)
109
from .qpid.proton._handlers import MessagingHandler
10+
from .qpid.proton._message import Message
1111
from .qpid.proton.utils import (
1212
BlockingConnection,
1313
BlockingReceiver,
@@ -38,12 +38,10 @@ def _open(self) -> None:
3838
logger.debug("Creating Sender")
3939
self._receiver = self._create_receiver(self._addr)
4040

41-
def consume( # type: ignore
42-
self, timeout: Union[None, Literal[False], float] = False
43-
) -> AmqpMessage:
41+
def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message:
4442
if self._receiver is not None:
4543
message = self._receiver.receive(timeout=timeout)
46-
return cast(AmqpMessage, message)
44+
return cast(Message, message)
4745

4846
def close(self) -> None:
4947
logger.debug("Closing the receiver")

rabbitmq_amqp_python_client/publisher.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
from typing import Optional
33

44
from .address_helper import validate_address
5-
from .amqp_message import AmqpMessage
65
from .exceptions import ArgumentOutOfRangeException
76
from .options import SenderOptionUnseattle
87
from .qpid.proton._delivery import Delivery
8+
from .qpid.proton._message import Message
99
from .qpid.proton.utils import (
1010
BlockingConnection,
1111
BlockingSender,
@@ -26,18 +26,18 @@ def _open(self) -> None:
2626
logger.debug("Creating Sender")
2727
self._sender = self._create_sender(self._addr)
2828

29-
def publish(self, message: AmqpMessage) -> Delivery:
29+
def publish(self, message: Message) -> Delivery:
3030
if self._addr != "":
3131
if self._sender is not None:
32-
return self._sender.send(message.native_message())
32+
return self._sender.send(message)
3333
else:
34-
if message.get_address() != "":
35-
if validate_address(message.get_address()) is False:
34+
if message.address != "":
35+
if validate_address(message.address) is False:
3636
raise ArgumentOutOfRangeException(
3737
"destination address must start with /queues or /exchanges"
3838
)
3939
if self._sender is not None:
40-
delivery = self._sender.send(message.native_message())
40+
delivery = self._sender.send(message)
4141
return delivery
4242

4343
def close(self) -> None:

tests/test_publisher.py

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
from rabbitmq_amqp_python_client import (
44
AddressHelper,
5-
AmqpMessage,
65
ArgumentOutOfRangeException,
76
Connection,
87
ConnectionClosed,
98
Environment,
9+
Message,
1010
OutcomeState,
1111
QuorumQueueSpecification,
1212
StreamSpecification,
@@ -30,7 +30,7 @@ def test_publish_queue(connection: Connection) -> None:
3030

3131
try:
3232
publisher = connection.publisher("/queues/" + queue_name)
33-
status = publisher.publish(AmqpMessage(body="test"))
33+
status = publisher.publish(Message(body="test"))
3434
if status.remote_state == OutcomeState.ACCEPTED:
3535
accepted = True
3636
except Exception:
@@ -103,7 +103,7 @@ def test_publish_ssl(connection_ssl: Connection) -> None:
103103

104104
try:
105105
publisher = connection_ssl.publisher("/queues/" + queue_name)
106-
publisher.publish(AmqpMessage(body="test"))
106+
publisher.publish(Message(body="test"))
107107
except Exception:
108108
raised = True
109109

@@ -124,7 +124,7 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
124124
publisher = None
125125
try:
126126
publisher = connection.publisher("/invalid-destination/" + queue_name)
127-
publisher.publish(AmqpMessage(body="test"))
127+
publisher.publish(Message(body="test"))
128128
except ArgumentOutOfRangeException:
129129
raised = True
130130
except Exception:
@@ -141,8 +141,10 @@ def test_publish_per_message_to_invalid_destination(connection: Connection) -> N
141141
queue_name = "test-queue-1"
142142
raised = False
143143

144-
message = AmqpMessage(body="test")
145-
message.to_address("/invalid_destination/" + queue_name)
144+
message = Message(body="test")
145+
message = AddressHelper.message_to_address_helper(
146+
message, "/invalid_destination/" + queue_name
147+
)
146148
publisher = connection.publisher()
147149

148150
try:
@@ -174,7 +176,7 @@ def test_publish_exchange(connection: Connection) -> None:
174176

175177
try:
176178
publisher = connection.publisher(addr)
177-
status = publisher.publish(AmqpMessage(body="test"))
179+
status = publisher.publish(Message(body="test"))
178180
if status.ACCEPTED:
179181
accepted = True
180182
except Exception:
@@ -204,7 +206,7 @@ def test_publish_purge(connection: Connection) -> None:
204206
try:
205207
publisher = connection.publisher("/queues/" + queue_name)
206208
for i in range(messages_to_publish):
207-
publisher.publish(AmqpMessage(body="test"))
209+
publisher.publish(Message(body="test"))
208210
except Exception:
209211
raised = True
210212

@@ -269,7 +271,7 @@ def on_disconnected():
269271
# simulate a disconnection
270272
delete_all_connections()
271273
try:
272-
publisher.publish(AmqpMessage(body="test"))
274+
publisher.publish(Message(body="test"))
273275

274276
except ConnectionClosed:
275277
disconnected = True
@@ -323,9 +325,9 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
323325

324326
for i in range(messages_to_send):
325327

326-
publisher.publish(AmqpMessage(body="test"))
328+
publisher.publish(Message(body="test"))
329+
327330

328-
'''
329331
def test_publish_per_message_exchange(connection: Connection) -> None:
330332

331333
exchange_name = "test-exchange-per-message"
@@ -338,16 +340,16 @@ def test_publish_per_message_exchange(connection: Connection) -> None:
338340
raised = False
339341

340342
publisher = None
341-
# accepted = False
343+
accepted = False
342344
accepted_2 = False
343345

344346
try:
345347
publisher = connection.publisher()
346-
# status = publish_per_message(
347-
# publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
348-
# )
349-
# if status.remote_state == OutcomeState.ACCEPTED:
350-
# accepted = True
348+
status = publish_per_message(
349+
publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
350+
)
351+
if status.remote_state == OutcomeState.ACCEPTED:
352+
accepted = True
351353
status = publish_per_message(
352354
publisher, addr=AddressHelper.queue_address(queue_name)
353355
)
@@ -366,8 +368,7 @@ def test_publish_per_message_exchange(connection: Connection) -> None:
366368

367369
management.close()
368370

369-
# assert accepted is True
371+
assert accepted is True
370372
assert accepted_2 is True
371-
assert purged_messages_queue == 1
373+
assert purged_messages_queue == 2
372374
assert raised is False
373-
'''

tests/utils.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
from typing import Optional
22

33
from rabbitmq_amqp_python_client import (
4-
AmqpMessage,
4+
AddressHelper,
55
BindingSpecification,
66
Connection,
77
Delivery,
88
ExchangeSpecification,
99
ExchangeType,
1010
Management,
11+
Message,
1112
Publisher,
1213
QuorumQueueSpecification,
1314
)
@@ -27,13 +28,13 @@ def publish_messages(
2728
publisher = connection.publisher("/queues/" + queue_name)
2829
# publish messages_to_send messages
2930
for i in range(messages_to_send):
30-
publisher.publish(AmqpMessage(body="test" + str(i), annotations=annotations))
31+
publisher.publish(Message(body="test" + str(i), annotations=annotations))
3132
publisher.close()
3233

3334

3435
def publish_per_message(publisher: Publisher, addr: str) -> Delivery:
35-
message = AmqpMessage(body="test")
36-
message.to_address(addr)
36+
message = Message(body="test")
37+
message = AddressHelper.message_to_address_helper(message, addr)
3738
status = publisher.publish(message)
3839
return status
3940

0 commit comments

Comments
 (0)