Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .connection import Connection
from .consumer import Consumer
from .entities import (
ExchangeCustomSpecification,
ExchangeSpecification,
ExchangeToExchangeBindingSpecification,
ExchangeToQueueBindingSpecification,
Expand Down Expand Up @@ -75,4 +76,5 @@
"OffsetSpecification",
"OutcomeState",
"Environment",
"ExchangeCustomSpecification",
]
1 change: 1 addition & 0 deletions rabbitmq_amqp_python_client/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class ExchangeType(enum.Enum):
direct = "direct"
topic = "topic"
fanout = "fanout"
headers = "headers"


class QueueType(enum.Enum):
Expand Down
10 changes: 10 additions & 0 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ class ExchangeSpecification:
is_durable: bool = True


@dataclass
class ExchangeCustomSpecification:
name: str
exchange_type: str
arguments: dict[str, str] = field(default_factory=dict)
is_auto_delete: bool = False
is_internal: bool = False
is_durable: bool = True


@dataclass
class QueueInfo:
name: str
Expand Down
17 changes: 12 additions & 5 deletions rabbitmq_amqp_python_client/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .address_helper import AddressHelper
from .common import CommonValues, QueueType
from .entities import (
ExchangeCustomSpecification,
ExchangeSpecification,
ExchangeToExchangeBindingSpecification,
ExchangeToQueueBindingSpecification,
Expand Down Expand Up @@ -98,15 +99,21 @@ def _request(
return msg

def declare_exchange(
self, exchange_specification: ExchangeSpecification
) -> ExchangeSpecification:
self,
exchange_specification: Union[
ExchangeSpecification, ExchangeCustomSpecification
],
) -> Union[ExchangeSpecification, ExchangeCustomSpecification]:
logger.debug("declare_exchange operation called")
body = {}
body: dict[str, Any] = {}
body["auto_delete"] = exchange_specification.is_auto_delete
body["durable"] = exchange_specification.is_durable
body["type"] = exchange_specification.exchange_type.value # type: ignore
if isinstance(exchange_specification, ExchangeSpecification):
body["type"] = exchange_specification.exchange_type.value
elif isinstance(exchange_specification, ExchangeCustomSpecification):
body["type"] = exchange_specification.exchange_type
body["internal"] = exchange_specification.is_internal
body["arguments"] = exchange_specification.arguments # type: ignore
body["arguments"] = exchange_specification.arguments

path = AddressHelper.exchange_address(exchange_specification.name)

Expand Down
13 changes: 13 additions & 0 deletions tests/test_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ def test_declare_delete_exchange(management: Management) -> None:
management.delete_exchange(exchange_name)


def test_declare_delete_exchange_headers(management: Management) -> None:

exchange_name = "test-exchange"

exchange_info = management.declare_exchange(
ExchangeSpecification(name=exchange_name, exchange_type=ExchangeType.headers)
)

assert exchange_info.name == exchange_name

management.delete_exchange(exchange_name)


def test_declare_delete_exchange_with_args(management: Management) -> None:

exchange_name = "test-exchange-with-args"
Expand Down