Skip to content

Commit 3d2c04a

Browse files
add integration test for documentdb
1 parent 56a4aff commit 3d2c04a

File tree

1 file changed

+205
-0
lines changed

1 file changed

+205
-0
lines changed

tests/integration/document_db.py

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import logging
2+
from typing import Callable, List, Optional, Type
3+
4+
import pytest
5+
6+
from pyatlan.client.atlan import AtlanClient
7+
from pyatlan.model.assets import Asset, Connection, DocumentDBCollection, DocumentDBDatabase
8+
from pyatlan.model.enums import AtlanConnectorType
9+
from pyatlan.model.response import A, AssetMutationResponse
10+
from tests.integration.client import TestId
11+
12+
LOGGER = logging.getLogger(__name__)
13+
14+
15+
@pytest.fixture(scope="module")
16+
def upsert(client: AtlanClient):
17+
guids: List[str] = []
18+
19+
def _upsert(asset: Asset) -> AssetMutationResponse:
20+
_response = client.asset.save(asset)
21+
if (
22+
_response
23+
and _response.mutated_entities
24+
and _response.mutated_entities.CREATE
25+
):
26+
guids.append(_response.mutated_entities.CREATE[0].guid)
27+
return _response
28+
29+
yield _upsert
30+
31+
for guid in reversed(guids):
32+
response = client.asset.purge_by_guid(guid)
33+
if (
34+
not response
35+
or not response.mutated_entities
36+
or not response.mutated_entities.DELETE
37+
):
38+
LOGGER.error(f"Failed to remove asset with GUID {guid}.")
39+
40+
41+
def verify_asset_created(response, asset_type: Type[A]):
42+
assert response.mutated_entities
43+
44+
def verify_asset_updated(response, asset_type: Type[A]):
45+
assert response.mutated_entities
46+
assert response.mutated_entities.CREATE is None
47+
assert response.mutated_entities.UPDATE
48+
assert len(response.mutated_entities.UPDATE) == 1
49+
assets = response.assets_updated(asset_type=asset_type)
50+
assert len(assets) == 1
51+
52+
class TestConnection:
53+
connection: Optional[Connection] = None
54+
55+
def test_create(
56+
self,
57+
client: AtlanClient,
58+
upsert: Callable[[Asset], AssetMutationResponse],
59+
):
60+
role = client.role_cache.get_id_for_name("$admin")
61+
assert role
62+
connection_name = TestId.make_unique("DOC_Conn")
63+
c = Connection.create(
64+
name=connection_name,
65+
connector_type=AtlanConnectorType.DOCUMENTDB,
66+
admin_roles=[role],
67+
)
68+
assert c.guid
69+
response = upsert(c)
70+
assert response.mutated_entities
71+
assert response.mutated_entities.CREATE
72+
assert len(response.mutated_entities.CREATE) == 1
73+
assert isinstance(response.mutated_entities.CREATE[0], Connection)
74+
assert response.guid_assignments
75+
assert c.guid in response.guid_assignments
76+
c = response.mutated_entities.CREATE[0]
77+
c = client.asset.get_by_guid(c.guid, Connection, ignore_relationships=False)
78+
assert isinstance(c, Connection)
79+
TestConnection.connection = c
80+
81+
@pytest.mark.order(after="test_create")
82+
def test_trim_to_required(
83+
self, client: AtlanClient, upsert: Callable[[Asset], AssetMutationResponse]
84+
):
85+
assert TestConnection.connection
86+
connection = TestConnection.connection.trim_to_required()
87+
response = upsert(connection)
88+
assert response.mutated_entities is None
89+
90+
91+
@pytest.mark.order(after="TestConnection")
92+
class TestDatabase:
93+
database: Optional[DocumentDBDatabase] = None
94+
95+
def test_create(
96+
self,
97+
client: AtlanClient,
98+
upsert: Callable[[Asset], AssetMutationResponse],
99+
):
100+
assert TestConnection.connection
101+
connection = TestConnection.connection
102+
assert connection
103+
assert connection.qualified_name
104+
database_name = TestId.make_unique("DocDB")
105+
database = DocumentDBDatabase.creator(
106+
name=database_name,
107+
connection_qualified_name=connection.qualified_name,
108+
)
109+
assert database.guid
110+
response = upsert(database)
111+
assert response.mutated_entities
112+
assert response.mutated_entities.CREATE
113+
assert connection.qualified_name == database.connection_qualified_name
114+
assert len(response.mutated_entities.CREATE) == 1
115+
assert isinstance(response.mutated_entities.CREATE[0], DocumentDBDatabase)
116+
assert response.guid_assignments
117+
assert database.guid in response.guid_assignments
118+
database = response.mutated_entities.CREATE[0]
119+
client.asset.get_by_guid(database.guid, DocumentDBDatabase, ignore_relationships=False)
120+
TestDatabase.database = database
121+
122+
@pytest.mark.order(after="test_create")
123+
def test_create_for_modification(
124+
self, client, upsert: Callable[[Asset], AssetMutationResponse]
125+
):
126+
assert TestDatabase.database
127+
assert TestDatabase.database.qualified_name
128+
assert TestDatabase.database.name
129+
database = DocumentDBDatabase.create_for_modification(
130+
qualified_name=TestDatabase.database.qualified_name,
131+
name=TestDatabase.database.name,
132+
)
133+
description = f"{TestDatabase.database.description} more stuff"
134+
database.description = description
135+
response = upsert(database)
136+
verify_asset_updated(response, DocumentDBDatabase)
137+
138+
@pytest.mark.order(after="test_create")
139+
def test_trim_to_required(
140+
self, client, upsert: Callable[[Asset], AssetMutationResponse]
141+
):
142+
assert TestDatabase.database
143+
database = TestDatabase.database.trim_to_required()
144+
response = upsert(database)
145+
assert response.mutated_entities is None
146+
147+
@pytest.mark.order(after="TestDatabase")
148+
class TestCollection:
149+
collection: Optional[DocumentDBCollection] = None
150+
151+
def test_create(
152+
self,
153+
client: AtlanClient,
154+
upsert: Callable[[Asset], AssetMutationResponse],
155+
):
156+
assert TestConnection.connection
157+
connection = TestConnection.connection
158+
assert connection
159+
assert connection.qualified_name
160+
assert TestDatabase.database
161+
database = TestDatabase.database
162+
assert database
163+
assert database.qualified_name
164+
collection_name = TestId.make_unique("DocDBColl")
165+
collection = DocumentDBCollection.creator(
166+
name=collection_name,
167+
database_qualified_name=database.qualified_name,
168+
connection_qualified_name=connection.qualified_name,
169+
)
170+
assert collection.guid
171+
response = upsert(collection)
172+
assert response.mutated_entities
173+
assert response.mutated_entities.CREATE
174+
assert len(response.mutated_entities.CREATE) == 1
175+
assert isinstance(response.mutated_entities.CREATE[0], DocumentDBCollection)
176+
assert response.guid_assignments
177+
assert collection.guid in response.guid_assignments
178+
collection = response.mutated_entities.CREATE[0]
179+
client.asset.get_by_guid(collection.guid, DocumentDBCollection, ignore_relationships=False)
180+
TestCollection.collection = collection
181+
182+
@pytest.mark.order(after="test_create")
183+
def test_create_for_modification(
184+
self, client, upsert: Callable[[Asset], AssetMutationResponse]
185+
):
186+
assert TestCollection.collection
187+
assert TestCollection.collection.qualified_name
188+
assert TestCollection.collection.name
189+
collection = DocumentDBCollection.create_for_modification(
190+
qualified_name=TestCollection.collection.qualified_name,
191+
name=TestCollection.collection.name,
192+
)
193+
description = f"{TestCollection.collection.description} more stuff"
194+
collection.description = description
195+
response = upsert(collection)
196+
verify_asset_updated(response, DocumentDBCollection)
197+
198+
@pytest.mark.order(after="test_create")
199+
def test_trim_to_required(
200+
self, client, upsert: Callable[[Asset], AssetMutationResponse]
201+
):
202+
assert TestCollection.collection
203+
collection = TestCollection.collection.trim_to_required()
204+
response = upsert(collection)
205+
assert response.mutated_entities is None

0 commit comments

Comments
 (0)