Skip to content

Commit ce98b05

Browse files
committed
[refactor] Migrated caches to use common operations
- Fixed local imports due to recent refactoring.
1 parent 22655d6 commit ce98b05

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2180
-365
lines changed

pyatlan/cache/aio/__init__.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# Copyright 2025 Atlan Pte. Ltd.
3+
"""
4+
Async cache modules for Atlan.
5+
6+
This module provides async versions of all cache functionality
7+
with the same API as the sync versions, just requiring await.
8+
9+
Pattern: All async cache methods reuse shared business logic from pyatlan.cache.common
10+
to ensure identical behavior with sync cache implementations.
11+
"""
12+
13+
from .atlan_tag_cache import AsyncAtlanTagCache
14+
from .connection_cache import AsyncConnectionCache
15+
from .custom_metadata_cache import AsyncCustomMetadataCache
16+
from .enum_cache import AsyncEnumCache
17+
from .group_cache import AsyncGroupCache
18+
from .role_cache import AsyncRoleCache
19+
from .source_tag_cache import AsyncSourceTagCache
20+
from .user_cache import AsyncUserCache
21+
22+
__all__ = [
23+
"AsyncAtlanTagCache",
24+
"AsyncConnectionCache",
25+
"AsyncCustomMetadataCache",
26+
"AsyncEnumCache",
27+
"AsyncGroupCache",
28+
"AsyncRoleCache",
29+
"AsyncSourceTagCache",
30+
"AsyncUserCache",
31+
]
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# Copyright 2025 Atlan Pte. Ltd.
3+
from __future__ import annotations
4+
5+
import asyncio
6+
from abc import ABC, abstractmethod
7+
from typing import TYPE_CHECKING, Any, Dict
8+
9+
from pyatlan.errors import ErrorCode
10+
from pyatlan.model.assets import Asset
11+
12+
if TYPE_CHECKING:
13+
from pyatlan.client.aio import AsyncAtlanClient
14+
15+
16+
class AsyncAbstractAssetCache(ABC):
    """
    Async base class for reusable components that are common
    to all caches, where a cache is populated entry-by-entry.

    Three in-memory indexes are maintained together by :meth:`cache`:
    name -> GUID, qualified name -> GUID, and GUID -> asset. Concrete caches
    supply the ``lookup_by_*`` coroutines (invoked here on a cache miss) and
    :meth:`get_name`, which derives the name key for an asset.
    """

    def __init__(self, client: AsyncAtlanClient):
        """
        :param client: async client kept on the instance for concrete caches to use
        """
        self.client = client
        # NOTE(review): nothing in this base class acquires the lock;
        # presumably it is here for concrete caches — confirm.
        self.lock = asyncio.Lock()
        self.name_to_guid: Dict[str, str] = dict()
        self.guid_to_asset: Dict[str, Asset] = dict()
        self.qualified_name_to_guid: Dict[str, str] = dict()

    @abstractmethod
    async def lookup_by_guid(self, guid: str):
        """Abstract method to lookup asset by guid."""

    @abstractmethod
    async def lookup_by_qualified_name(self, qualified_name: str):
        """Abstract method to lookup asset by qualified name."""

    @abstractmethod
    async def lookup_by_name(self, name: Any):
        """Abstract method to lookup asset by name."""

    @abstractmethod
    def get_name(self, asset: Asset):
        """Abstract method to get name from asset."""

    def is_guid_known(self, guid: str) -> bool:
        """
        Checks whether the provided Atlan-internal UUID is known.
        NOTE: will not refresh the cache itself to determine this.

        :param guid: Atlan-internal UUID of the object
        :returns: `True` if the object is known, `False` otherwise
        """
        return guid in self.guid_to_asset

    def is_qualified_name_known(self, qualified_name: str) -> bool:
        """
        Checks whether the provided Atlan-internal ID string is known.
        NOTE: will not refresh the cache itself to determine this.

        :param qualified_name: Atlan-internal ID string of the object
        :returns: `True` if the object is known, `False` otherwise
        """
        return qualified_name in self.qualified_name_to_guid

    def is_name_known(self, name: str) -> bool:
        """
        Checks whether the provided human-constructable name is known.
        NOTE: will not refresh the cache itself to determine this.

        :param name: human-constructable name of the object
        :returns: `True` if the object is known, `False` otherwise
        """
        return name in self.name_to_guid

    def cache(self, asset: Asset) -> None:
        """
        Add an entry to the cache.

        The asset is indexed only when it has a name (as produced by
        :meth:`get_name`), a GUID and a qualified name; otherwise the call is a
        silent no-op, including when ``asset`` itself is ``None``.

        :param asset: to be cached
        """
        # Guard first: reading asset.guid on a missing asset would raise
        # AttributeError rather than skipping the entry.
        if not asset:
            return
        name = self.get_name(asset)
        if not (name and asset.guid and asset.qualified_name):
            return
        self.name_to_guid[name] = asset.guid
        self.guid_to_asset[asset.guid] = asset
        self.qualified_name_to_guid[asset.qualified_name] = asset.guid

    async def _get_by_guid(self, guid: str, allow_refresh: bool = True) -> Asset:
        """
        Retrieve an asset from the cache by its UUID.

        :param guid: UUID of the asset in Atlan
        :param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
        :returns: the asset (if found)
        :raises AtlanError: on any API communication problem if the cache needs to be refreshed
        :raises NotFoundError: if the object cannot be found (does not exist) in Atlan
        :raises InvalidRequestError: if no UUID was provided for the object to retrieve
        """
        if not guid:
            raise ErrorCode.MISSING_GUID.exception_with_parameters()
        asset = self.guid_to_asset.get(guid)
        if not asset and allow_refresh:
            # Cache miss: let the concrete cache look the asset up, then re-read the index.
            await self.lookup_by_guid(guid)
            asset = self.guid_to_asset.get(guid)
        if not asset:
            raise ErrorCode.ASSET_NOT_FOUND_BY_GUID.exception_with_parameters(guid)
        return asset

    async def _get_by_qualified_name(
        self, qualified_name: str, allow_refresh: bool = True
    ) -> Asset:
        """
        Retrieve an asset from the cache by its qualifiedName.

        :param qualified_name: qualifiedName of the asset in Atlan
        :param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
        :returns: the asset (if found)
        :raises AtlanError: on any API communication problem if the cache needs to be refreshed
        :raises NotFoundError: if the object cannot be found (does not exist) in Atlan
        :raises InvalidRequestError: if no qualified name was provided for the object to retrieve
        """
        if not qualified_name:
            raise ErrorCode.MISSING_QUALIFIED_NAME.exception_with_parameters()
        guid = self.qualified_name_to_guid.get(qualified_name)
        if not guid and allow_refresh:
            await self.lookup_by_qualified_name(qualified_name)
            guid = self.qualified_name_to_guid.get(qualified_name)
        if not guid:
            raise ErrorCode.ASSET_NOT_FOUND_BY_QN.exception_with_parameters(
                qualified_name
            )

        # Any lookup has already happened above, so resolve the GUID without a second one.
        return await self._get_by_guid(guid=guid, allow_refresh=False)

    async def _get_by_name(self, name, allow_refresh: bool = True) -> Asset:
        """
        Retrieve an asset from the cache by its uniquely identifiable name.

        The name index is keyed by ``str(name)``.

        :param name: uniquely identifiable name of the asset in Atlan
        :param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
        :returns: the asset (if found)
        :raises AtlanError: on any API communication problem if the cache needs to be refreshed
        :raises NotFoundError: if the object cannot be found (does not exist) in Atlan
        :raises InvalidRequestError: if no name was provided for the object to retrieve
            (that is, `name` is not an `AbstractAssetName` instance)
        """
        # NOTE(review): imported at call time rather than module level —
        # presumably to avoid a circular import; confirm.
        from pyatlan.cache.abstract_asset_cache import AbstractAssetName

        if not isinstance(name, AbstractAssetName):
            raise ErrorCode.MISSING_NAME.exception_with_parameters()
        key = str(name)
        guid = self.name_to_guid.get(key)
        if not guid and allow_refresh:
            await self.lookup_by_name(name)
            guid = self.name_to_guid.get(key)
        if not guid:
            raise ErrorCode.ASSET_NOT_FOUND_BY_NAME.exception_with_parameters(
                name._TYPE_NAME, name
            )

        # Any lookup has already happened above, so resolve the GUID without a second one.
        return await self._get_by_guid(guid=guid, allow_refresh=False)

pyatlan/cache/aio/atlan_tag_cache.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# Copyright 2025 Atlan Pte. Ltd.
3+
from __future__ import annotations
4+
5+
import asyncio
6+
from typing import TYPE_CHECKING, Dict, Optional, Set
7+
8+
from pyatlan.cache.common import AtlanTagCacheCommon
9+
from pyatlan.errors import ErrorCode
10+
from pyatlan.model.enums import AtlanTypeCategory
11+
from pyatlan.model.typedef import AtlanTagDef
12+
13+
if TYPE_CHECKING:
14+
from pyatlan.client.aio import AsyncAtlanClient
15+
16+
17+
class AsyncAtlanTagCache:
    """
    Asynchronous, lazily-populated cache that translates Atlan tags between
    their Atlan-internal ID strings and their human-readable names.
    """

    def __init__(self, client: AsyncAtlanClient):
        """
        :param client: async client used to request tag type definitions
        """
        self.client: AsyncAtlanClient = client
        self.cache_by_id: Dict[str, AtlanTagDef] = dict()
        self.map_id_to_name: Dict[str, str] = dict()
        self.map_name_to_id: Dict[str, str] = dict()
        self.deleted_ids: Set[str] = set()
        self.deleted_names: Set[str] = set()
        self.map_id_to_source_tags_attr_id: Dict[str, str] = dict()
        # Held for the whole of a refresh (see _refresh_cache).
        self.lock: asyncio.Lock = asyncio.Lock()

    async def refresh_cache(self) -> None:
        """
        Reload the cache by requesting the full set of Atlan tags from Atlan.
        """
        await self._refresh_cache()

    async def get_id_for_name(self, name: str) -> Optional[str]:
        """
        Resolve a human-readable Atlan tag name to its Atlan-internal ID string.

        :param name: human-readable name of the Atlan tag
        :returns: Atlan-internal ID string of the Atlan tag
        """
        return await self._get_id_for_name(name=name)

    async def get_name_for_id(self, idstr: str) -> Optional[str]:
        """
        Resolve an Atlan-internal classification ID string to the human-readable Atlan tag name.

        :param idstr: Atlan-internal ID string of the Atlan tag
        :returns: human-readable name of the Atlan tag
        """
        return await self._get_name_for_id(idstr=idstr)

    async def get_source_tags_attr_id(self, id: str) -> Optional[str]:
        """
        Resolve an Atlan-internal Atlan tag ID string to the Atlan-internal name of the
        attribute that captures tag attachment details (for source-synced tags).

        :param id: Atlan-internal ID string of the Atlan tag
        :returns: Atlan-internal ID string of the attribute containing source-synced tag attachment details
        """
        return await self._get_source_tags_attr_id(id)

    async def _refresh_cache(self) -> None:
        """
        Reload the cache by requesting the full set of Atlan tags from Atlan.

        The request and the replacement of the lookup maps both happen while
        ``self.lock`` is held. An empty response, or one without struct
        definitions, is reported as ``ErrorCode.EXPIRED_API_TOKEN``.
        """
        async with self.lock:
            # The async API call is made directly here.
            wanted_categories = [
                AtlanTypeCategory.CLASSIFICATION,
                AtlanTypeCategory.STRUCT,
            ]
            response = await self.client.typedef.get(type_category=wanted_categories)

            if not (response and response.struct_defs):
                raise ErrorCode.EXPIRED_API_TOKEN.exception_with_parameters()

            # Turning the response into lookup maps is delegated to the shared logic.
            by_id, id_to_name, name_to_id, id_to_attr_id = (
                AtlanTagCacheCommon.refresh_cache_data(response)
            )
            self.cache_by_id = by_id
            self.map_id_to_name = id_to_name
            self.map_name_to_id = name_to_id
            self.map_id_to_source_tags_attr_id = id_to_attr_id

    async def _get_id_for_name(self, name: str) -> Optional[str]:
        """
        Resolve a human-readable Atlan tag name to its Atlan-internal ID string.

        :param name: human-readable name of the Atlan tag
        :returns: Atlan-internal ID string of the Atlan tag
        """
        if not self.cache_by_id:
            await self._refresh_cache()
        found, needs_refresh = AtlanTagCacheCommon.get_id_for_name(
            name, self.map_name_to_id, self.deleted_names
        )
        if not needs_refresh:
            return found
        # The shared logic asked for a reload: refresh once, then resolve again.
        await self._refresh_cache()
        return AtlanTagCacheCommon.get_id_for_name_after_refresh(
            name, self.map_name_to_id, self.deleted_names
        )

    async def _get_name_for_id(self, idstr: str) -> Optional[str]:
        """
        Resolve an Atlan-internal classification ID string to the human-readable Atlan tag name.

        :param idstr: Atlan-internal ID string of the Atlan tag
        :returns: human-readable name of the Atlan tag
        """
        if not self.cache_by_id:
            await self._refresh_cache()
        found, needs_refresh = AtlanTagCacheCommon.get_name_for_id(
            idstr, self.map_id_to_name, self.deleted_ids
        )
        if not needs_refresh:
            return found
        # The shared logic asked for a reload: refresh once, then resolve again.
        await self._refresh_cache()
        return AtlanTagCacheCommon.get_name_for_id_after_refresh(
            idstr, self.map_id_to_name, self.deleted_ids
        )

    async def _get_source_tags_attr_id(self, id: str) -> Optional[str]:
        """
        Resolve an Atlan-internal Atlan tag ID string to the Atlan-internal name of the
        attribute that captures tag attachment details (for source-synced tags).

        :param id: Atlan-internal ID string of the Atlan tag
        :returns: Atlan-internal ID string of the attribute containing source-synced tag attachment details
        """
        if not self.cache_by_id:
            await self._refresh_cache()
        found, needs_refresh = AtlanTagCacheCommon.get_source_tags_attr_id(
            id, self.map_id_to_source_tags_attr_id, self.deleted_ids
        )
        if not needs_refresh:
            return found
        # The shared logic asked for a reload: refresh once, then resolve again.
        await self._refresh_cache()
        return AtlanTagCacheCommon.get_source_tags_attr_id_after_refresh(
            id, self.map_id_to_source_tags_attr_id, self.deleted_ids
        )

0 commit comments

Comments
 (0)