Skip to content

Commit 5aaa2db

Browse files
authored
Merge pull request #633 from atlanhq/APP-6766
APP-6766: Prevent pagination loop break by filtering invalid assets
2 parents 2b09931 + 18da2ac commit 5aaa2db

File tree

4 files changed

+84
-5
lines changed

4 files changed

+84
-5
lines changed

pyatlan/client/asset.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
Range,
100100
SortItem,
101101
Term,
102+
Terms,
102103
with_active_category,
103104
with_active_glossary,
104105
with_active_term,
@@ -171,6 +172,33 @@ def _get_bulk_search_log_message(self, bulk):
171172
+ "Ignoring requests for offset-based paging and using timestamp-based paging instead."
172173
)
173174

175+
@staticmethod
176+
def _ensure_type_filter_present(criteria: IndexSearchRequest) -> None:
177+
"""
178+
Ensures that at least one 'typeName' filter is present in the given search criteria.
179+
If no such filter exists, appends a default filter for 'Referenceable'.
180+
"""
181+
if not (
182+
criteria
183+
and criteria.dsl
184+
and criteria.dsl.query
185+
and isinstance(criteria.dsl.query, Bool)
186+
and criteria.dsl.query.filter
187+
and isinstance(criteria.dsl.query.filter, list)
188+
):
189+
return
190+
191+
has_type_filter = any(
192+
isinstance(f, (Term, Terms))
193+
and f.field == Referenceable.TYPE_NAME.keyword_field_name
194+
for f in criteria.dsl.query.filter
195+
)
196+
197+
if not has_type_filter:
198+
criteria.dsl.query.filter.append(
199+
Term.with_super_type_names(Referenceable.__name__)
200+
)
201+
174202
# TODO: Try adding @validate_arguments to this method once
175203
# the issue below is fixed or when we switch to pydantic v2
176204
# https://github.com/atlanhq/atlan-python/pull/88#discussion_r1260892704
@@ -201,6 +229,7 @@ def search(self, criteria: IndexSearchRequest, bulk=False) -> IndexSearchResults
201229
raise ErrorCode.UNABLE_TO_RUN_BULK_WITH_SORTS.exception_with_parameters()
202230
criteria.dsl.sort = self._prepare_sorts_for_bulk_search(criteria.dsl.sort)
203231
LOGGER.debug(self._get_bulk_search_log_message(bulk))
232+
self._ensure_type_filter_present(criteria)
204233
raw_json = self._client._call_api(
205234
INDEX_SEARCH,
206235
request_obj=criteria,
@@ -1975,8 +2004,8 @@ def _get_next_page_json(self, is_bulk_search: bool = False):
19752004
try:
19762005
self._process_entities(raw_json["entities"])
19772006
if is_bulk_search:
1978-
self._update_first_last_record_creation_times()
19792007
self._filter_processed_assets()
2008+
self._update_first_last_record_creation_times()
19802009
return raw_json
19812010

19822011
except ValidationError as err:

pyatlan/model/audit.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,8 @@ def _get_next_page_json(self, is_bulk_search: bool = False):
341341
List[EntityAudit], raw_json[ENTITY_AUDITS]
342342
)
343343
if is_bulk_search:
344-
self._update_first_last_record_creation_times()
345344
self._filter_processed_entities()
345+
self._update_first_last_record_creation_times()
346346
return raw_json
347347
except ValidationError as err:
348348
raise ErrorCode.JSON_ERROR.exception_with_parameters(

pyatlan/model/search_log.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,8 +502,8 @@ def _get_next_page_json(self, is_bulk_search: bool = False):
502502
try:
503503
self._log_entries = parse_obj_as(List[SearchLogEntry], raw_json["logs"])
504504
if is_bulk_search:
505-
self._update_first_last_record_creation_times()
506505
self._filter_processed_entities()
506+
self._update_first_last_record_creation_times()
507507
return raw_json
508508
except ValidationError as err:
509509
raise ErrorCode.JSON_ERROR.exception_with_parameters(

tests/unit/test_client.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
from pyatlan.model.group import GroupRequest
5656
from pyatlan.model.lineage import LineageListRequest
5757
from pyatlan.model.response import AssetMutationResponse
58-
from pyatlan.model.search import Bool, Term
58+
from pyatlan.model.search import Bool, Term, TermAttributes
5959
from pyatlan.model.search_log import SearchLogRequest
6060
from pyatlan.model.typedef import EnumDef
6161
from pyatlan.model.user import AtlanUser, UserRequest
@@ -1606,6 +1606,56 @@ def test_index_search_with_no_aggregation_results(
16061606
mock_api_caller.reset_mock()
16071607

16081608

1609+
def test_type_name_in_asset_search(mock_api_caller):
1610+
# When the type name is not present in the request
1611+
request = (FluentSearch().where(CompoundQuery.active_assets())).to_request()
1612+
client = AssetClient(mock_api_caller)
1613+
client._ensure_type_filter_present(request)
1614+
1615+
assert request.dsl.query and request.dsl.query.filter
1616+
assert isinstance(request.dsl.query.filter, list)
1617+
1618+
has_type_filter = any(
1619+
isinstance(f, Term) and f.field == TermAttributes.SUPER_TYPE_NAMES.value
1620+
for f in request.dsl.query.filter
1621+
)
1622+
assert has_type_filter is True
1623+
1624+
# When the type name is present in the request (no need to add super type filter)
1625+
request = (
1626+
FluentSearch()
1627+
.where(CompoundQuery.active_assets())
1628+
.where(CompoundQuery.asset_type(AtlasGlossary))
1629+
).to_request()
1630+
client._ensure_type_filter_present(request)
1631+
1632+
assert request.dsl.query and request.dsl.query.filter
1633+
assert isinstance(request.dsl.query.filter, list)
1634+
1635+
has_type_filter = any(
1636+
isinstance(f, Term) and f.field == TermAttributes.SUPER_TYPE_NAMES.value
1637+
for f in request.dsl.query.filter
1638+
)
1639+
assert has_type_filter is False
1640+
1641+
# When multiple type name(s) is present in the request (no need to add super type filter)
1642+
request = (
1643+
FluentSearch()
1644+
.where(CompoundQuery.active_assets())
1645+
.where(CompoundQuery.asset_types([AtlasGlossary, AtlasGlossaryTerm]))
1646+
).to_request()
1647+
client._ensure_type_filter_present(request)
1648+
1649+
assert request.dsl.query and request.dsl.query.filter
1650+
assert isinstance(request.dsl.query.filter, list)
1651+
1652+
has_type_filter = any(
1653+
isinstance(f, Term) and f.field == TermAttributes.SUPER_TYPE_NAMES.value
1654+
for f in request.dsl.query.filter
1655+
)
1656+
assert has_type_filter is False
1657+
1658+
16091659
def _assert_search_results(results, response_json, sorts, bulk=False):
16101660
for i, result in enumerate(results):
16111661
assert result and response_json["entities"][i]
@@ -2553,7 +2603,7 @@ def test_atlan_client_headers(client: AtlanClient):
25532603
assert expected == client._session.headers
25542604

25552605

2556-
def test_get_all_pagation(group_client, mock_api_caller):
2606+
def test_get_all_pagination(group_client, mock_api_caller):
25572607
mock_page_1 = [
25582608
{"id": "1", "alias": "Group3"},
25592609
{"id": "2", "alias": "Group4"},

0 commit comments

Comments
 (0)