Skip to content

Commit 62949a3

Browse files
author
DanielePalaia
committed
Introducing Environment
1 parent a1f3840 commit 62949a3

File tree

14 files changed

+219
-76
lines changed

14 files changed

+219
-76
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,13 @@ poetry run python ./examples/getting_started/main.py
3838

3939
### Creating a connection
4040

41-
A connection to the RabbitMQ AMQP 1.0 server can be established using the Connection object.
41+
A connection to the RabbitMQ AMQP 1.0 server can be established using the Environment object.
4242

4343
For example:
4444

4545
```python
46-
connection = Connection("amqp://guest:guest@localhost:5672/")
46+
environment = Environment()
47+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
4748
connection.dial()
4849
```
4950

examples/getting_started/basic_example.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
ExchangeSpecification,
1212
Message,
1313
QuorumQueueSpecification,
14+
Environment,
1415
)
1516

1617
MESSAGES_TO_PUBLISH = 100
@@ -61,8 +62,19 @@ def on_link_closed(self, event: Event) -> None:
6162
print("link closed")
6263

6364

64-
def create_connection() -> Connection:
65-
connection = Connection("amqp://guest:guest@localhost:5672/")
65+
def create_connection(environment: Environment) -> Connection:
66+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
67+
# in case of SSL enablement
68+
# ca_cert_file = ".ci/certs/ca_certificate.pem"
69+
# client_cert = ".ci/certs/client_certificate.pem"
70+
# client_key = ".ci/certs/client_key.pem"
71+
# connection = Connection(
72+
# "amqps://guest:guest@localhost:5671/",
73+
# ssl_context=SslConfigurationContext(
74+
# ca_cert=ca_cert_file,
75+
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
76+
# ),
77+
# )
6678
connection.dial()
6779

6880
return connection
@@ -75,7 +87,8 @@ def main() -> None:
7587
routing_key = "routing-key"
7688

7789
print("connection to amqp server")
78-
connection = create_connection()
90+
environment = Environment()
91+
connection = create_connection(environment)
7992

8093
management = connection.management()
8194

@@ -150,7 +163,7 @@ def main() -> None:
150163
print("closing connections")
151164
management.close()
152165
print("after management closing")
153-
connection.close()
166+
environment.close()
154167
print("after connection closing")
155168

156169

examples/getting_started/example_with_streams.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
OffsetSpecification,
1010
StreamOptions,
1111
StreamSpecification,
12+
Environment,
1213
)
1314

1415
MESSAGES_TO_PUBLISH = 100
@@ -65,8 +66,19 @@ def on_link_closed(self, event: Event) -> None:
6566
print("link closed")
6667

6768

68-
def create_connection() -> Connection:
69-
connection = Connection("amqp://guest:guest@localhost:5672/")
69+
def create_connection(environment: Environment) -> Connection:
70+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
71+
# in case of SSL enablement
72+
# ca_cert_file = ".ci/certs/ca_certificate.pem"
73+
# client_cert = ".ci/certs/client_certificate.pem"
74+
# client_key = ".ci/certs/client_key.pem"
75+
# connection = Connection(
76+
# "amqps://guest:guest@localhost:5671/",
77+
# ssl_context=SslConfigurationContext(
78+
# ca_cert=ca_cert_file,
79+
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
80+
# ),
81+
# )
7082
connection.dial()
7183

7284
return connection
@@ -76,15 +88,16 @@ def main() -> None:
7688
queue_name = "example-queue"
7789

7890
print("connection to amqp server")
79-
connection = create_connection()
91+
environment = Environment()
92+
connection = create_connection(environment)
8093

8194
management = connection.management()
8295

8396
management.declare_queue(StreamSpecification(name=queue_name))
8497

8598
addr_queue = AddressHelper.queue_address(queue_name)
8699

87-
consumer_connection = create_connection()
100+
consumer_connection = create_connection(environment)
88101

89102
stream_filter_options = StreamOptions()
90103
# can be first, last, next or an offset long
@@ -135,7 +148,7 @@ def main() -> None:
135148
print("closing connections")
136149
management.close()
137150
print("after management closing")
138-
connection.close()
151+
environment.close()
139152
print("after connection closing")
140153

141154

examples/getting_started/reconnection_example.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
Message,
1919
Publisher,
2020
QuorumQueueSpecification,
21+
Environment,
2122
)
2223

23-
24+
environment = Environment()
2425
# here we keep track of the objects we need to reconnect
2526
@dataclass
2627
class ConnectionConfiguration:
@@ -118,8 +119,9 @@ def create_connection() -> Connection:
118119
# "amqp://ha_tls-rabbit_node2-1:5602/",
119120
# ]
120121
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
121-
connection = Connection(
122-
uri="amqp://guest:guest@localhost:5672/",
122+
123+
connection = environment.connection(
124+
url="amqp://guest:guest@localhost:5672/",
123125
on_disconnection_handler=on_disconnection,
124126
)
125127
connection.dial()
@@ -242,7 +244,7 @@ def main() -> None:
242244
print("closing connections")
243245
connection_configuration.management.close()
244246
print("after management closing")
245-
connection_configuration.connection.close()
247+
environment.close()
246248
print("after connection closing")
247249

248250

examples/getting_started/tls_example.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
Message,
1313
QuorumQueueSpecification,
1414
SslConfigurationContext,
15+
Environment,
1516
)
1617

1718
messages_to_publish = 100
@@ -62,12 +63,12 @@ def on_link_closed(self, event: Event) -> None:
6263
print("link closed")
6364

6465

65-
def create_connection() -> Connection:
66+
def create_connection(environment: Environment) -> Connection:
6667
# in case of SSL enablement
6768
ca_cert_file = ".ci/certs/ca_certificate.pem"
6869
client_cert = ".ci/certs/client_certificate.pem"
6970
client_key = ".ci/certs/client_key.pem"
70-
connection = Connection(
71+
connection = environment.connection(
7172
"amqps://guest:guest@localhost:5671/",
7273
ssl_context=SslConfigurationContext(
7374
ca_cert=ca_cert_file,
@@ -84,9 +85,10 @@ def main() -> None:
8485
exchange_name = "test-exchange"
8586
queue_name = "example-queue"
8687
routing_key = "routing-key"
88+
environment = Environment()
8789

8890
print("connection to amqp server")
89-
connection = create_connection()
91+
connection = create_connection(environment)
9092

9193
management = connection.management()
9294

@@ -160,7 +162,7 @@ def main() -> None:
160162
print("closing connections")
161163
management.close()
162164
print("after management closing")
163-
connection.close()
165+
environment.close()
164166
print("after connection closing")
165167

166168

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
OffsetSpecification,
1212
StreamOptions,
1313
)
14+
from .environment import Environment
1415
from .exceptions import ArgumentOutOfRangeException
1516
from .management import Management
1617
from .publisher import Publisher
@@ -66,4 +67,5 @@
6667
"StreamOptions",
6768
"OffsetSpecification",
6869
"Disposition",
70+
"Environment",
6971
]

rabbitmq_amqp_python_client/connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def management(self) -> Management:
7272
return self._management
7373

7474
# closes the connection to the AMQP 1.0 server.
75-
def close(self) -> None:
75+
def _close(self) -> None:
7676
logger.debug("Closing connection")
7777
self._conn.close()
7878

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from typing import Annotated, Callable, Optional, TypeVar
2+
3+
from .connection import Connection
4+
from .ssl_configuration import SslConfigurationContext
5+
6+
MT = TypeVar("MT")
7+
CB = Annotated[Callable[[MT], None], "Message callback type"]
8+
9+
10+
class Environment:
11+
12+
def __init__(self): # type: ignore
13+
14+
self._connections = []
15+
16+
def connection(
17+
self,
18+
# single-node mode
19+
url: Optional[str] = None,
20+
# multi-node mode
21+
urls: Optional[list[str]] = None,
22+
ssl_context: Optional[SslConfigurationContext] = None,
23+
on_disconnection_handler: Optional[CB] = None, # type: ignore
24+
) -> Connection:
25+
connection = Connection(
26+
url=url,
27+
urls=urls,
28+
ssl_context=ssl_context,
29+
on_disconnection_handler=on_disconnection_handler,
30+
)
31+
32+
self._connections.append(connection)
33+
return connection
34+
35+
def close(self) -> None:
36+
for connection in self._connections:
37+
connection._close()

tests/conftest.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,30 +6,42 @@
66
AddressHelper,
77
AMQPMessagingHandler,
88
ClientCert,
9-
Connection,
9+
Environment,
1010
Event,
1111
SslConfigurationContext,
1212
symbol,
1313
)
1414

1515

16+
@pytest.fixture()
17+
def environment(pytestconfig):
18+
environment = Environment()
19+
try:
20+
yield environment
21+
22+
finally:
23+
environment.close()
24+
25+
1626
@pytest.fixture()
1727
def connection(pytestconfig):
18-
connection = Connection("amqp://guest:guest@localhost:5672/")
28+
environment = Environment()
29+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
1930
connection.dial()
2031
try:
2132
yield connection
2233

2334
finally:
24-
connection.close()
35+
environment.close()
2536

2637

2738
@pytest.fixture()
2839
def connection_ssl(pytestconfig):
40+
environment = Environment()
2941
ca_cert_file = ".ci/certs/ca_certificate.pem"
3042
client_cert = ".ci/certs/client_certificate.pem"
3143
client_key = ".ci/certs/client_key.pem"
32-
connection = Connection(
44+
connection = environment.connection(
3345
"amqps://guest:guest@localhost:5671/",
3446
ssl_context=SslConfigurationContext(
3547
ca_cert=ca_cert_file,
@@ -41,25 +53,26 @@ def connection_ssl(pytestconfig):
4153
yield connection
4254

4355
finally:
44-
connection.close()
56+
environment.close()
4557

4658

4759
@pytest.fixture()
4860
def management(pytestconfig):
49-
connection = Connection("amqp://guest:guest@localhost:5672/")
61+
environment = Environment()
62+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
5063
connection.dial()
5164
try:
5265
management = connection.management()
5366
yield management
5467

5568
finally:
56-
management.close()
57-
connection.close()
69+
environment.close()
5870

5971

6072
@pytest.fixture()
6173
def consumer(pytestconfig):
62-
connection = Connection("amqp://guest:guest@localhost:5672/")
74+
environment = Environment()
75+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
6376
connection.dial()
6477
try:
6578
queue_name = "test-queue"
@@ -69,7 +82,7 @@ def consumer(pytestconfig):
6982

7083
finally:
7184
consumer.close()
72-
connection.close()
85+
environment.close()
7386

7487

7588
class ConsumerTestException(BaseException):

0 commit comments

Comments
 (0)