Skip to content

Commit 2835843

Browse files
committed
add tests for async connection and format
1 parent 0b8fb2e commit 2835843

File tree

3 files changed

+334
-40
lines changed

3 files changed

+334
-40
lines changed

tests/asyncio/fixtures.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
from rabbitmq_amqp_python_client import (
77
AsyncConnection,
88
AsyncEnvironment,
9+
AsyncManagement,
910
OAuth2Options,
1011
PosixSslConfigurationContext,
1112
RecoveryConfiguration,
1213
WinSslConfigurationContext,
13-
AsyncManagement,
1414
)
1515

1616
from ..utils import token
@@ -24,7 +24,7 @@ async def async_environment():
2424

2525

2626
@pytest_asyncio.fixture
27-
async def environment_auth() -> AsyncGenerator[AsyncEnvironment, None]:
27+
async def async_environment_auth() -> AsyncGenerator[AsyncEnvironment, None]:
2828
token_string = token(datetime.now() + timedelta(milliseconds=2500))
2929
environment = AsyncEnvironment(
3030
uri="amqp://localhost:5672",
@@ -78,4 +78,4 @@ async def async_management() -> AsyncGenerator[AsyncManagement, None]:
7878
await connection.dial()
7979
management = await connection.management()
8080
yield management
81-
await management.close()
81+
await management.close()

tests/asyncio/test_connection.py

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
import time
2+
from datetime import datetime, timedelta
3+
from pathlib import Path
4+
5+
import pytest
6+
7+
from rabbitmq_amqp_python_client import (
8+
AsyncEnvironment,
9+
ConnectionClosed,
10+
PKCS12Store,
11+
PosixSslConfigurationContext,
12+
QuorumQueueSpecification,
13+
RecoveryConfiguration,
14+
StreamSpecification,
15+
ValidationCodeException,
16+
WinSslConfigurationContext,
17+
)
18+
from rabbitmq_amqp_python_client.qpid.proton import (
19+
ConnectionException,
20+
)
21+
22+
from ..http_requests import (
23+
create_vhost,
24+
delete_all_connections,
25+
delete_vhost,
26+
)
27+
from ..utils import token
28+
from .fixtures import * # noqa: F401, F403
29+
30+
31+
def on_disconnected():
32+
global disconnected
33+
disconnected = True
34+
35+
36+
@pytest.mark.asyncio
37+
async def test_async_connection() -> None:
38+
environment = AsyncEnvironment(uri="amqp://guest:guest@localhost:5672/")
39+
connection = await environment.connection()
40+
await connection.dial()
41+
await environment.close()
42+
43+
44+
@pytest.mark.asyncio
45+
async def test_async_environment_context_manager() -> None:
46+
async with AsyncEnvironment(
47+
uri="amqp://guest:guest@localhost:5672/"
48+
) as environment:
49+
connection = await environment.connection()
50+
await connection.dial()
51+
52+
53+
@pytest.mark.asyncio
54+
async def test_async_connection_ssl(ssl_context) -> None:
55+
environment = AsyncEnvironment(
56+
"amqps://guest:guest@localhost:5671/",
57+
ssl_context=ssl_context,
58+
)
59+
if isinstance(ssl_context, PosixSslConfigurationContext):
60+
path = Path(ssl_context.ca_cert)
61+
assert path.is_file() is True
62+
assert path.exists() is True
63+
64+
path = Path(ssl_context.client_cert.client_cert) # type: ignore
65+
assert path.is_file() is True
66+
assert path.exists() is True
67+
elif isinstance(ssl_context, WinSslConfigurationContext):
68+
assert isinstance(ssl_context.ca_store, PKCS12Store)
69+
path = Path(ssl_context.ca_store.path)
70+
assert path.is_file() is True
71+
assert path.exists() is True
72+
73+
assert isinstance(ssl_context.client_cert.store, PKCS12Store) # type: ignore
74+
path = Path(ssl_context.client_cert.store.path) # type: ignore
75+
assert path.is_file() is True
76+
assert path.exists() is True
77+
else:
78+
pytest.fail("Unsupported ssl context")
79+
80+
connection = await environment.connection()
81+
await connection.dial()
82+
83+
await environment.close()
84+
85+
86+
@pytest.mark.asyncio
87+
async def test_async_connection_oauth(async_environment_auth: AsyncEnvironment) -> None:
88+
connection = await async_environment_auth.connection()
89+
await connection.dial()
90+
management = await connection.management()
91+
await management.declare_queue(QuorumQueueSpecification(name="test-queue"))
92+
await management.close()
93+
await connection.close()
94+
95+
96+
@pytest.mark.asyncio
97+
async def test_async_connection_oauth_with_timeout(
98+
async_environment_auth: AsyncEnvironment,
99+
) -> None:
100+
connection = await async_environment_auth.connection()
101+
await connection.dial()
102+
103+
# let the token expire
104+
time.sleep(3)
105+
# token expired
106+
107+
with pytest.raises(Exception):
108+
management = await connection.management()
109+
await management.declare_queue(QuorumQueueSpecification(name="test-queue"))
110+
await management.close()
111+
112+
await connection.close()
113+
114+
115+
@pytest.mark.asyncio
116+
async def test_async_connection_oauth_refresh_token(
117+
async_environment_auth: AsyncEnvironment,
118+
) -> None:
119+
connection = await async_environment_auth.connection()
120+
await connection.dial()
121+
122+
# let the token expire
123+
time.sleep(1)
124+
# # token expired, refresh
125+
126+
await connection.refresh_token(token(datetime.now() + timedelta(milliseconds=5000)))
127+
time.sleep(3)
128+
129+
with pytest.raises(Exception):
130+
management = await connection.management()
131+
await management.declare_queue(QuorumQueueSpecification(name="test-queue"))
132+
await management.close()
133+
134+
await connection.close()
135+
136+
137+
@pytest.mark.asyncio
138+
async def test_async_connection_oauth_refresh_token_with_disconnection(
139+
async_environment_auth: AsyncEnvironment,
140+
) -> None:
141+
connection = await async_environment_auth.connection()
142+
await connection.dial()
143+
144+
# let the token expire
145+
time.sleep(1)
146+
# # token expired, refresh
147+
148+
await connection.refresh_token(token(datetime.now() + timedelta(milliseconds=5000)))
149+
delete_all_connections()
150+
time.sleep(3)
151+
152+
with pytest.raises(Exception):
153+
management = await connection.management()
154+
await management.declare_queue(QuorumQueueSpecification(name="test-queue"))
155+
await management.close()
156+
157+
await connection.close()
158+
159+
160+
@pytest.mark.asyncio
161+
async def test_async_environment_connections_management() -> None:
162+
enviroment = AsyncEnvironment(uri="amqp://guest:guest@localhost:5672/")
163+
164+
connection1 = await enviroment.connection()
165+
await connection1.dial()
166+
connection2 = await enviroment.connection()
167+
await connection2.dial()
168+
connection3 = await enviroment.connection()
169+
await connection3.dial()
170+
171+
assert enviroment.active_connections == 3
172+
173+
# this shouldn't happen but we test it anyway
174+
await connection1.close()
175+
assert enviroment.active_connections == 2
176+
177+
await connection2.close()
178+
assert enviroment.active_connections == 1
179+
180+
await connection3.close()
181+
assert enviroment.active_connections == 0
182+
183+
await enviroment.close()
184+
185+
186+
@pytest.mark.asyncio
187+
async def test_async_connection_reconnection() -> None:
188+
disconnected = False
189+
enviroment = AsyncEnvironment(
190+
uri="amqp://guest:guest@localhost:5672/",
191+
recovery_configuration=RecoveryConfiguration(active_recovery=True),
192+
)
193+
194+
connection = await enviroment.connection()
195+
await connection.dial()
196+
197+
# delay
198+
time.sleep(5)
199+
# simulate a disconnection
200+
# raise a reconnection
201+
management = await connection.management()
202+
203+
delete_all_connections()
204+
205+
stream_name = "test_stream_info_with_validation"
206+
queue_specification = StreamSpecification(
207+
name=stream_name,
208+
)
209+
210+
try:
211+
await management.declare_queue(queue_specification)
212+
except ConnectionClosed:
213+
disconnected = True
214+
215+
# check that we reconnected
216+
await management.declare_queue(queue_specification)
217+
await management.delete_queue(stream_name)
218+
await management.close()
219+
await enviroment.close()
220+
221+
assert disconnected is True
222+
223+
224+
@pytest.mark.asyncio
225+
async def test_async_reconnection_parameters() -> None:
226+
enviroment = AsyncEnvironment(
227+
uri="amqp://guest:guest@localhost:5672/",
228+
recovery_configuration=RecoveryConfiguration(
229+
active_recovery=True,
230+
back_off_reconnect_interval=timedelta(milliseconds=100),
231+
),
232+
)
233+
234+
with pytest.raises(ValidationCodeException):
235+
await enviroment.connection()
236+
237+
238+
@pytest.mark.asyncio
239+
async def test_async_connection_vhost() -> None:
240+
vhost = "tmpVhost" + str(time.time())
241+
create_vhost(vhost)
242+
uri = "amqp://guest:guest@localhost:5672/{}".format(vhost)
243+
environment = AsyncEnvironment(uri=uri)
244+
connection = await environment.connection()
245+
await connection.dial()
246+
is_correct_vhost = connection._connection._conn.conn.hostname == "vhost:{}".format(vhost) # type: ignore
247+
await environment.close()
248+
delete_vhost(vhost)
249+
250+
assert is_correct_vhost is True
251+
252+
253+
@pytest.mark.asyncio
254+
async def test_async_connection_vhost_not_exists() -> None:
255+
vhost = "tmpVhost" + str(time.time())
256+
uri = "amqp://guest:guest@localhost:5672/{}".format(vhost)
257+
258+
environment = AsyncEnvironment(uri=uri)
259+
260+
with pytest.raises(ConnectionException):
261+
connection = await environment.connection()
262+
await connection.dial()

0 commit comments

Comments
 (0)