Skip to content

Commit 6bd36fd

Browse files
author
DanielePalaia
committed
implementing Declare Exchange/Queue
1 parent 818ea5f commit 6bd36fd

File tree

6 files changed

+76
-22
lines changed

6 files changed

+76
-22
lines changed

examples/getting_started/main.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
from rabbitmq_amqp_python_client import (
44
Connection,
55
ExchangeSpecification,
6+
QueueSpecification,
7+
QueueType,
68
)
79

810

911
def main():
1012
exchange_name = "example-exchange"
13+
queue_name = "example-queue"
1114
connection = Connection("amqp://guest:guest@localhost:5672/")
1215

1316
connection.dial()
@@ -18,12 +21,9 @@ def main():
1821
ExchangeSpecification(name=exchange_name, arguments={})
1922
)
2023

21-
"""
22-
queue_info = management.declare_queue(QueueSpecification{
23-
name: queue_name,
24-
queue_type: QueueType{type: Quorum},
25-
})
26-
"""
24+
queue_info = management.declare_queue(
25+
QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={})
26+
)
2727

2828
"""
2929
#management.bind(BindingSpecification{
@@ -67,7 +67,9 @@ def main():
6767
management.delete_exchange(exchange_info.name)
6868
"""
6969

70-
# connection.close()
70+
management.close()
71+
72+
connection.close()
7173

7274

7375
if __name__ == "__main__":

rabbitmq_amqp_python_client/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
from importlib import metadata
22

3+
from .common import QueueType
34
from .connection import Connection
4-
from .entities import ExchangeSpecification
5+
from .entities import (
6+
ExchangeSpecification,
7+
QueueSpecification,
8+
)
59

610
try:
711
__version__ = metadata.version(__package__)
@@ -15,4 +19,6 @@
1519
__all__ = [
1620
"Connection",
1721
"ExchangeSpecification",
22+
"QueueSpecification",
23+
"QueueType",
1824
]

rabbitmq_amqp_python_client/address_helper.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,9 @@ def exchange_address(name: str) -> str:
22
path = "/exchanges/" + name
33

44
return path
5+
6+
7+
def queue_address(name: str) -> str:
8+
path = "/queues/" + name
9+
10+
return path

rabbitmq_amqp_python_client/common.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@ class CommonValues(enum.Enum):
2020
bindings = "bindings"
2121

2222

23-
class ExchangeTypes(enum.Enum):
23+
class ExchangeType(enum.Enum):
2424
direct = "direct"
2525
topic = "topic"
2626
fanout = "fanout"
27+
28+
29+
class QueueType(enum.Enum):
30+
quorum = "quorum"
31+
classic = "classic"
32+
stream = "stream"

rabbitmq_amqp_python_client/entities.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,26 @@
1-
import enum
2-
from collections import defaultdict
31
from dataclasses import dataclass
2+
from typing import Optional
43

5-
6-
class ExchangeType(enum.Enum):
7-
direct = "direct"
8-
topic = "topic"
9-
fanout = "fanout"
4+
from .common import ExchangeType, QueueType
105

116

127
@dataclass
138
class ExchangeSpecification:
149
name: str
1510
arguments: dict
11+
exchange_type: ExchangeType = ExchangeType.direct
12+
is_auto_delete: bool = False
13+
is_durable: bool = True
14+
15+
16+
@dataclass
17+
class QueueSpecification:
18+
name: str
19+
arguments: dict
20+
queue_type: QueueType = QueueType.quorum
21+
dead_letter_routing_key: str = ""
22+
is_exclusive: Optional[bool] = None
23+
max_len_bytes: Optional[int] = None
24+
dead_letter_exchange: str = ""
1625
is_auto_delete: bool = False
1726
is_durable: bool = True
18-
exchange_type: ExchangeType = ExchangeType.direct

rabbitmq_amqp_python_client/management.py

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@
88
BlockingSender,
99
)
1010

11-
from .address_helper import exchange_address
11+
from .address_helper import exchange_address, queue_address
1212
from .common import CommonValues
1313
from .configuration_options import (
1414
ReceiverOption,
1515
SenderOption,
1616
)
17-
from .entities import ExchangeSpecification
17+
from .entities import (
18+
ExchangeSpecification,
19+
QueueSpecification,
20+
)
1821

1922

2023
class Management:
@@ -85,9 +88,6 @@ def _request(
8588

8689
# TO_COMPLETE HERE
8790

88-
# TODO
89-
# def declare_queue(self, name:str):
90-
9191
# TODO
9292
# def delete_queue(self, name:str):
9393

@@ -114,6 +114,32 @@ def declare_exchange(self, exchange_specification: ExchangeSpecification):
114114
],
115115
)
116116

117+
def declare_queue(self, queue_specification: QueueSpecification):
118+
body = {}
119+
body["auto_delete"] = queue_specification.is_auto_delete
120+
body["durable"] = queue_specification.is_durable
121+
body["arguments"] = {
122+
"x-queue-type": queue_specification.queue_type.value,
123+
"x-dead-letter-exchange": queue_specification.dead_letter_exchange,
124+
"x-dead-letter-routing-key": queue_specification.dead_letter_routing_key,
125+
"max-length-bytes": queue_specification.max_len_bytes,
126+
}
127+
128+
path = queue_address(queue_specification.name)
129+
130+
print(path)
131+
132+
self.request(
133+
body,
134+
path,
135+
CommonValues.command_put.value,
136+
[
137+
CommonValues.response_code_201.value,
138+
CommonValues.response_code_204.value,
139+
CommonValues.response_code_409.value,
140+
],
141+
)
142+
117143
# TODO
118144
# def delete_exchange(self, name:str):
119145

0 commit comments

Comments
 (0)