Skip to content

Commit 3ee087a

Browse files
authored
Set message durable as default (#71)
* set message durable as default Add limitation due to #83 --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent d3ca042 commit 3ee087a

File tree

5 files changed

+49
-19
lines changed

5 files changed

+49
-19
lines changed

.ci/ubuntu/rabbitmq.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ listeners.tcp.default = 5672
1111
listeners.ssl.default = 5671
1212
reverse_dns_lookups = false
1313

14-
deprecated_features.permit.amqp_address_v1 = false
14+
# deprecated_features.permit.amqp_address_v1 = false
1515

1616
ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem
1717
ssl_options.certfile = /etc/rabbitmq/certs/server_localhost_certificate.pem

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ Inside the [examples](./examples) folder you can find a set of examples that sho
1515

1616
[Client Guide](https://www.rabbitmq.com/client-libraries/amqp-client-libraries) select the python section.
1717

18+
### Client Limitations
19+
It is not possible to set message durable to False because of this [issue](https://github.com/rabbitmq/rabbitmq-amqp-python-client/issues/83)
20+
1821

1922
### Build
2023

@@ -33,8 +36,7 @@ To run TLS you need to:
3336
- export CFLAGS="-I/usr/local/opt/openssl/include"; pip install python-qpid-proton --verbose --no-cache-dir
3437
```
3538

36-
Read more about the issue [here](https://stackoverflow.com/questions/44979947/python-qpid-proton-for-mac-using-amqps
37-
)
39+
Read more about the issue [here](https://stackoverflow.com/questions/44979947/python-qpid-proton-for-mac-using-amqps)
3840

3941

4042

rabbitmq_amqp_python_client/management.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,19 +125,20 @@ def request(
125125

126126
def _request(
127127
self,
128-
id: str,
128+
msg_id: str,
129129
body: Any,
130130
path: str,
131131
method: str,
132132
expected_response_codes: list[int],
133133
) -> Message:
134134
amq_message = Message(
135-
id=id,
135+
id=msg_id,
136136
body=body,
137137
inferred=False,
138138
reply_to="$me",
139139
address=path,
140140
subject=method,
141+
durable=False,
141142
)
142143

143144
if self._sender is not None:
@@ -170,9 +171,7 @@ def declare_exchange(
170171
ValidationCodeException: If exchange already exists or other validation fails
171172
"""
172173
logger.debug("declare_exchange operation called")
173-
body: dict[str, Any] = {}
174-
body["auto_delete"] = exchange_specification.is_auto_delete
175-
body["durable"] = exchange_specification.is_durable
174+
body: dict[str, Any] = {"durable": exchange_specification.is_durable}
176175
if isinstance(exchange_specification, ExchangeSpecification):
177176
body["type"] = exchange_specification.exchange_type.value
178177
elif isinstance(exchange_specification, ExchangeCustomSpecification):

rabbitmq_amqp_python_client/qpid/proton/_message.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class Message(object):
110110
""" Default AMQP message priority"""
111111

112112
def __init__(
113-
self, body: Union[bytes, None] = None, inferred=True, **kwargs
113+
self, body: Union[bytes, None] = None, inferred=True, durable=True, **kwargs
114114
) -> None:
115115
# validate the types
116116

@@ -120,6 +120,7 @@ def __init__(
120120
self.application_properties = None
121121
self.body = body
122122
self.inferred = inferred
123+
self.durable = durable
123124

124125
for k, v in kwargs.items():
125126
getattr(self, k) # Raise exception if it's not a valid attribute.

tests/test_publisher.py

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ def test_validate_message_for_publishing(connection: Connection) -> None:
4545

4646

4747
def test_publish_queue(connection: Connection) -> None:
48-
4948
queue_name = "test-queue"
5049
management = connection.management()
5150

@@ -77,7 +76,6 @@ def test_publish_queue(connection: Connection) -> None:
7776

7877

7978
def test_publish_per_message(connection: Connection) -> None:
80-
8179
queue_name = "test-queue-1"
8280
queue_name_2 = "test-queue-2"
8381
management = connection.management()
@@ -123,7 +121,6 @@ def test_publish_per_message(connection: Connection) -> None:
123121

124122

125123
def test_publish_ssl(connection_ssl: Connection) -> None:
126-
127124
queue_name = "test-queue"
128125
management = connection_ssl.management()
129126

@@ -148,7 +145,6 @@ def test_publish_ssl(connection_ssl: Connection) -> None:
148145

149146

150147
def test_publish_to_invalid_destination(connection: Connection) -> None:
151-
152148
queue_name = "test-queue"
153149

154150
raised = False
@@ -169,7 +165,6 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
169165

170166

171167
def test_publish_per_message_to_invalid_destination(connection: Connection) -> None:
172-
173168
queue_name = "test-queue-1"
174169
raised = False
175170

@@ -193,7 +188,6 @@ def test_publish_per_message_to_invalid_destination(connection: Connection) -> N
193188

194189

195190
def test_publish_per_message_both_address(connection: Connection) -> None:
196-
197191
queue_name = "test-queue-1"
198192
raised = False
199193

@@ -223,7 +217,6 @@ def test_publish_per_message_both_address(connection: Connection) -> None:
223217

224218

225219
def test_publish_exchange(connection: Connection) -> None:
226-
227220
exchange_name = "test-exchange"
228221
queue_name = "test-queue"
229222
management = connection.management()
@@ -342,7 +335,6 @@ def test_disconnection_reconnection() -> None:
342335

343336

344337
def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
345-
346338
stream_name = "test_stream_info_with_validation"
347339
messages_to_send = 200
348340

@@ -361,7 +353,6 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
361353

362354

363355
def test_publish_per_message_exchange(connection: Connection) -> None:
364-
365356
exchange_name = "test-exchange-per-message"
366357
queue_name = "test-queue-per-message"
367358
management = connection.management()
@@ -407,7 +398,6 @@ def test_publish_per_message_exchange(connection: Connection) -> None:
407398

408399

409400
def test_multiple_publishers(environment: Environment) -> None:
410-
411401
stream_name = "test_multiple_publisher_1"
412402
stream_name_2 = "test_multiple_publisher_2"
413403
connection = environment.connection()
@@ -456,3 +446,41 @@ def test_multiple_publishers(environment: Environment) -> None:
456446
management.delete_queue(stream_name_2)
457447

458448
management.close()
449+
450+
451+
def test_durable_message(connection: Connection) -> None:
452+
queue_name = "test_durable_message"
453+
454+
management = connection.management()
455+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
456+
destination = AddressHelper.queue_address(queue_name)
457+
publisher = connection.publisher(destination)
458+
# message should be durable by default
459+
status = publisher.publish(
460+
Message(
461+
body=Converter.string_to_bytes("durable"),
462+
)
463+
)
464+
465+
assert status.remote_state == OutcomeState.ACCEPTED
466+
# message should be not durable by setting the durable to False by the user
467+
468+
m = Message(
469+
body=Converter.string_to_bytes("not durable"),
470+
durable=False,
471+
)
472+
status = publisher.publish(m)
473+
474+
assert status.remote_state == OutcomeState.ACCEPTED
475+
476+
consumer = connection.consumer(destination)
477+
should_be_durable = consumer.consume()
478+
assert should_be_durable.durable is True
479+
consumer.close()
480+
# it does not work due of https://github.com/rabbitmq/rabbitmq-amqp-python-client/issues/83
481+
# should_be_not_durable = consumer.consume()
482+
# assert should_be_not_durable.durable is False
483+
# message_count = management.purge_queue(queue_name)
484+
management.purge_queue(queue_name)
485+
# assert message_count == 0
486+
management.delete_queue(queue_name)

0 commit comments

Comments
 (0)