Skip to content

Commit ffb788c

Browse files
author
maxim-lixakov
committed
[BREAKING] - split tests
1 parent 9efe035 commit ffb788c

File tree

1 file changed

+159
-87
lines changed

1 file changed

+159
-87
lines changed

tests/test_unit/test_connections/test_copy_connection.py

Lines changed: 159 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,9 @@
1313
pytestmark = [pytest.mark.asyncio, pytest.mark.backend]
1414

1515

16-
@pytest.mark.parametrize("is_delete_source", [True, False])
17-
async def test_maintainer_plus_can_copy_connection_with_delete_source(
16+
async def test_maintainer_plus_can_copy_connection_without_deleting_source(
1817
client: AsyncClient,
1918
session: AsyncSession,
20-
is_delete_source: bool,
2119
settings: Settings,
2220
group_connection: MockConnection,
2321
empty_group: MockGroup,
@@ -27,32 +25,24 @@ async def test_maintainer_plus_can_copy_connection_with_delete_source(
2725
role = group_connection_and_group_maintainer_plus
2826
user = group_connection.owner_group.get_member_of_role(role)
2927

30-
# Check: new connection does not exist and source connection not deleted
3128
query_current_row = select(Connection).where(Connection.id == group_connection.id)
32-
33-
result_current_row = await session.scalars(query_current_row)
34-
current = result_current_row.one()
29+
current = (await session.scalars(query_current_row)).one()
3530

3631
query_current_creds = select(AuthData).where(AuthData.connection_id == group_connection.id)
37-
result_current_creds = await session.scalars(query_current_creds)
38-
current_cred = result_current_creds.one()
39-
32+
current_cred = (await session.scalars(query_current_creds)).one()
4033
curr_id = current.id
4134

4235
assert decrypt_auth_data(current_cred.value, settings) == {
4336
"type": "postgres",
4437
"user": "user",
4538
"password": "password",
4639
}
47-
assert current is not None
4840

4941
query_copy_not_exist = select(Connection).filter(
5042
Connection.group_id == empty_group.id,
5143
Connection.name == group_connection.name,
5244
)
53-
result_copy_not_exist = await session.scalars(query_copy_not_exist)
54-
row_copy_not_exist = result_copy_not_exist.one_or_none()
55-
45+
row_copy_not_exist = (await session.scalars(query_copy_not_exist)).one_or_none()
5646
assert not row_copy_not_exist
5747

5848
# Act
@@ -61,96 +51,132 @@ async def test_maintainer_plus_can_copy_connection_with_delete_source(
6151
headers={"Authorization": f"Bearer {user.token}"},
6252
json={
6353
"new_group_id": empty_group.id,
64-
"remove_source": is_delete_source,
54+
"remove_source": False,
6555
},
6656
)
6757

68-
# Pre-Assert
58+
# Assertions
59+
assert result.status_code == 200
6960
assert result.json() == {
7061
"ok": True,
7162
"status_code": 200,
7263
"message": "Connection was copied",
7364
}
74-
assert result.status_code == 200
7565

76-
query_prev_row = select(Connection).where(Connection.id == curr_id)
77-
result_prev_row = await session.scalars(query_prev_row)
78-
origin = result_prev_row.first()
66+
# Check old (original) connection still exists
67+
session.expunge(current)
68+
origin = await session.get(Connection, curr_id)
69+
assert origin is not None
7970

8071
q_creds_origin = select(AuthData).where(AuthData.connection_id == curr_id)
81-
creds_origin = await session.scalars(q_creds_origin)
82-
creds_origin = creds_origin.first()
72+
creds_origin = (await session.scalars(q_creds_origin)).first()
73+
assert decrypt_auth_data(creds_origin.value, settings) == {
74+
"type": "postgres",
75+
"user": "user",
76+
"password": "password",
77+
}
8378

79+
# Check new connection
8480
query_new_row = select(Connection).filter(
8581
Connection.group_id == empty_group.id,
8682
Connection.name == group_connection.name,
8783
)
88-
result_new_row = await session.scalars(query_new_row)
89-
new = result_new_row.one() # connection was copied
84+
new = (await session.scalars(query_new_row)).one()
85+
assert new is not None
9086

9187
q_creds_new = select(AuthData).where(AuthData.connection_id == new.id)
92-
creds_new = await session.scalars(q_creds_new)
93-
creds_new = creds_new.one_or_none()
88+
creds_new = (await session.scalars(q_creds_new)).one_or_none()
89+
assert creds_new is None
9490

95-
# Assert
9691

97-
if not is_delete_source:
98-
assert origin.id == curr_id
99-
assert decrypt_auth_data(creds_origin.value, settings) == {
100-
"type": "postgres",
101-
"user": "user",
102-
"password": "password",
103-
}
92+
async def test_maintainer_plus_can_copy_connection_and_delete_source(
93+
client: AsyncClient,
94+
session: AsyncSession,
95+
settings: Settings,
96+
group_connection: MockConnection,
97+
empty_group: MockGroup,
98+
group_connection_and_group_maintainer_plus: str,
99+
):
100+
# Arrange
101+
role = group_connection_and_group_maintainer_plus
102+
user = group_connection.owner_group.get_member_of_role(role)
103+
104+
query_current_row = select(Connection).where(Connection.id == group_connection.id)
105+
current = (await session.scalars(query_current_row)).one()
106+
107+
query_current_creds = select(AuthData).where(AuthData.connection_id == group_connection.id)
108+
current_cred = (await session.scalars(query_current_creds)).one()
109+
curr_id = current.id
110+
111+
assert decrypt_auth_data(current_cred.value, settings) == {
112+
"type": "postgres",
113+
"user": "user",
114+
"password": "password",
115+
}
104116

117+
# Act
118+
result = await client.post(
119+
f"v1/connections/{group_connection.id}/copy_connection",
120+
headers={"Authorization": f"Bearer {user.token}"},
121+
json={
122+
"new_group_id": empty_group.id,
123+
"remove_source": True, # delete source
124+
},
125+
)
126+
127+
assert result.status_code == 200
128+
assert result.json() == {
129+
"ok": True,
130+
"status_code": 200,
131+
"message": "Connection was copied",
132+
}
133+
134+
# Check source deleted
105135
session.expunge(current)
106-
origin = await session.get(Connection, current.id)
107-
if is_delete_source:
108-
# Assert that origin connection was deleted
109-
assert origin is None
110-
else:
111-
# Assert that origin connection was not deleted
112-
assert origin is not None
136+
origin = await session.get(Connection, curr_id)
137+
assert origin is None
113138

139+
# Check new connection created
140+
query_new_row = select(Connection).filter(
141+
Connection.group_id == empty_group.id,
142+
Connection.name == group_connection.name,
143+
)
144+
new = (await session.scalars(query_new_row)).one()
114145
assert new is not None
115-
assert not creds_new
146+
147+
q_creds_new = select(AuthData).where(AuthData.connection_id == new.id)
148+
creds_new = (await session.scalars(q_creds_new)).one_or_none()
149+
assert creds_new is None
116150

117151

118-
@pytest.mark.parametrize("is_delete_source", [True, False])
119-
async def test_superuser_can_copy_connection(
152+
async def test_superuser_can_copy_connection_without_deleting_source(
120153
client: AsyncClient,
121154
group_connection: MockConnection,
122155
superuser: MockUser,
123156
empty_group: MockGroup,
124157
session: AsyncSession,
125-
is_delete_source: str,
126158
settings: Settings,
127159
):
128160
# Arrange
129161
query_current_row = select(Connection).where(Connection.id == group_connection.id)
130-
result_current_row = await session.scalars(query_current_row)
131-
current = result_current_row.one()
132-
133-
query_current_creds = select(AuthData).where(AuthData.connection_id == group_connection.id)
134-
result_current_creds = await session.scalars(query_current_creds)
135-
current_cred = result_current_creds.one()
136-
162+
current = (await session.scalars(query_current_row)).one()
137163
curr_id = current.id
138164
curr_group_id = current.group_id
139165

166+
query_current_creds = select(AuthData).where(AuthData.connection_id == group_connection.id)
167+
current_cred = (await session.scalars(query_current_creds)).one()
168+
140169
assert decrypt_auth_data(current_cred.value, settings) == {
141170
"type": "postgres",
142171
"user": "user",
143172
"password": "password",
144173
}
145-
assert current is not None
146174

147175
query_copy_not_exist = select(Connection).filter(
148176
Connection.group_id == empty_group.id,
149177
Connection.name == group_connection.name,
150178
)
151-
result_copy_not_exist = await session.scalars(query_copy_not_exist)
152-
row_copy_not_exist = result_copy_not_exist.one_or_none()
153-
179+
row_copy_not_exist = (await session.scalars(query_copy_not_exist)).one_or_none()
154180
assert not row_copy_not_exist
155181

156182
# Act
@@ -159,48 +185,99 @@ async def test_superuser_can_copy_connection(
159185
headers={"Authorization": f"Bearer {superuser.token}"},
160186
json={
161187
"new_group_id": empty_group.id,
162-
"remove_source": is_delete_source,
188+
"remove_source": False,
163189
},
164190
)
165191

166-
# Pre-assert
192+
assert result.status_code == 200
167193
assert result.json() == {
168194
"ok": True,
169195
"status_code": 200,
170196
"message": "Connection was copied",
171197
}
172-
assert result.status_code == 200
173198

199+
# Assert original still there
200+
origin = await session.get(Connection, curr_id)
201+
assert origin is not None
202+
assert origin.id == curr_id
203+
assert origin.group_id == curr_group_id
204+
205+
q_creds_origin = select(AuthData).where(AuthData.connection_id == current.id)
206+
creds_origin = (await session.scalars(q_creds_origin)).first()
207+
assert decrypt_auth_data(creds_origin.value, settings) == {
208+
"type": "postgres",
209+
"user": "user",
210+
"password": "password",
211+
}
212+
213+
# Assert new copy
174214
query_new_row = select(Connection).filter(
175215
Connection.group_id == empty_group.id,
176216
Connection.name == group_connection.name,
177217
)
178-
result_new_row = await session.scalars(query_new_row)
179-
new = result_new_row.one() # copied
180-
181-
q_creds_origin = select(AuthData).where(AuthData.connection_id == current.id)
182-
creds_origin = await session.scalars(q_creds_origin)
183-
creds_origin = creds_origin.first()
218+
new = (await session.scalars(query_new_row)).one()
219+
assert new is not None
184220

185221
q_creds_new = select(AuthData).where(AuthData.connection_id == new.id)
186-
creds_new = await session.scalars(q_creds_new)
187-
creds_new = creds_new.one_or_none()
222+
creds_new = (await session.scalars(q_creds_new)).one_or_none()
223+
assert creds_new is None
188224

189-
# Assert
225+
226+
async def test_superuser_can_copy_connection_and_delete_source(
227+
client: AsyncClient,
228+
group_connection: MockConnection,
229+
superuser: MockUser,
230+
empty_group: MockGroup,
231+
session: AsyncSession,
232+
settings: Settings,
233+
):
234+
# Arrange
235+
query_current_row = select(Connection).where(Connection.id == group_connection.id)
236+
current = (await session.scalars(query_current_row)).one()
237+
curr_id = current.id
238+
239+
query_current_creds = select(AuthData).where(AuthData.connection_id == group_connection.id)
240+
current_cred = (await session.scalars(query_current_creds)).one()
241+
242+
assert decrypt_auth_data(current_cred.value, settings) == {
243+
"type": "postgres",
244+
"user": "user",
245+
"password": "password",
246+
}
247+
248+
# Act
249+
result = await client.post(
250+
f"v1/connections/{group_connection.id}/copy_connection",
251+
headers={"Authorization": f"Bearer {superuser.token}"},
252+
json={
253+
"new_group_id": empty_group.id,
254+
"remove_source": True,
255+
},
256+
)
257+
258+
assert result.status_code == 200
259+
assert result.json() == {
260+
"ok": True,
261+
"status_code": 200,
262+
"message": "Connection was copied",
263+
}
264+
265+
# Source should be deleted
190266
session.expunge(current)
191-
origin = await session.get(Connection, current.id)
192-
if is_delete_source:
193-
# Assert that origin connection was deleted
194-
assert origin is None
195-
else:
196-
assert origin.id == curr_id
197-
assert origin.group_id == curr_group_id
198-
assert decrypt_auth_data(creds_origin.value, settings) == {
199-
"type": "postgres",
200-
"user": "user",
201-
"password": "password",
202-
}
203-
assert not creds_new
267+
origin = await session.get(Connection, curr_id)
268+
assert origin is None
269+
270+
# New connection should exist
271+
query_new_row = select(Connection).filter(
272+
Connection.group_id == empty_group.id,
273+
Connection.name == group_connection.name,
274+
)
275+
new = (await session.scalars(query_new_row)).one()
276+
assert new is not None
277+
278+
q_creds_new = select(AuthData).where(AuthData.connection_id == new.id)
279+
creds_new = (await session.scalars(q_creds_new)).one_or_none()
280+
assert creds_new is None
204281

205282

206283
async def test_unauthorized_user_cannot_copy_connection(
@@ -289,7 +366,7 @@ async def test_not_in_both_groups_user_can_not_copy_connection(
289366
query_current_row = select(Connection).where(Connection.id == group_connection.id)
290367

291368
result_current_row = await session.scalars(query_current_row)
292-
current = result_current_row.one()
369+
result_current_row.one()
293370

294371
query_current_creds = select(AuthData).where(AuthData.connection_id == group_connection.id)
295372
result_current_creds = await session.scalars(query_current_creds)
@@ -300,7 +377,6 @@ async def test_not_in_both_groups_user_can_not_copy_connection(
300377
"user": "user",
301378
"password": "password",
302379
}
303-
assert current is not None
304380

305381
query_copy_not_exist = select(Connection).filter(
306382
Connection.group_id == empty_group.id,
@@ -346,9 +422,6 @@ async def test_groupless_user_can_not_copy_connection(
346422

347423
query_current_row = select(Connection).where(Connection.id == group_connection.id)
348424

349-
result_current_row = await session.scalars(query_current_row)
350-
current = result_current_row.one()
351-
352425
query_current_creds = select(AuthData).where(AuthData.connection_id == group_connection.id)
353426
result_current_creds = await session.scalars(query_current_creds)
354427
current_cred = result_current_creds.one()
@@ -358,7 +431,6 @@ async def test_groupless_user_can_not_copy_connection(
358431
"user": "user",
359432
"password": "password",
360433
}
361-
assert current is not None
362434

363435
query_copy_not_exist = select(Connection).filter(
364436
Connection.group_id == empty_group.id,

0 commit comments

Comments
 (0)