Skip to content

Commit 2b09931

Browse files
authored
Merge pull request #637 from atlanhq/APP-6827
APP-6827 Add utility method to facilitate processing assets while iterating through the results of a fluent search.
2 parents a130816 + 2a9e5fa commit 2b09931

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed

pyatlan/client/asset.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99
from enum import Enum
1010
from typing import (
1111
TYPE_CHECKING,
12+
Callable,
1213
Dict,
1314
Generator,
1415
Iterable,
1516
List,
1617
Optional,
18+
Protocol,
1719
Set,
1820
Type,
1921
TypeVar,
@@ -134,6 +136,11 @@
134136
LOGGER = logging.getLogger(__name__)
135137

136138

139+
class IndexSearchRequestProvider(Protocol):
    """Structural type for any object that can produce an ``IndexSearchRequest``."""

    def to_request(self) -> IndexSearchRequest:
        """Return the ``IndexSearchRequest`` represented by this provider."""
        ...
142+
143+
137144
class AssetClient:
138145
"""
139146
This class can be used to retrieve information about assets. This class does not need to be instantiated
@@ -1861,6 +1868,45 @@ def get_hierarchy(
18611868
)
18621869
return CategoryHierarchy(top_level=top_categories, stub_dict=category_dict)
18631870

1871+
def process_assets(
    self, search: IndexSearchRequestProvider, func: Callable[[Asset], None]
) -> int:
    """
    Repeatedly run a search and hand every not-yet-seen asset to a callback.

    On each pass a fresh request is built with ``search.to_request()`` and executed
    through ``self.search``. Every asset in the response whose GUID has not been
    recorded before is recorded and then passed to ``func``. Passes are repeated
    until one of them finishes without encountering any unrecorded GUID.

    Arguments:
        search: IndexSearchRequestProvider
            Object whose ``to_request()`` supplies the request for each pass,
            such as a FluentSearch.
        func: Callable[[Asset], None]
            Callback invoked once for each asset whose GUID has not been seen
            on this or any earlier pass.

    Returns:
        int: The number of distinct GUIDs that were handed to ``func``.
    """
    seen_guids: Set[str] = set()
    pass_number = 0
    while True:
        pass_number += 1
        results = self.search(search.to_request())
        LOGGER.debug(
            "Iteration %d found %d assets.", pass_number, results.count
        )
        found_new = False
        for candidate in results:
            if candidate.guid in seen_guids:
                # Already handled on an earlier pass (or earlier in this one).
                continue
            seen_guids.add(candidate.guid)
            found_new = True
            func(candidate)
        if not found_new:
            # A full pass surfaced nothing new, so there is nothing left to do.
            return len(seen_guids)
1909+
18641910

18651911
class SearchResults(ABC, Iterable):
18661912
"""

tests/integration/test_client.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
MODULE_NAME = TestId.make_unique("Client")
5555
TEST_USER_DESCRIPTION = "Automated testing of the Python SDK. (USER)"
5656
TEST_SYSTEM_DESCRIPTION = "Automated testing of the Python SDK. (SYSTEM)"
57+
call_count = 0
5758

5859

5960
@pytest.fixture(scope="module")
@@ -1536,3 +1537,38 @@ def test_client_401_token_refresh(
15361537
# Confirm the API key has been updated and results are returned
15371538
assert client.api_key != expired_api_token
15381539
assert results and results.count >= 1
1540+
1541+
1542+
def test_process_assets_when_no_assets_found(client: AtlanClient):
    """process_assets should return 0 and never invoke the callback for an empty result."""

    def fail_if_invoked(_: Asset):
        pytest.fail("Should not be called")

    # The "zXZ" name prefix is expected to match nothing in the test tenant.
    active_terms = FluentSearch().where(Term.with_state("ACTIVE"))
    no_match_search = active_terms.where(Asset.NAME.startswith("zXZ"))

    assert (
        client.asset.process_assets(search=no_match_search, func=fail_if_invoked)
        == 0
    )
1556+
1557+
1558+
def test_process_assets_when_assets_found(client: AtlanClient):
    """
    process_assets should call the callback once per matching asset and return that count.

    The number of callback invocations is tracked in a function-local counter
    rather than module-level state, so the assertion stays valid when the test
    is executed more than once in the same interpreter.
    """
    call_count = 0

    def doit(asset: Asset):
        nonlocal call_count
        call_count += 1

    search = (
        FluentSearch()
        .where(Term.with_state("ACTIVE"))
        .where(Asset.TYPE_NAME.eq("Table"))
        .where(Asset.NAME.startswith("B"))
    )
    # Baseline: how many assets a plain search reports for the same criteria.
    expected_count = client.asset.search(search.to_request()).count

    processed_count = client.asset.process_assets(search=search, func=doit)

    assert call_count == expected_count
    assert processed_count == expected_count

0 commit comments

Comments
 (0)