Skip to content

Commit d7e6a9a

Browse files
author
DanielePalaia
committed
review queue arguments
1 parent 22c8f32 commit d7e6a9a

File tree

3 files changed

+178
-60
lines changed

3 files changed

+178
-60
lines changed

rabbitmq_amqp_python_client/management.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -163,31 +163,36 @@ def _declare_queue(
163163
body = {}
164164
args: dict[str, Any] = {}
165165

166-
body["auto_delete"] = queue_specification.is_auto_delete
167-
body["durable"] = queue_specification.is_durable
168-
169166
if queue_specification.dead_letter_exchange is not None:
170167
args["x-dead-letter-exchange"] = queue_specification.dead_letter_exchange
171168
if queue_specification.dead_letter_routing_key is not None:
172169
args["x-dead-letter-routing-key"] = (
173170
queue_specification.dead_letter_routing_key
174171
)
175-
if queue_specification.overflow is not None:
176-
args["x-overflow"] = queue_specification.overflow
172+
if queue_specification.overflow_behaviour is not None:
173+
args["x-overflow"] = queue_specification.overflow_behaviour
177174
if queue_specification.max_len is not None:
178175
args["x-max-length"] = queue_specification.max_len
179176
if queue_specification.max_len_bytes is not None:
180177
args["x-max-length-bytes"] = queue_specification.max_len_bytes
181178
if queue_specification.message_ttl is not None:
182-
args["x-message-ttl"] = queue_specification.message_ttl
183-
if queue_specification.expires is not None:
184-
args["x-expires"] = queue_specification.expires
179+
args["x-message-ttl"] = int(
180+
queue_specification.message_ttl.total_seconds() * 1000
181+
)
182+
if queue_specification.auto_expires is not None:
183+
args["x-expires"] = int(
184+
queue_specification.auto_expires.total_seconds() * 1000
185+
)
185186
if queue_specification.single_active_consumer is not None:
186187
args["x-single-active-consumer"] = (
187188
queue_specification.single_active_consumer
188189
)
189190

190191
if isinstance(queue_specification, ClassicQueueSpecification):
192+
body["auto_delete"] = queue_specification.is_auto_delete
193+
body["durable"] = queue_specification.is_durable
194+
body["exclusive"] = queue_specification.is_exclusive
195+
191196
args["x-queue-type"] = QueueType.classic.value
192197
if queue_specification.maximum_priority is not None:
193198
args["x-maximum-priority"] = queue_specification.maximum_priority
@@ -227,21 +232,23 @@ def _declare_stream(
227232
args["x-max-length-bytes"] = stream_specification.max_len_bytes
228233

229234
if stream_specification.max_time_retention is not None:
230-
args["x-max-time-retention"] = stream_specification.max_time_retention
235+
args["x-time-retention"] = (
236+
stream_specification.max_time_retention.total_seconds() * 1000
237+
)
231238

232239
if stream_specification.max_segment_size_in_bytes is not None:
233-
args["x-max-segment-size-in-bytes"] = (
240+
args["x-stream-max-segment-size-bytes"] = (
234241
stream_specification.max_segment_size_in_bytes
235242
)
236243

237244
if stream_specification.filter_size is not None:
238-
args["x-filter-size"] = stream_specification.filter_size
245+
args["x-stream-filter-size-bytes"] = stream_specification.filter_size
239246

240247
if stream_specification.initial_group_size is not None:
241248
args["x-initial-group-size"] = stream_specification.initial_group_size
242249

243250
if stream_specification.leader_locator is not None:
244-
args["x-leader-locator"] = stream_specification.leader_locator
251+
args["x-queue-leader-locator"] = stream_specification.leader_locator
245252

246253
body["arguments"] = args
247254

@@ -276,7 +283,6 @@ def delete_queue(self, name: str) -> None:
276283
def _validate_reponse_code(
277284
self, response_code: int, expected_response_codes: list[int]
278285
) -> None:
279-
logger.debug("response_code received: " + str(response_code))
280286
if response_code == CommonValues.response_code_409.value:
281287
raise ValidationCodeException("ErrPreconditionFailed")
282288

rabbitmq_amqp_python_client/queues.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,28 @@
11
from dataclasses import dataclass
2+
from datetime import timedelta
23
from typing import Optional
34

45

56
@dataclass
67
class QueueSpecification:
78
name: str
8-
expires: Optional[int] = None
9-
message_ttl: Optional[int] = None
10-
overflow: Optional[str] = None
9+
auto_expires: Optional[timedelta] = None
10+
message_ttl: Optional[timedelta] = None
11+
overflow_behaviour: Optional[str] = None
1112
single_active_consumer: Optional[bool] = None
1213
dead_letter_exchange: Optional[str] = None
1314
dead_letter_routing_key: Optional[str] = None
1415
max_len: Optional[int] = None
1516
max_len_bytes: Optional[int] = None
1617
leader_locator: Optional[str] = None
17-
is_auto_delete: bool = False
18-
is_durable: bool = True
1918

2019

2120
@dataclass
2221
class ClassicQueueSpecification(QueueSpecification):
2322
maximum_priority: Optional[int] = None
23+
is_auto_delete: bool = False
24+
is_exclusive: bool = False
25+
is_durable: bool = True
2426

2527

2628
@dataclass
@@ -35,7 +37,7 @@ class QuorumQueueSpecification(QueueSpecification):
3537
class StreamSpecification:
3638
name: str
3739
max_len_bytes: Optional[int] = None
38-
max_time_retention: Optional[int] = None
40+
max_time_retention: Optional[timedelta] = None
3941
max_segment_size_in_bytes: Optional[int] = None
4042
filter_size: Optional[int] = None
4143
initial_group_size: Optional[int] = None

0 commit comments

Comments
 (0)