Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ poetry run python ./examples/getting_started/main.py

### Creating a connection

A connection to the RabbitMQ AMQP 1.0 server can be established using the Connection object.
A connection to the RabbitMQ AMQP 1.0 server can be established using the Environment object.

For example:

```python
connection = Connection("amqp://guest:guest@localhost:5672/")
environment = Environment()
connection = environment.connection("amqp://guest:guest@localhost:5672/")
connection.dial()
```

Expand Down
31 changes: 22 additions & 9 deletions examples/getting_started/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
AMQPMessagingHandler,
BindingSpecification,
Connection,
Disposition,
Environment,
Event,
ExchangeSpecification,
Message,
OutcomeState,
QuorumQueueSpecification,
)

Expand Down Expand Up @@ -61,8 +62,19 @@ def on_link_closed(self, event: Event) -> None:
print("link closed")


def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
def create_connection(environment: Environment) -> Connection:
connection = environment.connection("amqp://guest:guest@localhost:5672/")
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
# client_key = ".ci/certs/client_key.pem"
# connection = Connection(
# "amqps://guest:guest@localhost:5671/",
# ssl_context=SslConfigurationContext(
# ca_cert=ca_cert_file,
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
# ),
# )
connection.dial()

return connection
Expand All @@ -75,12 +87,13 @@ def main() -> None:
routing_key = "routing-key"

print("connection to amqp server")
connection = create_connection()
environment = Environment()
connection = create_connection(environment)

management = connection.management()

print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
management.declare_exchange(ExchangeSpecification(name=exchange_name))

management.declare_queue(
QuorumQueueSpecification(name=queue_name)
Expand Down Expand Up @@ -113,11 +126,11 @@ def main() -> None:
for i in range(MESSAGES_TO_PUBLISH):
print("publishing")
status = publisher.publish(Message(body="test"))
if status.remote_state == Disposition.ACCEPTED:
if status.remote_state == OutcomeState.ACCEPTED:
print("message accepted")
elif status.remote_state == Disposition.RELEASED:
elif status.remote_state == OutcomeState.RELEASED:
print("message not routed")
elif status.remote_state == Disposition.REJECTED:
elif status.remote_state == OutcomeState.REJECTED:
print("message not rejected")

publisher.close()
Expand Down Expand Up @@ -150,7 +163,7 @@ def main() -> None:
print("closing connections")
management.close()
print("after management closing")
connection.close()
environment.close()
print("after connection closing")


Expand Down
23 changes: 18 additions & 5 deletions examples/getting_started/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
AddressHelper,
AMQPMessagingHandler,
Connection,
Environment,
Event,
Message,
OffsetSpecification,
Expand Down Expand Up @@ -65,8 +66,19 @@ def on_link_closed(self, event: Event) -> None:
print("link closed")


def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
def create_connection(environment: Environment) -> Connection:
connection = environment.connection("amqp://guest:guest@localhost:5672/")
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
# client_key = ".ci/certs/client_key.pem"
# connection = Connection(
# "amqps://guest:guest@localhost:5671/",
# ssl_context=SslConfigurationContext(
# ca_cert=ca_cert_file,
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
# ),
# )
connection.dial()

return connection
Expand All @@ -76,15 +88,16 @@ def main() -> None:
queue_name = "example-queue"

print("connection to amqp server")
connection = create_connection()
environment = Environment()
connection = create_connection(environment)

management = connection.management()

management.declare_queue(StreamSpecification(name=queue_name))

addr_queue = AddressHelper.queue_address(queue_name)

consumer_connection = create_connection()
consumer_connection = create_connection(environment)

stream_filter_options = StreamOptions()
# can be first, last, next or an offset long
Expand Down Expand Up @@ -135,7 +148,7 @@ def main() -> None:
print("closing connections")
management.close()
print("after management closing")
connection.close()
environment.close()
print("after connection closing")


Expand Down
12 changes: 8 additions & 4 deletions examples/getting_started/reconnection_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Connection,
ConnectionClosed,
Consumer,
Environment,
Event,
ExchangeSpecification,
Management,
Expand All @@ -20,6 +21,8 @@
QuorumQueueSpecification,
)

environment = Environment()


# here we keep track of the objects we need to reconnect
@dataclass
Expand Down Expand Up @@ -118,8 +121,9 @@ def create_connection() -> Connection:
# "amqp://ha_tls-rabbit_node2-1:5602/",
# ]
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
connection = Connection(
uri="amqp://guest:guest@localhost:5672/",

connection = environment.connection(
url="amqp://guest:guest@localhost:5672/",
on_disconnection_handler=on_disconnection,
)
connection.dial()
Expand All @@ -146,7 +150,7 @@ def main() -> None:

print("declaring exchange and queue")
connection_configuration.management.declare_exchange(
ExchangeSpecification(name=exchange_name, arguments={})
ExchangeSpecification(name=exchange_name)
)

connection_configuration.management.declare_queue(
Expand Down Expand Up @@ -242,7 +246,7 @@ def main() -> None:
print("closing connections")
connection_configuration.management.close()
print("after management closing")
connection_configuration.connection.close()
environment.close()
print("after connection closing")


Expand Down
12 changes: 7 additions & 5 deletions examples/getting_started/tls_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
BindingSpecification,
ClientCert,
Connection,
Environment,
Event,
ExchangeSpecification,
Message,
Expand Down Expand Up @@ -62,12 +63,12 @@ def on_link_closed(self, event: Event) -> None:
print("link closed")


def create_connection() -> Connection:
def create_connection(environment: Environment) -> Connection:
# in case of SSL enablement
ca_cert_file = ".ci/certs/ca_certificate.pem"
client_cert = ".ci/certs/client_certificate.pem"
client_key = ".ci/certs/client_key.pem"
connection = Connection(
connection = environment.connection(
"amqps://guest:guest@localhost:5671/",
ssl_context=SslConfigurationContext(
ca_cert=ca_cert_file,
Expand All @@ -84,14 +85,15 @@ def main() -> None:
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
environment = Environment()

print("connection to amqp server")
connection = create_connection()
connection = create_connection(environment)

management = connection.management()

print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
management.declare_exchange(ExchangeSpecification(name=exchange_name))

management.declare_queue(
QuorumQueueSpecification(name=queue_name)
Expand Down Expand Up @@ -160,7 +162,7 @@ def main() -> None:
print("closing connections")
management.close()
print("after management closing")
connection.close()
environment.close()
print("after connection closing")


Expand Down
7 changes: 5 additions & 2 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
OffsetSpecification,
StreamOptions,
)
from .environment import Environment
from .exceptions import ArgumentOutOfRangeException
from .management import Management
from .publisher import Publisher
Expand Down Expand Up @@ -39,6 +40,8 @@

del metadata

OutcomeState = Disposition

__all__ = [
"Connection",
"Management",
Expand All @@ -61,9 +64,9 @@
"ArgumentOutOfRangeException",
"SslConfigurationContext",
"ClientCert",
"Delivery",
"ConnectionClosed",
"StreamOptions",
"OffsetSpecification",
"Disposition",
"OutcomeState",
"Environment",
]
9 changes: 8 additions & 1 deletion rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ def __init__(
self._on_disconnection_handler = on_disconnection_handler
self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context
self._ssl_domain = None
self._connections = [] # type: ignore
self._index: int = -1

def _set_environment_connection_list(self, connections: []): # type: ignore
self._connections = connections

def dial(self) -> None:
logger.debug("Establishing a connection to the amqp server")
Expand Down Expand Up @@ -72,9 +77,11 @@ def management(self) -> Management:
return self._management

# closes the connection to the AMQP 1.0 server.
def close(self) -> None:
# This method should be called just from Environment and not from the user
def _close(self) -> None:
logger.debug("Closing connection")
self._conn.close()
self._connections.remove(self)

def publisher(self, destination: str) -> Publisher:
if validate_address(destination) is False:
Expand Down
6 changes: 3 additions & 3 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional, Union

Expand All @@ -13,7 +13,7 @@
@dataclass
class ExchangeSpecification:
name: str
arguments: dict[str, str]
arguments: dict[str, str] = field(default_factory=dict)
exchange_type: ExchangeType = ExchangeType.direct
is_auto_delete: bool = False
is_internal: bool = False
Expand All @@ -24,7 +24,7 @@ class ExchangeSpecification:
class QueueInfo:
name: str
arguments: dict[str, Any]
queue_type: QueueType = QueueType.quorum
queue_type: QueueType = QueueType.classic
is_exclusive: Optional[bool] = None
is_auto_delete: bool = False
is_durable: bool = True
Expand Down
46 changes: 46 additions & 0 deletions rabbitmq_amqp_python_client/environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# For the moment this is just a Connection pooler to keep compatibility with other clients
import logging
from typing import Annotated, Callable, Optional, TypeVar

from .connection import Connection
from .ssl_configuration import SslConfigurationContext

logger = logging.getLogger(__name__)

MT = TypeVar("MT")
CB = Annotated[Callable[[MT], None], "Message callback type"]


class Environment:

def __init__(self): # type: ignore

self._connections: list[Connection] = []

def connection(
self,
# single-node mode
uri: Optional[str] = None,
# multi-node mode
uris: Optional[list[str]] = None,
ssl_context: Optional[SslConfigurationContext] = None,
on_disconnection_handler: Optional[CB] = None, # type: ignore
) -> Connection:
connection = Connection(
uri=uri,
uris=uris,
ssl_context=ssl_context,
on_disconnection_handler=on_disconnection_handler,
)
logger.debug("Environment: Creating and returning a new connection")
self._connections.append(connection)
connection._set_environment_connection_list(self._connections)
return connection

def close(self) -> None:
logger.debug("Environment: Closing all pending connections")
for connection in self._connections:
connection._close()

def connections(self) -> list[Connection]:
return self._connections
2 changes: 2 additions & 0 deletions rabbitmq_amqp_python_client/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,6 @@ def queue_info(self, name: str) -> QueueInfo:
leader=queue_info["leader"],
members=queue_info["replicas"],
arguments=queue_info["arguments"],
message_count=queue_info["message_count"],
consumer_count=queue_info["consumer_count"],
)
2 changes: 1 addition & 1 deletion rabbitmq_amqp_python_client/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ClassicQueueSpecification(QueueSpecification):

@dataclass
class QuorumQueueSpecification(QueueSpecification):
deliver_limit: Optional[str] = None
deliver_limit: Optional[int] = None
dead_letter_strategy: Optional[str] = None
quorum_initial_group_size: Optional[int] = None
cluster_target_size: Optional[int] = None
Expand Down
Loading