Skip to content

Commit d44d087

Browse files
authored
Merge pull request #573 from atlanhq/APP-5519
APP-5519: Migrated `AtlanClient._default_client` to thread-local storage + caches now bound to optional client
2 parents 4338c2f + 84326a4 commit d44d087

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

52 files changed

+1157
-750
lines changed

pyatlan/cache/abstract_asset_cache.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,28 @@
44

55
import threading
66
from abc import ABC, abstractmethod
7-
from typing import Any
7+
from typing import TYPE_CHECKING, Any, Dict
88

99
from pyatlan.errors import ErrorCode
1010
from pyatlan.model.assets import Asset
1111
from pyatlan.model.enums import AtlanConnectorType
1212

13+
if TYPE_CHECKING:
14+
from pyatlan.client.atlan import AtlanClient
15+
1316

1417
class AbstractAssetCache(ABC):
1518
"""
1619
Base class for reusable components that are common
1720
to all caches, where a cache is populated entry-by-entry.
1821
"""
1922

20-
def __init__(self, client):
23+
def __init__(self, client: AtlanClient):
2124
self.client = client
2225
self.lock = threading.Lock()
23-
self.name_to_guid = dict()
24-
self.guid_to_asset = dict()
25-
self.qualified_name_to_guid = dict()
26-
27-
@classmethod
28-
@abstractmethod
29-
def get_cache(cls):
30-
"""Abstract method to retreive cache."""
26+
self.name_to_guid: Dict[str, str] = dict()
27+
self.guid_to_asset: Dict[str, Asset] = dict()
28+
self.qualified_name_to_guid: Dict[str, str] = dict()
3129

3230
@abstractmethod
3331
def lookup_by_guid(self, guid: str):
@@ -84,9 +82,9 @@ def cache(self, asset: Asset):
8482
name = asset and self.get_name(asset)
8583
if not all([name, asset.guid, asset.qualified_name]):
8684
return
87-
self.name_to_guid[name] = asset.guid
88-
self.guid_to_asset[asset.guid] = asset
89-
self.qualified_name_to_guid[asset.qualified_name] = asset.guid
85+
self.name_to_guid[name] = asset.guid # type: ignore[index]
86+
self.guid_to_asset[asset.guid] = asset # type: ignore[index]
87+
self.qualified_name_to_guid[asset.qualified_name] = asset.guid # type: ignore[index]
9088

9189
def _get_by_guid(self, guid: str, allow_refresh: bool = True):
9290
"""

pyatlan/cache/atlan_tag_cache.py

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
# SPDX-License-Identifier: Apache-2.0
2-
# Copyright 2022 Atlan Pte. Ltd.
2+
# Copyright 2025 Atlan Pte. Ltd.
3+
from __future__ import annotations
4+
35
from threading import Lock
4-
from typing import Dict, Optional, Set
6+
from typing import TYPE_CHECKING, Dict, Optional, Set
57

6-
from pyatlan.client.typedef import TypeDefClient
78
from pyatlan.errors import ErrorCode
89
from pyatlan.model.enums import AtlanTypeCategory
910
from pyatlan.model.typedef import AtlanTagDef
1011

12+
if TYPE_CHECKING:
13+
from pyatlan.client.atlan import AtlanClient
14+
1115
lock: Lock = Lock()
1216

1317

@@ -17,73 +21,56 @@ class AtlanTagCache:
1721
for Atlan tags.
1822
"""
1923

20-
caches: Dict[int, "AtlanTagCache"] = {}
21-
22-
@classmethod
23-
def get_cache(cls) -> "AtlanTagCache":
24-
from pyatlan.client.atlan import AtlanClient
25-
26-
with lock:
27-
client = AtlanClient.get_default_client()
28-
cache_key = client.cache_key
29-
if cache_key not in cls.caches:
30-
cls.caches[cache_key] = AtlanTagCache(typedef_client=client.typedef)
31-
return cls.caches[cache_key]
24+
def __init__(self, client: AtlanClient):
25+
self.client: AtlanClient = client
26+
self.cache_by_id: Dict[str, AtlanTagDef] = {}
27+
self.map_id_to_name: Dict[str, str] = {}
28+
self.map_name_to_id: Dict[str, str] = {}
29+
self.deleted_ids: Set[str] = set()
30+
self.deleted_names: Set[str] = set()
31+
self.map_id_to_source_tags_attr_id: Dict[str, str] = {}
32+
self.lock: Lock = Lock()
3233

33-
@classmethod
34-
def refresh_cache(cls) -> None:
34+
def refresh_cache(self) -> None:
3535
"""
3636
Refreshes the cache of Atlan tags by requesting the full set of Atlan tags from Atlan.
3737
"""
38-
cls.get_cache()._refresh_cache()
38+
self._refresh_cache()
3939

40-
@classmethod
41-
def get_id_for_name(cls, name: str) -> Optional[str]:
40+
def get_id_for_name(self, name: str) -> Optional[str]:
4241
"""
4342
Translate the provided human-readable Atlan tag name to its Atlan-internal ID string.
4443
4544
:param name: human-readable name of the Atlan tag
4645
:returns: Atlan-internal ID string of the Atlan tag
4746
"""
48-
return cls.get_cache()._get_id_for_name(name=name)
47+
return self._get_id_for_name(name=name)
4948

50-
@classmethod
51-
def get_name_for_id(cls, idstr: str) -> Optional[str]:
49+
def get_name_for_id(self, idstr: str) -> Optional[str]:
5250
"""
5351
Translate the provided Atlan-internal classification ID string to the human-readable Atlan tag name.
5452
5553
:param idstr: Atlan-internal ID string of the Atlan tag
5654
:returns: human-readable name of the Atlan tag
5755
"""
58-
return cls.get_cache()._get_name_for_id(idstr=idstr)
56+
return self._get_name_for_id(idstr=idstr)
5957

60-
@classmethod
61-
def get_source_tags_attr_id(cls, id: str) -> Optional[str]:
58+
def get_source_tags_attr_id(self, id: str) -> Optional[str]:
6259
"""
6360
Translate the provided Atlan-internal Atlan tag ID string to the Atlan-internal name of the attribute that
6461
captures tag attachment details (for source-synced tags).
6562
6663
:param id: Atlan-internal ID string of the Atlan tag
6764
:returns: Atlan-internal ID string of the attribute containing source-synced tag attachment details
6865
"""
69-
return cls.get_cache()._get_source_tags_attr_id(id)
70-
71-
def __init__(self, typedef_client: TypeDefClient):
72-
self.typdef_client: TypeDefClient = typedef_client
73-
self.cache_by_id: Dict[str, AtlanTagDef] = {}
74-
self.map_id_to_name: Dict[str, str] = {}
75-
self.map_name_to_id: Dict[str, str] = {}
76-
self.deleted_ids: Set[str] = set()
77-
self.deleted_names: Set[str] = set()
78-
self.map_id_to_source_tags_attr_id: Dict[str, str] = {}
79-
self.lock: Lock = Lock()
66+
return self._get_source_tags_attr_id(id)
8067

8168
def _refresh_cache(self) -> None:
8269
"""
8370
Refreshes the cache of Atlan tags by requesting the full set of Atlan tags from Atlan.
8471
"""
8572
with self.lock:
86-
response = self.typdef_client.get(
73+
response = self.client.typedef.get(
8774
type_category=[
8875
AtlanTypeCategory.CLASSIFICATION,
8976
AtlanTypeCategory.STRUCT,

pyatlan/cache/connection_cache.py

Lines changed: 27 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
# SPDX-License-Identifier: Apache-2.0
2-
# Copyright 2024 Atlan Pte. Ltd.
2+
# Copyright 2025 Atlan Pte. Ltd.
33
from __future__ import annotations
44

55
import logging
66
import threading
7-
from typing import Dict, Optional, Union
7+
from typing import TYPE_CHECKING, Optional, Union
88

99
from pyatlan.cache.abstract_asset_cache import AbstractAssetCache, AbstractAssetName
10-
from pyatlan.client.atlan import AtlanClient
1110
from pyatlan.model.assets import Asset, Connection
1211
from pyatlan.model.enums import AtlanConnectorType
1312
from pyatlan.model.fluent_search import FluentSearch
1413
from pyatlan.model.search import Term
1514

16-
LOGGER = logging.getLogger(__name__)
15+
if TYPE_CHECKING:
16+
from pyatlan.client.atlan import AtlanClient
1717

1818
lock = threading.Lock()
19+
LOGGER = logging.getLogger(__name__)
1920

2021

2122
class ConnectionCache(AbstractAssetCache):
@@ -37,24 +38,11 @@ class ConnectionCache(AbstractAssetCache):
3738
Connection.CONNECTOR_NAME,
3839
]
3940
SEARCH_ATTRIBUTES = [field.atlan_field_name for field in _SEARCH_FIELDS]
40-
caches: Dict[int, ConnectionCache] = dict()
4141

4242
def __init__(self, client: AtlanClient):
4343
super().__init__(client)
4444

45-
@classmethod
46-
def get_cache(cls) -> ConnectionCache:
47-
from pyatlan.client.atlan import AtlanClient
48-
49-
with lock:
50-
default_client = AtlanClient.get_default_client()
51-
cache_key = default_client.cache_key
52-
if cache_key not in cls.caches:
53-
cls.caches[cache_key] = ConnectionCache(client=default_client)
54-
return cls.caches[cache_key]
55-
56-
@classmethod
57-
def get_by_guid(cls, guid: str, allow_refresh: bool = True) -> Connection:
45+
def get_by_guid(self, guid: str, allow_refresh: bool = True) -> Connection:
5846
"""
5947
Retrieve a connection from the cache by its UUID.
6048
If the asset is not found, it will be looked up and added to the cache.
@@ -66,11 +54,10 @@ def get_by_guid(cls, guid: str, allow_refresh: bool = True) -> Connection:
6654
:raises NotFoundError: if the connection cannot be found (does not exist) in Atlan
6755
:raises InvalidRequestError: if no UUID was provided for the connection to retrieve
6856
"""
69-
return cls.get_cache()._get_by_guid(guid=guid, allow_refresh=allow_refresh)
57+
return self._get_by_guid(guid=guid, allow_refresh=allow_refresh)
7058

71-
@classmethod
7259
def get_by_qualified_name(
73-
cls, qualified_name: str, allow_refresh: bool = True
60+
self, qualified_name: str, allow_refresh: bool = True
7461
) -> Connection:
7562
"""
7663
Retrieve a connection from the cache by its unique Atlan-internal name.
@@ -84,13 +71,12 @@ def get_by_qualified_name(
8471
:raises NotFoundError: if the connection cannot be found (does not exist) in Atlan
8572
:raises InvalidRequestError: if no qualified_name was provided for the connection to retrieve
8673
"""
87-
return cls.get_cache()._get_by_qualified_name(
74+
return self._get_by_qualified_name(
8875
qualified_name=qualified_name, allow_refresh=allow_refresh
8976
)
9077

91-
@classmethod
9278
def get_by_name(
93-
cls, name: ConnectionName, allow_refresh: bool = True
79+
self, name: ConnectionName, allow_refresh: bool = True
9480
) -> Connection:
9581
"""
9682
Retrieve a connection from the cache by its uniquely identifiable name.
@@ -104,7 +90,7 @@ def get_by_name(
10490
:raises NotFoundError: if the connection cannot be found (does not exist) in Atlan
10591
:raises InvalidRequestError: if no name was provided for the connection to retrieve
10692
"""
107-
return cls.get_cache()._get_by_name(name=name, allow_refresh=allow_refresh)
93+
return self._get_by_name(name=name, allow_refresh=allow_refresh)
10894

10995
def lookup_by_guid(self, guid: str) -> None:
11096
if not guid:
@@ -139,21 +125,22 @@ def lookup_by_qualified_name(self, connection_qn: str) -> None:
139125
def lookup_by_name(self, name: ConnectionName) -> None:
140126
if not isinstance(name, ConnectionName):
141127
return
142-
results = self.client.asset.find_connections_by_name(
143-
name=name.name,
144-
connector_type=name.type,
145-
attributes=self.SEARCH_ATTRIBUTES,
146-
)
147-
if not results:
148-
return
149-
if len(results) > 1:
150-
LOGGER.warning(
151-
(
152-
"Found multiple connections of the same type with the same name, caching only the first: %s"
153-
),
154-
name,
128+
with self.lock:
129+
results = self.client.asset.find_connections_by_name(
130+
name=name.name, # type: ignore[arg-type]
131+
connector_type=name.type, # type: ignore[arg-type]
132+
attributes=self.SEARCH_ATTRIBUTES,
155133
)
156-
self.cache(results[0])
134+
if not results:
135+
return
136+
if len(results) > 1:
137+
LOGGER.warning(
138+
(
139+
"Found multiple connections of the same type with the same name, caching only the first: %s"
140+
),
141+
name,
142+
)
143+
self.cache(results[0])
157144

158145
def get_name(self, asset: Asset):
159146
if not isinstance(asset, Connection):
@@ -188,7 +175,7 @@ def __init__(
188175
elif isinstance(connection, str):
189176
tokens = connection.split("/")
190177
if len(tokens) > 1:
191-
self.type = AtlanConnectorType(tokens[0]) # type: ignore[call-arg]
178+
self.type = AtlanConnectorType(tokens[0]).value # type: ignore[call-arg]
192179
self.name = connection[len(tokens[0]) + 1 :] # noqa
193180

194181
def __hash__(self):

0 commit comments

Comments
 (0)