Skip to content

Commit 658af34

Browse files
author
DanielePalaia
committed
making Message a client class
1 parent eac025a commit 658af34

File tree

10 files changed

+62
-30
lines changed

10 files changed

+62
-30
lines changed

examples/getting_started/getting_started.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33

44
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
55
AddressHelper,
6+
AmqpMessage,
67
AMQPMessagingHandler,
78
BindingSpecification,
89
Connection,
910
Environment,
1011
Event,
1112
ExchangeSpecification,
12-
Message,
1313
OutcomeState,
1414
QuorumQueueSpecification,
1515
)
@@ -125,7 +125,7 @@ def main() -> None:
125125
# publish 10 messages
126126
for i in range(MESSAGES_TO_PUBLISH):
127127
print("publishing")
128-
status = publisher.publish(Message(body="test"))
128+
status = publisher.publish(AmqpMessage(body="test"))
129129
if status.remote_state == OutcomeState.ACCEPTED:
130130
print("message accepted")
131131
elif status.remote_state == OutcomeState.RELEASED:

examples/reconnection/reconnection_example.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from rabbitmq_amqp_python_client import (
99
AddressHelper,
10+
AmqpMessage,
1011
AMQPMessagingHandler,
1112
BindingSpecification,
1213
Connection,
@@ -16,7 +17,6 @@
1617
Event,
1718
ExchangeSpecification,
1819
Management,
19-
Message,
2020
Publisher,
2121
QuorumQueueSpecification,
2222
)
@@ -123,7 +123,7 @@ def create_connection() -> Connection:
123123
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
124124

125125
connection = environment.connection(
126-
url="amqp://guest:guest@localhost:5672/",
126+
uri="amqp://guest:guest@localhost:5672/",
127127
on_disconnection_handler=on_disconnection,
128128
)
129129
connection.dial()
@@ -191,7 +191,7 @@ def main() -> None:
191191
print("published 1000 messages...")
192192
try:
193193
if connection_configuration.publisher is not None:
194-
connection_configuration.publisher.publish(Message(body="test"))
194+
connection_configuration.publisher.publish(AmqpMessage(body="test"))
195195
except ConnectionClosed:
196196
print("publisher closing exception, resubmitting")
197197
continue

examples/streams/example_with_streams.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
44
AddressHelper,
5+
AmqpMessage,
56
AMQPMessagingHandler,
67
Connection,
78
Environment,
89
Event,
9-
Message,
1010
OffsetSpecification,
1111
StreamOptions,
1212
StreamSpecification,
@@ -120,15 +120,15 @@ def main() -> None:
120120
# publish with a filter of apple
121121
for i in range(MESSAGES_TO_PUBLISH):
122122
publisher.publish(
123-
Message(
123+
AmqpMessage(
124124
body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"}
125125
)
126126
)
127127

128128
# publish with a filter of banana
129129
for i in range(MESSAGES_TO_PUBLISH):
130130
publisher.publish(
131-
Message(
131+
AmqpMessage(
132132
body="banana: " + str(i),
133133
annotations={"x-stream-filter-value": "banana"},
134134
)

examples/tls/tls_example.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33

44
from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert,
55
AddressHelper,
6+
AmqpMessage,
67
AMQPMessagingHandler,
78
BindingSpecification,
89
ClientCert,
910
Connection,
1011
Environment,
1112
Event,
1213
ExchangeSpecification,
13-
Message,
1414
QuorumQueueSpecification,
1515
SslConfigurationContext,
1616
)
@@ -124,7 +124,7 @@ def main() -> None:
124124

125125
# publish 10 messages
126126
for i in range(messages_to_publish):
127-
status = publisher.publish(Message(body="test"))
127+
status = publisher.publish(AmqpMessage(body="test"))
128128
if status.ACCEPTED:
129129
print("message accepted")
130130
elif status.RELEASED:

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from .address_helper import AddressHelper
44
from .amqp_consumer_handler import AMQPMessagingHandler
5+
from .amqp_message import AmqpMessage
56
from .common import ExchangeType, QueueType
67
from .connection import Connection
78
from .consumer import Consumer
@@ -18,7 +19,6 @@
1819
from .qpid.proton._data import symbol # noqa: E402
1920
from .qpid.proton._delivery import Delivery, Disposition
2021
from .qpid.proton._events import Event
21-
from .qpid.proton._message import Message
2222
from .qpid.proton._utils import ConnectionClosed
2323
from .qpid.proton.handlers import MessagingHandler
2424
from .queues import (
@@ -52,7 +52,7 @@
5252
"BindingSpecification",
5353
"QueueType",
5454
"Publisher",
55-
"Message",
55+
"AmqpMessage",
5656
"Consumer",
5757
"MessagingHandler",
5858
"Event",
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from typing import Optional, Union, cast
2+
from uuid import UUID
3+
4+
from proton._data import Described
5+
6+
from .qpid.proton._message import Message
7+
8+
9+
class AmqpMessage(Message): # type: ignore
10+
11+
def __init__( # type: ignore
12+
self,
13+
body: Union[ # type: ignore
14+
bytes, str, dict, list, int, float, "UUID", "Described", None
15+
] = None,
16+
**kwargs,
17+
):
18+
super().__init__(body=body, **kwargs)
19+
self._addr: Optional[str] = None
20+
self._native_message = None
21+
22+
def to_address(self, addr: str) -> None:
23+
self._addr = addr
24+
25+
def address(self) -> Optional[str]:
26+
return self._addr
27+
28+
def native_message(self) -> Message:
29+
return cast(Message, self)

rabbitmq_amqp_python_client/consumer.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import logging
2-
from typing import Literal, Optional, Union
2+
from typing import Literal, Optional, Union, cast
33

4+
from .amqp_message import AmqpMessage
45
from .entities import StreamOptions
56
from .options import (
67
ReceiverOptionUnsettled,
78
ReceiverOptionUnsettledWithFilters,
89
)
910
from .qpid.proton._handlers import MessagingHandler
10-
from .qpid.proton._message import Message
1111
from .qpid.proton.utils import (
1212
BlockingConnection,
1313
BlockingReceiver,
@@ -38,9 +38,12 @@ def _open(self) -> None:
3838
logger.debug("Creating Sender")
3939
self._receiver = self._create_receiver(self._addr)
4040

41-
def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message:
41+
def consume( # type: ignore
42+
self, timeout: Union[None, Literal[False], float] = False
43+
) -> AmqpMessage:
4244
if self._receiver is not None:
43-
return self._receiver.receive(timeout=timeout)
45+
message = self._receiver.receive(timeout=timeout)
46+
return cast(AmqpMessage, message)
4447

4548
def close(self) -> None:
4649
logger.debug("Closing the receiver")

rabbitmq_amqp_python_client/publisher.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import logging
22
from typing import Optional
33

4+
from .amqp_message import AmqpMessage
45
from .options import SenderOptionUnseattle
56
from .qpid.proton._delivery import Delivery
6-
from .qpid.proton._message import Message
77
from .qpid.proton.utils import (
88
BlockingConnection,
99
BlockingSender,
@@ -13,7 +13,7 @@
1313

1414

1515
class Publisher:
16-
def __init__(self, conn: BlockingConnection, addr: str):
16+
def __init__(self, conn: BlockingConnection, addr: str = ""):
1717
self._sender: Optional[BlockingSender] = None
1818
self._conn = conn
1919
self._addr = addr
@@ -24,9 +24,9 @@ def _open(self) -> None:
2424
logger.debug("Creating Sender")
2525
self._sender = self._create_sender(self._addr)
2626

27-
def publish(self, message: Message) -> Delivery:
27+
def publish(self, message: AmqpMessage) -> Delivery:
2828
if self._sender is not None:
29-
return self._sender.send(message)
29+
return self._sender.send(message.native_message())
3030

3131
def close(self) -> None:
3232
logger.debug("Closing Sender")

tests/test_publisher.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
from rabbitmq_amqp_python_client import (
44
AddressHelper,
5+
AmqpMessage,
56
ArgumentOutOfRangeException,
67
BindingSpecification,
78
Connection,
89
ConnectionClosed,
910
Environment,
1011
ExchangeSpecification,
11-
Message,
1212
QuorumQueueSpecification,
1313
StreamSpecification,
1414
)
@@ -30,7 +30,7 @@ def test_publish_queue(connection: Connection) -> None:
3030

3131
try:
3232
publisher = connection.publisher("/queues/" + queue_name)
33-
status = publisher.publish(Message(body="test"))
33+
status = publisher.publish(AmqpMessage(body="test"))
3434
if status.ACCEPTED:
3535
accepted = True
3636
except Exception:
@@ -57,7 +57,7 @@ def test_publish_ssl(connection_ssl: Connection) -> None:
5757

5858
try:
5959
publisher = connection_ssl.publisher("/queues/" + queue_name)
60-
publisher.publish(Message(body="test"))
60+
publisher.publish(AmqpMessage(body="test"))
6161
except Exception:
6262
raised = True
6363

@@ -78,7 +78,7 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
7878
publisher = None
7979
try:
8080
publisher = connection.publisher("/invalid-destination/" + queue_name)
81-
publisher.publish(Message(body="test"))
81+
publisher.publish(AmqpMessage(body="test"))
8282
except ArgumentOutOfRangeException:
8383
raised = True
8484
except Exception:
@@ -116,7 +116,7 @@ def test_publish_exchange(connection: Connection) -> None:
116116

117117
try:
118118
publisher = connection.publisher(addr)
119-
status = publisher.publish(Message(body="test"))
119+
status = publisher.publish(AmqpMessage(body="test"))
120120
if status.ACCEPTED:
121121
accepted = True
122122
except Exception:
@@ -145,7 +145,7 @@ def test_publish_purge(connection: Connection) -> None:
145145
try:
146146
publisher = connection.publisher("/queues/" + queue_name)
147147
for i in range(messages_to_publish):
148-
publisher.publish(Message(body="test"))
148+
publisher.publish(AmqpMessage(body="test"))
149149
except Exception:
150150
raised = True
151151

@@ -210,7 +210,7 @@ def on_disconnected():
210210
# simulate a disconnection
211211
delete_all_connections()
212212
try:
213-
publisher.publish(Message(body="test"))
213+
publisher.publish(AmqpMessage(body="test"))
214214

215215
except ConnectionClosed:
216216
disconnected = True
@@ -264,4 +264,4 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
264264

265265
for i in range(messages_to_send):
266266

267-
publisher.publish(Message(body="test"))
267+
publisher.publish(AmqpMessage(body="test"))

tests/utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
from typing import Optional
22

33
from rabbitmq_amqp_python_client import (
4+
AmqpMessage,
45
BindingSpecification,
56
Connection,
67
ExchangeSpecification,
78
ExchangeType,
89
Management,
9-
Message,
1010
QuorumQueueSpecification,
1111
)
1212

@@ -25,7 +25,7 @@ def publish_messages(
2525
publisher = connection.publisher("/queues/" + queue_name)
2626
# publish messages_to_send messages
2727
for i in range(messages_to_send):
28-
publisher.publish(Message(body="test" + str(i), annotations=annotations))
28+
publisher.publish(AmqpMessage(body="test" + str(i), annotations=annotations))
2929
publisher.close()
3030

3131

0 commit comments

Comments
 (0)