Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
4f4e55f
APP-5243: Migrate to `httpx` (goodbye `requests`)
Aryamanz29 Jul 14, 2025
7a5148f
[ci] Dropped support for `Python 3.8` (httpx-retries requires Python …
Aryamanz29 Jul 14, 2025
98b51c8
[changes] Fixed `AtlanClient` event stream handling
Aryamanz29 Jul 15, 2025
5a09533
[refactor] Migrated `AssetClient/AsyncAssetClient` to use shared oper…
Aryamanz29 Aug 1, 2025
14cb505
[ci:temp] Skip `qa-checks` job
Aryamanz29 Aug 1, 2025
dfc0ac3
[refactor] Migrated other clients to use `common` operations
Aryamanz29 Aug 4, 2025
22655d6
[tests/fix] Fixed failing cm test due to incorrect refactoring (save_…
Aryamanz29 Aug 5, 2025
ce98b05
[refactor] Migrated `caches` to use `common` operations
Aryamanz29 Aug 6, 2025
bcb5fbe
[refactor] Rebased, refactored (`dq_template_config_cache`) and did c…
Aryamanz29 Aug 7, 2025
b091fe4
[refactor] Refactored translators, retranslators, async batch and AIO…
Aryamanz29 Aug 7, 2025
27e7266
[deps] Added `pytest-asyncio` dependency for async testing
Aryamanz29 Aug 8, 2025
1a82ee6
[tests] Added async `connection_cache` tests
Aryamanz29 Aug 8, 2025
d4f866f
[tests] Added async `source_tag_cache` tests
Aryamanz29 Aug 8, 2025
3bbbea0
[refactor] Use `@validate_arguments` decorator in aio clients and use…
Aryamanz29 Aug 11, 2025
00ecd03
[tests] Added asyncio configuration for aio tests
Aryamanz29 Aug 11, 2025
139d097
[tests] Added async `client`, `credential_client`, `file_client`, and…
Aryamanz29 Aug 11, 2025
5494461
[tests] Added async `cm`, `audit`, `query`, and `sso`, `task`, `atlan…
Aryamanz29 Aug 12, 2025
7c3a70e
Merge branch 'main' into APP-5243
Aryamanz29 Aug 14, 2025
f61c373
[uv] Resync uv lock dependencies
Aryamanz29 Aug 14, 2025
3e73176
[feat] Added client type to the sdk headers
Aryamanz29 Aug 14, 2025
0bff7e4
[tests:aio] Added async `test_client` integration tests
Aryamanz29 Aug 14, 2025
c8c71ff
[tests:aio] Added async `test_index_search` integration tests
Aryamanz29 Aug 15, 2025
e163551
[change] Used tenacity for `_retrieve_connection_with_retry`
Aryamanz29 Aug 15, 2025
f337035
[tests:aio] Added remaining tests: admin, batch, tag, ol, cm, conn, g…
Aryamanz29 Aug 17, 2025
0ef9249
[tests] Fixed unit tests + aio integration runtime warnings (test_cus…
Aryamanz29 Aug 18, 2025
6fdcc42
[fix] Fixed race condition in shared session headers
Aryamanz29 Aug 18, 2025
62eea05
Merge branch 'main' into APP-5243
Aryamanz29 Aug 18, 2025
7152c45
[feat] Added `AsyncAtlanClient.from_token_guid()` method
Aryamanz29 Aug 18, 2025
74d5cea
[tests:aio] Fixed `aio` tests suite (+ test_lineage)
Aryamanz29 Aug 18, 2025
f5c3f82
[ci] Added `pattern` and `label` based execution of `aio` integration…
Aryamanz29 Aug 18, 2025
719b047
[ci] Fixed changed files output for `check-aio-changes` job
Aryamanz29 Aug 18, 2025
8dda545
[cleanup] Fixed imports + aio tests collision (admin_test)
Aryamanz29 Aug 19, 2025
d67c3b7
[generator] Fixed generator as per latest async refactoring changes
Aryamanz29 Aug 19, 2025
ba9a30b
[generator] Generated latest typedef models
Aryamanz29 Aug 19, 2025
1d17747
[change] Fixed async cm handling in referenceable.py (json() method)
Aryamanz29 Aug 19, 2025
dbf74cc
[ci] Added `-vv` flag for pytest verbose output
Aryamanz29 Aug 19, 2025
a609595
[qa] Fixed all mypy violations
Aryamanz29 Aug 19, 2025
0a72ae8
[test:aio] Improved `test_asset_batch` test
Aryamanz29 Aug 19, 2025
d4477ac
[change] lets use consistent naming in FS (`execute_async()`)
Aryamanz29 Aug 20, 2025
896ae40
[ci:conda] changed python constraint to `< 4`
Aryamanz29 Aug 20, 2025
020033d
[ci] enabled qa checks
Aryamanz29 Aug 20, 2025
767cdee
[tests] Use `token_client` (retry=0) to avoid extra token generation
Aryamanz29 Aug 20, 2025
ff0f932
[qa] Fixed mypy violations
Aryamanz29 Aug 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build_and_upload_conda_packages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
with:
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/pyatlan-pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
matrix:
# Specify version as a string
# https://github.com/actions/setup-python/issues/160"
python-version: ["3.8", "3.12", "3.13"]
python-version: ["3.9", "3.12", "3.13"]

steps:
- name: Checkout code
Expand Down Expand Up @@ -48,7 +48,7 @@ jobs:
matrix:
# Specify version as a string
# https://github.com/actions/setup-python/issues/160"
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- name: Checkout code
Expand All @@ -65,8 +65,8 @@ jobs:
- name: Install dependencies
run: uv sync --extra dev

- name: QA checks (ruff-format, ruff-lint, mypy)
run: uv run ./qa-checks
# - name: QA checks (ruff-format, ruff-lint, mypy)
# run: uv run ./qa-checks

- name: Run unit tests
env: # Test tenant environment variables
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pyatlan-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- name: Install uv
uses: astral-sh/setup-uv@v6
- name: Install dependencies
run: uv sync --extra dev
run: uv sync
- name: check tag
id: check-tag
run: uv run python check_tag.py
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pyatlan-test-cron.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8, 3.12]
python-version: [3.9, 3.12]

steps:
- uses: actions/checkout@v4
Expand Down
33 changes: 33 additions & 0 deletions pyatlan/cache/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
"""
Async cache modules for Atlan.

This module provides async versions of all cache functionality
with the same API as the sync versions, just requiring await.

Pattern: All async cache methods reuse shared business logic from pyatlan.cache.common
to ensure identical behavior with sync cache implementations.
"""

from .atlan_tag_cache import AsyncAtlanTagCache
from .connection_cache import AsyncConnectionCache
from .custom_metadata_cache import AsyncCustomMetadataCache
from .dq_template_config_cache import AsyncDQTemplateConfigCache
from .enum_cache import AsyncEnumCache
from .group_cache import AsyncGroupCache
from .role_cache import AsyncRoleCache
from .source_tag_cache import AsyncSourceTagCache
from .user_cache import AsyncUserCache

__all__ = [
"AsyncAtlanTagCache",
"AsyncConnectionCache",
"AsyncCustomMetadataCache",
"AsyncDQTemplateConfigCache",
"AsyncEnumCache",
"AsyncGroupCache",
"AsyncRoleCache",
"AsyncSourceTagCache",
"AsyncUserCache",
]
162 changes: 162 additions & 0 deletions pyatlan/cache/aio/abstract_asset_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2025 Atlan Pte. Ltd.
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Dict

from pyatlan.cache.abstract_asset_cache import AbstractAssetName
from pyatlan.errors import ErrorCode
from pyatlan.model.assets import Asset
from pyatlan.model.enums import AtlanConnectorType

if TYPE_CHECKING:
from pyatlan.client.aio import AsyncAtlanClient


class AsyncAbstractAssetCache(ABC):
"""
Async base class for reusable components that are common
to all caches, where a cache is populated entry-by-entry.
"""

def __init__(self, client: AsyncAtlanClient):
self.client = client
self.lock = asyncio.Lock()
self.name_to_guid: Dict[str, str] = dict()
self.guid_to_asset: Dict[str, Asset] = dict()
self.qualified_name_to_guid: Dict[str, str] = dict()

@abstractmethod
async def lookup_by_guid(self, guid: str):
"""Abstract method to lookup asset by guid."""

@abstractmethod
async def lookup_by_qualified_name(self, qualified_name: str):
"""Abstract method to lookup asset by qualified name."""

@abstractmethod
async def lookup_by_name(self, name: Any):
"""Abstract method to lookup asset by name."""

@abstractmethod
def get_name(self, asset: Asset):
"""Abstract method to get name from asset."""

def is_guid_known(self, guid: str) -> bool:
"""
Checks whether the provided Atlan-internal UUID is known.
NOTE: will not refresh the cache itself to determine this.

:param guid: Atlan-internal UUID of the object
:returns: `True` if the object is known, `False` otherwise
"""
return guid in self.guid_to_asset

def is_qualified_name_known(self, qualified_name: str) -> bool:
"""
Checks whether the provided Atlan-internal ID string is known.
NOTE: will not refresh the cache itself to determine this.

:param qualified_name: Atlan-internal ID string of the object
:returns: `True` if the object is known, `False` otherwise
"""
return qualified_name in self.qualified_name_to_guid

def is_name_known(self, name: str) -> bool:
"""
Checks whether the provided Atlan-internal ID string is known.
NOTE: will not refresh the cache itself to determine this.

:param name: human-constructable name of the object
:returns: `True` if the object is known, `False` otherwise
"""
return name in self.name_to_guid

def cache(self, asset: Asset):
"""
Add an entry to the cache.

:param asset: to be cached
"""
name = asset and self.get_name(asset)
if not all([name, asset.guid, asset.qualified_name]):
return
self.name_to_guid[name] = asset.guid # type: ignore[index]
self.guid_to_asset[asset.guid] = asset # type: ignore[index]
self.qualified_name_to_guid[asset.qualified_name] = asset.guid # type: ignore[index]

async def _get_by_guid(self, guid: str, allow_refresh: bool = True):
"""
Retrieve an asset from the cache by its UUID.

:param guid: UUID of the asset in Atlan
:param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
:returns: the asset (if found)
:raises AtlanError: on any API communication problem if the cache needs to be refreshed
:raises NotFoundError: if the object cannot be found (does not exist) in Atlan
:raises InvalidRequestError: if no UUID was provided for the object to retrieve
"""
if not guid:
raise ErrorCode.MISSING_ID.exception_with_parameters()
asset = self.guid_to_asset.get(guid)
if not asset and allow_refresh:
await self.lookup_by_guid(guid)
asset = self.guid_to_asset.get(guid)
if not asset:
raise ErrorCode.ASSET_NOT_FOUND_BY_GUID.exception_with_parameters(guid)
return asset

async def _get_by_qualified_name(
self, qualified_name: str, allow_refresh: bool = True
):
"""
Retrieve an asset from the cache by its qualifiedName.

:param qualified_name: qualifiedName of the asset in Atlan
:param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
:returns: the asset (if found)
:raises AtlanError: on any API communication problem if the cache needs to be refreshed
:raises NotFoundError: if the object cannot be found (does not exist) in Atlan
:raises InvalidRequestError: if no qualified name was provided for the object to retrieve
"""
if not qualified_name:
raise ErrorCode.MISSING_ID.exception_with_parameters()
guid = self.qualified_name_to_guid.get(qualified_name)
if not guid and allow_refresh:
await self.lookup_by_qualified_name(qualified_name)
guid = self.qualified_name_to_guid.get(qualified_name)
if not guid:
raise ErrorCode.ASSET_NOT_FOUND_BY_QN.exception_with_parameters(
qualified_name,
AtlanConnectorType._get_connector_type_from_qualified_name(
qualified_name
).value,
)

return await self._get_by_guid(guid=guid, allow_refresh=False)

async def _get_by_name(self, name, allow_refresh: bool = True):
"""
Retrieve an asset from the cache by its uniquely identifiable name.

:param name: uniquely identifiable name of the asset in Atlan
:param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
:returns: the asset (if found)
:raises AtlanError: on any API communication problem if the cache needs to be refreshed
:raises NotFoundError: if the object cannot be found (does not exist) in Atlan
:raises InvalidRequestError: if no name was provided for the object to retrieve
"""
if not isinstance(name, AbstractAssetName):
raise ErrorCode.MISSING_NAME.exception_with_parameters()
guid = self.name_to_guid.get(str(name))
if not guid and allow_refresh:
await self.lookup_by_name(name)
guid = self.name_to_guid.get(str(name))
if not guid:
raise ErrorCode.ASSET_NOT_FOUND_BY_NAME.exception_with_parameters(
name._TYPE_NAME, name
)

return await self._get_by_guid(guid=guid, allow_refresh=False)
Loading