Skip to content

Commit bcb5fbe

Browse files
committed
[refactor] Rebased, refactored (dq_template_config_cache) and did cleaned up
1 parent ce98b05 commit bcb5fbe

File tree

16 files changed

+190
-62
lines changed

16 files changed

+190
-62
lines changed

pyatlan/cache/aio/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .atlan_tag_cache import AsyncAtlanTagCache
1414
from .connection_cache import AsyncConnectionCache
1515
from .custom_metadata_cache import AsyncCustomMetadataCache
16+
from .dq_template_config_cache import AsyncDQTemplateConfigCache
1617
from .enum_cache import AsyncEnumCache
1718
from .group_cache import AsyncGroupCache
1819
from .role_cache import AsyncRoleCache
@@ -23,6 +24,7 @@
2324
"AsyncAtlanTagCache",
2425
"AsyncConnectionCache",
2526
"AsyncCustomMetadataCache",
27+
"AsyncDQTemplateConfigCache",
2628
"AsyncEnumCache",
2729
"AsyncGroupCache",
2830
"AsyncRoleCache",
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# Copyright 2025 Atlan Pte. Ltd.
3+
from __future__ import annotations
4+
5+
import asyncio
6+
from typing import TYPE_CHECKING, Dict, Optional
7+
8+
from pyatlan.cache.common.dq_template_config_cache import DQTemplateConfigCacheCommon
9+
10+
if TYPE_CHECKING:
11+
from pyatlan.client.aio.client import AsyncAtlanClient
12+
13+
14+
class AsyncDQTemplateConfigCache:
15+
"""
16+
Lazily-loaded async cache for DQ rule template configurations to avoid multiple API calls.
17+
"""
18+
19+
def __init__(self, client: AsyncAtlanClient):
20+
self.client: AsyncAtlanClient = client
21+
self._cache: Dict[str, Dict] = {}
22+
self._lock: asyncio.Lock = asyncio.Lock()
23+
self._initialized: bool = False
24+
25+
async def refresh_cache(self) -> None:
26+
"""
27+
Refreshes the cache of DQ template configurations by requesting the full set from Atlan.
28+
"""
29+
await self._refresh_cache()
30+
31+
async def get_template_config(self, rule_type: str) -> Optional[Dict]:
32+
"""
33+
Get template configuration for a specific rule type.
34+
35+
:param rule_type: The display name of the rule type
36+
:returns: Template configuration dict or None if not found
37+
"""
38+
if not self._initialized:
39+
await self._refresh_cache()
40+
41+
return self._cache.get(rule_type)
42+
43+
async def _refresh_cache(self) -> None:
44+
"""Refresh the cache by fetching all template configurations."""
45+
async with self._lock:
46+
if self._initialized:
47+
return
48+
49+
try:
50+
search_request = DQTemplateConfigCacheCommon.prepare_search_request()
51+
request = search_request.to_request()
52+
results = await self.client.asset.search(request)
53+
54+
success, error = DQTemplateConfigCacheCommon.process_search_results(
55+
results, self._cache
56+
)
57+
58+
if success:
59+
self._initialized = True
60+
else:
61+
# If cache refresh fails, mark as initialized to prevent infinite retries
62+
self._initialized = True
63+
if error:
64+
raise error
65+
except Exception:
66+
# If cache refresh fails, mark as initialized to prevent infinite retries
67+
self._initialized = True
68+
raise

pyatlan/cache/common/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# Cache shared logic classes
1616
from .atlan_tag_cache import AtlanTagCacheCommon
1717
from .custom_metadata_cache import CustomMetadataCacheCommon
18+
from .dq_template_config_cache import DQTemplateConfigCacheCommon
1819
from .enum_cache import EnumCacheCommon
1920
from .group_cache import GroupCacheCommon
2021
from .role_cache import RoleCacheCommon
@@ -24,6 +25,7 @@
2425
# Cache shared logic classes
2526
"AtlanTagCacheCommon",
2627
"CustomMetadataCacheCommon",
28+
"DQTemplateConfigCacheCommon",
2729
"EnumCacheCommon",
2830
"GroupCacheCommon",
2931
"RoleCacheCommon",
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# Copyright 2025 Atlan Pte. Ltd.
3+
from __future__ import annotations
4+
5+
from typing import TYPE_CHECKING, Dict, Optional, Tuple
6+
7+
from pyatlan.model.assets import Asset
8+
from pyatlan.model.fluent_search import FluentSearch
9+
10+
if TYPE_CHECKING:
11+
pass
12+
13+
14+
class DQTemplateConfigCacheCommon:
15+
"""
16+
Common logic for DQ rule template configuration cache operations.
17+
Provides shared functionality between sync and async implementations.
18+
"""
19+
20+
@classmethod
21+
def prepare_search_request(cls) -> FluentSearch:
22+
"""
23+
Prepare the search request for fetching DQ rule template configurations.
24+
25+
:returns: FluentSearch configured for DQ rule templates
26+
"""
27+
try:
28+
from pyatlan.model.assets.core.alpha__d_q_rule_template import (
29+
alpha_DQRuleTemplate,
30+
)
31+
32+
return (
33+
FluentSearch()
34+
.where(Asset.TYPE_NAME.eq(alpha_DQRuleTemplate.__name__))
35+
.include_on_results(alpha_DQRuleTemplate.NAME)
36+
.include_on_results(alpha_DQRuleTemplate.QUALIFIED_NAME)
37+
.include_on_results(alpha_DQRuleTemplate.DISPLAY_NAME)
38+
.include_on_results(
39+
alpha_DQRuleTemplate.ALPHADQ_RULE_TEMPLATE_DIMENSION
40+
)
41+
.include_on_results(alpha_DQRuleTemplate.ALPHADQ_RULE_TEMPLATE_CONFIG)
42+
)
43+
except ImportError:
44+
# If the alpha_DQRuleTemplate is not available, return empty search
45+
return FluentSearch()
46+
47+
@classmethod
48+
def process_search_results(
49+
cls, search_results, cache: Dict[str, Dict]
50+
) -> Tuple[bool, Optional[Exception]]:
51+
"""
52+
Process search results and populate the cache.
53+
54+
:param search_results: Iterator of search results
55+
:param cache: Cache dictionary to populate
56+
:returns: Tuple of (success, exception if any)
57+
"""
58+
try:
59+
for result in search_results:
60+
template_config = {
61+
"name": result.name,
62+
"qualified_name": result.qualified_name,
63+
"display_name": result.display_name,
64+
"dimension": result.alpha_dq_rule_template_dimension, # type: ignore
65+
"config": result.alpha_dq_rule_template_config, # type: ignore
66+
}
67+
cache[result.display_name] = template_config # type: ignore
68+
return True, None
69+
except Exception as e:
70+
return False, e

pyatlan/cache/dq_template_config_cache.py

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
import threading
66
from typing import TYPE_CHECKING, Dict, Optional
77

8-
from pyatlan.model.assets import Asset
9-
from pyatlan.model.fluent_search import FluentSearch
8+
from pyatlan.cache.common.dq_template_config_cache import DQTemplateConfigCacheCommon
109

1110
if TYPE_CHECKING:
1211
from pyatlan.client.atlan import AtlanClient
@@ -23,6 +22,12 @@ def __init__(self, client: AtlanClient):
2322
self._lock: threading.Lock = threading.Lock()
2423
self._initialized: bool = False
2524

25+
def refresh_cache(self) -> None:
26+
"""
27+
Refreshes the cache of DQ template configurations by requesting the full set from Atlan.
28+
"""
29+
self._refresh_cache()
30+
2631
def get_template_config(self, rule_type: str) -> Optional[Dict]:
2732
"""
2833
Get template configuration for a specific rule type.
@@ -42,36 +47,21 @@ def _refresh_cache(self) -> None:
4247
return
4348

4449
try:
45-
from pyatlan.model.assets.core.alpha__d_q_rule_template import (
46-
alpha_DQRuleTemplate,
47-
)
48-
49-
request = (
50-
FluentSearch()
51-
.where(Asset.TYPE_NAME.eq(alpha_DQRuleTemplate.__name__))
52-
.include_on_results(alpha_DQRuleTemplate.NAME)
53-
.include_on_results(alpha_DQRuleTemplate.QUALIFIED_NAME)
54-
.include_on_results(alpha_DQRuleTemplate.DISPLAY_NAME)
55-
.include_on_results(
56-
alpha_DQRuleTemplate.ALPHADQ_RULE_TEMPLATE_DIMENSION
57-
)
58-
.include_on_results(
59-
alpha_DQRuleTemplate.ALPHADQ_RULE_TEMPLATE_CONFIG
60-
)
61-
).to_request()
62-
50+
search_request = DQTemplateConfigCacheCommon.prepare_search_request()
51+
request = search_request.to_request()
6352
results = self.client.asset.search(request)
64-
for result in results:
65-
template_config = {
66-
"name": result.name,
67-
"qualified_name": result.qualified_name,
68-
"display_name": result.display_name,
69-
"dimension": result.alpha_dq_rule_template_dimension, # type: ignore
70-
"config": result.alpha_dq_rule_template_config, # type: ignore
71-
}
72-
self._cache[result.display_name] = template_config # type: ignore
7353

74-
self._initialized = True
54+
success, error = DQTemplateConfigCacheCommon.process_search_results(
55+
results, self._cache
56+
)
57+
58+
if success:
59+
self._initialized = True
60+
else:
61+
# If cache refresh fails, mark as initialized to prevent infinite retries
62+
self._initialized = True
63+
if error:
64+
raise error
7565
except Exception:
7666
# If cache refresh fails, mark as initialized to prevent infinite retries
7767
self._initialized = True

pyatlan/client/aio/asset.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,29 +108,21 @@ async def search(
108108
self,
109109
criteria: IndexSearchRequest,
110110
bulk=False,
111-
concurrent_pages: bool = False,
112-
prefetch_pages: int = 1,
113111
) -> AsyncIndexSearchResults:
114112
"""
115113
Async search that reuses shared business logic via Search.
116114
117115
:param criteria: search criteria
118116
:param bulk: whether to use bulk search mode
119-
:param concurrent_pages: whether to enable concurrent page fetching (experimental)
120-
:param prefetch_pages: number of pages to prefetch in background (only used with concurrent_pages=True)
121-
:returns: AsyncIndexSearchResults or ConcurrentAsyncIndexSearchResults
117+
:returns: AsyncIndexSearchResults
122118
"""
123119
INDEX_SEARCH, request_obj = Search.prepare_request(criteria, bulk)
124120
raw_json = await self._async_client._call_api(
125121
INDEX_SEARCH, request_obj=request_obj
126122
)
127-
response = Search.process_response(raw_json, criteria, bulk, self._async_client)
123+
response = Search.process_response(raw_json, criteria)
128124
if Search._check_for_bulk_search(criteria, response["count"], bulk):
129-
return await self.search(
130-
criteria,
131-
concurrent_pages=concurrent_pages,
132-
prefetch_pages=prefetch_pages,
133-
)
125+
return await self.search(criteria, bulk)
134126

135127
return AsyncIndexSearchResults(
136128
self._async_client,

pyatlan/client/aio/atlan.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# Copyright 2025 Atlan Pte. Ltd.
3+
from __future__ import annotations
34

45
import contextlib
5-
from typing import AsyncGenerator, Optional
6+
from typing import TYPE_CHECKING, AsyncGenerator, Optional
67

78
from httpx_retries import Retry
89
from pydantic.v1 import HttpUrl
910

10-
from pyatlan.client.aio.client import AsyncAtlanClient
1111
from pyatlan.client.atlan import DEFAULT_RETRY
1212

13+
if TYPE_CHECKING:
14+
from pyatlan.client.aio.client import AsyncAtlanClient
15+
1316

1417
@contextlib.asynccontextmanager
1518
async def client_connection(

pyatlan/client/aio/audit.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ async def search(
5757
raw_json = await self._client._call_api(endpoint, request_obj=request_obj)
5858

5959
# Process response using shared logic
60-
response = AuditSearch.process_response(raw_json, criteria, bulk, self._client)
60+
response = AuditSearch.process_response(raw_json)
6161

6262
# Check if we need to convert to bulk search using shared logic
6363
if AuditSearch.check_for_bulk_search(response["count"], criteria, bulk):

pyatlan/client/aio/client.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
AsyncAtlanTagCache,
1818
AsyncConnectionCache,
1919
AsyncCustomMetadataCache,
20+
AsyncDQTemplateConfigCache,
2021
AsyncEnumCache,
2122
AsyncGroupCache,
2223
AsyncRoleCache,
@@ -95,6 +96,9 @@ class AsyncAtlanClient(AtlanClient):
9596
_async_custom_metadata_cache: Optional[AsyncCustomMetadataCache] = PrivateAttr(
9697
default=None
9798
)
99+
_async_dq_template_config_cache: Optional[AsyncDQTemplateConfigCache] = PrivateAttr(
100+
default=None
101+
)
98102
_async_enum_cache: Optional[AsyncEnumCache] = PrivateAttr(default=None)
99103
_async_group_cache: Optional[AsyncGroupCache] = PrivateAttr(default=None)
100104
_async_role_cache: Optional[AsyncRoleCache] = PrivateAttr(default=None)
@@ -252,6 +256,15 @@ def custom_metadata_cache(self) -> AsyncCustomMetadataCache:
252256
self._async_custom_metadata_cache = AsyncCustomMetadataCache(client=self)
253257
return self._async_custom_metadata_cache
254258

259+
@property
260+
def dq_template_config_cache(self) -> AsyncDQTemplateConfigCache:
261+
"""Get async DQ template config cache with same API as sync"""
262+
if self._async_dq_template_config_cache is None:
263+
self._async_dq_template_config_cache = AsyncDQTemplateConfigCache(
264+
client=self
265+
)
266+
return self._async_dq_template_config_cache
267+
255268
@property
256269
def enum_cache(self) -> AsyncEnumCache:
257270
"""Get async enum cache with same API as sync"""

pyatlan/client/aio/user.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from pyatlan.model.user import AtlanUser, UserMinimalResponse, UserRequest
3434

3535
if TYPE_CHECKING:
36-
from .client import AsyncAtlanClient
36+
from pyatlan.client.aio.client import AsyncAtlanClient
3737

3838

3939
class AsyncUserClient:

0 commit comments

Comments
 (0)