Skip to content

Commit d472f48

Browse files
authored
Merge pull request #569 from atlanhq/APP-5740
APP-5740 : Add support for iterative pagination to `User`, `Group`, and `Workflow` results
2 parents 8c26eda + 4835908 commit d472f48

File tree

14 files changed

+288
-141
lines changed

14 files changed

+288
-141
lines changed

pyatlan/client/atlan.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -847,7 +847,7 @@ def get_groups(
847847
def get_all_groups(
848848
self,
849849
limit: int = 20,
850-
) -> List[AtlanGroup]:
850+
) -> GroupResponse:
851851
"""Deprecated - use group.get_all() instead."""
852852
warn(
853853
"This method is deprecated, please use 'group.get_all' instead, which offers identical functionality.",
@@ -860,7 +860,7 @@ def get_group_by_name(
860860
self,
861861
alias: str,
862862
limit: int = 20,
863-
) -> Optional[List[AtlanGroup]]:
863+
) -> Optional[GroupResponse]:
864864
"""Deprecated - use group.get_by_name() instead."""
865865
warn(
866866
"This method is deprecated, please use 'group.get_by_name' instead, which offers identical functionality.",
@@ -982,7 +982,7 @@ def get_users(
982982
def get_all_users(
983983
self,
984984
limit: int = 20,
985-
) -> List[AtlanUser]:
985+
) -> UserResponse:
986986
"""Deprecated - use user.get_all() instead."""
987987
warn(
988988
"This method is deprecated, please use 'user.get_all' instead, which offers identical functionality.",
@@ -995,7 +995,7 @@ def get_users_by_email(
995995
self,
996996
email: str,
997997
limit: int = 20,
998-
) -> Optional[List[AtlanUser]]:
998+
) -> Optional[UserResponse]:
999999
"""Deprecated - use user.get_by_email() instead."""
10001000
warn(
10011001
"This method is deprecated, please use 'user.get_by_email' instead, which offers identical functionality.",

pyatlan/client/group.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -142,45 +142,45 @@ def get_all(
142142
offset: int = 0,
143143
sort: Optional[str] = "name",
144144
columns: Optional[List[str]] = None,
145-
) -> List[AtlanGroup]:
145+
) -> GroupResponse:
146146
"""
147-
Retrieve all groups defined in Atlan.
147+
Retrieve a GroupResponse object containing a list of all groups defined in Atlan.
148148
149149
:param limit: maximum number of results to be returned
150150
:param offset: starting point for the list of groups when paging
151151
:param sort: property by which to sort the results, by default : name
152152
:param columns: provides columns projection support for groups endpoint
153-
:returns: a list of all the groups in Atlan
153+
:returns: a GroupResponse object with all groups based on the parameters; results are iterable.
154154
"""
155-
if response := self.get(offset=offset, limit=limit, sort=sort, columns=columns):
156-
return response.records # type: ignore
157-
return None # type: ignore
155+
response: GroupResponse = self.get(
156+
offset=offset, limit=limit, sort=sort, columns=columns
157+
)
158+
return response
158159

159160
@validate_arguments
160161
def get_by_name(
161162
self,
162163
alias: str,
163164
limit: int = 20,
164165
offset: int = 0,
165-
) -> Optional[List[AtlanGroup]]:
166+
) -> Optional[GroupResponse]:
166167
"""
167-
Retrieve all groups with a name that contains the provided string.
168+
Retrieves a GroupResponse object containing a list of groups that match the specified string.
168169
(This could include a complete group name, in which case there should be at most
169170
a single item in the returned list, or could be a partial group name to retrieve
170171
all groups with that naming convention.)
171172
172173
:param alias: name (as it appears in the UI) on which to filter the groups
173174
:param limit: maximum number of groups to retrieve
174175
:param offset: starting point for the list of groups when paging
175-
:returns: all groups whose name (in the UI) contains the provided string
176+
:returns: a GroupResponse object containing a list of groups whose UI names include the given string; the results are iterable.
176177
"""
177-
if response := self.get(
178+
response: GroupResponse = self.get(
178179
offset=offset,
179180
limit=limit,
180181
post_filter='{"$and":[{"alias":{"$ilike":"%' + alias + '%"}}]}',
181-
):
182-
return response.records
183-
return None
182+
)
183+
return response
184184

185185
@validate_arguments
186186
def get_members(

pyatlan/client/user.py

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ def __init__(self, client: ApiCaller):
4848
@validate_arguments
4949
def create(
5050
self, users: List[AtlanUser], return_info: bool = False
51-
) -> Optional[List[AtlanUser]]:
51+
) -> Optional[UserResponse]:
5252
"""
5353
Create one or more new users.
5454
5555
:param users: the details of the new users
5656
:param return_info: whether to return the details of created users, defaults to `False`
5757
:raises AtlanError: on any API communication issue
58-
:returns: the list of details of created users if `return_info` is `True`, otherwise `None`
58+
:returns: a UserResponse object which contains the list of details of created users if `return_info` is `True`, otherwise `None`
5959
"""
6060
from pyatlan.client.atlan import AtlanClient
6161

@@ -201,27 +201,27 @@ def get_all(
201201
limit: int = 20,
202202
offset: int = 0,
203203
sort: Optional[str] = "username",
204-
) -> List[AtlanUser]:
204+
) -> UserResponse:
205205
"""
206-
Retrieve all users defined in Atlan.
206+
Retrieve a UserResponse object containing a list of all users defined in Atlan.
207207
208208
:param limit: maximum number of users to retrieve
209209
:param offset: starting point for the list of users when paging
210210
:param sort: property by which to sort the results, by default : `username`
211-
:returns: a list of all the users in Atlan
211+
:returns: a UserResponse object with all users based on the parameters; results are iterable.
212212
"""
213213
response: UserResponse = self.get(offset=offset, limit=limit, sort=sort)
214-
return [user for user in response]
214+
return response
215215

216216
@validate_arguments
217217
def get_by_email(
218218
self,
219219
email: str,
220220
limit: int = 20,
221221
offset: int = 0,
222-
) -> Optional[List[AtlanUser]]:
222+
) -> Optional[UserResponse]:
223223
"""
224-
Retrieves all users with email addresses that contain the provided email.
224+
Retrieves a UserResponse object containing a list of users with email addresses that contain the provided email.
225225
(This could include a complete email address, in which case there should be at
226226
most a single item in the returned list, or could be a partial email address
227227
such as "@example.com" to retrieve all users with that domain in their email
@@ -230,35 +230,35 @@ def get_by_email(
230230
:param email: on which to filter the users
231231
:param limit: maximum number of users to retrieve
232232
:param offset: starting point for the list of users when pagin
233-
:returns: all users whose email addresses contain the provided string
233+
:returns: a UserResponse object containing a list of users whose email addresses contain the provided string
234234
"""
235-
if response := self.get(
235+
response: UserResponse = self.get(
236236
offset=offset,
237237
limit=limit,
238238
post_filter='{"email":{"$ilike":"%' + email + '%"}}',
239-
):
240-
return response.records
241-
return None
239+
)
240+
return response
242241

243242
@validate_arguments
244243
def get_by_emails(
245244
self,
246245
emails: List[str],
247246
limit: int = 20,
248247
offset: int = 0,
249-
) -> Optional[List[AtlanUser]]:
248+
) -> Optional[UserResponse]:
250249
"""
251-
Retrieves all users with email addresses that match the provided list of emails.
250+
Retrieves a UserResponse object containing a list of users with email addresses that match the provided list of emails.
252251
253252
:param emails: list of email addresses to filter the users
254253
:param limit: maximum number of users to retrieve
255254
:param offset: starting point for the list of users when paginating
256-
:returns: list of users whose email addresses match the provided list
255+
:returns: a UserResponse object containing a list of users whose email addresses match the provided list
257256
"""
258257
email_filter = '{"email":{"$in":' + dumps(emails or [""]) + "}}"
259-
if response := self.get(offset=offset, limit=limit, post_filter=email_filter):
260-
return response.records
261-
return None
258+
response: UserResponse = self.get(
259+
offset=offset, limit=limit, post_filter=email_filter
260+
)
261+
return response
262262

263263
@validate_arguments
264264
def get_by_username(self, username: str) -> Optional[AtlanUser]:
@@ -281,21 +281,20 @@ def get_by_username(self, username: str) -> Optional[AtlanUser]:
281281
@validate_arguments
282282
def get_by_usernames(
283283
self, usernames: List[str], limit: int = 5, offset: int = 0
284-
) -> Optional[List[AtlanUser]]:
284+
) -> Optional[UserResponse]:
285285
"""
286-
Retrieves users based on their usernames.
286+
Retrieves a UserResponse object containing a list of users based on their usernames.
287287
288288
:param usernames: the list of usernames by which to find the users
289289
:param limit: maximum number of users to retrieve
290290
:param offset: starting point for the list of users when paginating
291-
:returns: the users with the specified usernames
291+
:returns: a UserResponse object containing list of users with the specified usernames
292292
"""
293293
username_filter = '{"username":{"$in":' + dumps(usernames or [""]) + "}}"
294-
if response := self.get(
294+
response: UserResponse = self.get(
295295
offset=offset, limit=limit, post_filter=username_filter
296-
):
297-
return response.records
298-
return None
296+
)
297+
return response
299298

300299
@validate_arguments
301300
def add_to_groups(

pyatlan/client/workflow.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,16 +167,16 @@ def find_runs_by_status_and_time_range(
167167
finished_at: Optional[str] = None,
168168
from_: int = 0,
169169
size: int = 100,
170-
) -> List[WorkflowSearchResult]:
170+
) -> WorkflowSearchResponse:
171171
"""
172-
Find workflow runs based on their status and time range.
172+
Retrieves a WorkflowSearchResponse object containing workflow runs based on their status and time range.
173173
174174
:param status: list of the workflow statuses to filter
175175
:param started_at: (optional) lower bound on 'status.startedAt' (e.g 'now-2h')
176176
:param finished_at: (optional) lower bound on 'status.finishedAt' (e.g 'now-1h')
177177
:param from_:(optional) starting index of the search results (default: `0`).
178178
:param size: (optional) maximum number of search results to return (default: `100`).
179-
:returns: list of workflows matching the filters
179+
:returns: a WorkflowSearchResponse object containing a list of workflows matching the filters
180180
:raises ValidationError: if inputs are invalid
181181
:raises AtlanError: on any API communication issue
182182
"""
@@ -205,7 +205,7 @@ def find_runs_by_status_and_time_range(
205205
run_lookup_results = self._find_runs(
206206
query=run_lookup_query, from_=from_, size=size
207207
)
208-
return run_lookup_results.hits and run_lookup_results.hits.hits or []
208+
return run_lookup_results
209209

210210
@validate_arguments
211211
def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
@@ -280,7 +280,16 @@ def _find_runs(
280280
WORKFLOW_INDEX_RUN_SEARCH,
281281
request_obj=request,
282282
)
283-
return WorkflowSearchResponse(**raw_json)
283+
return WorkflowSearchResponse(
284+
client=self._client,
285+
endpoint=WORKFLOW_INDEX_RUN_SEARCH,
286+
criteria=query,
287+
start=request.from_,
288+
size=request.size,
289+
took=raw_json.get("took"),
290+
hits=raw_json.get("hits"),
291+
shards=raw_json.get("_shards"),
292+
)
284293

285294
def _add_schedule(
286295
self,
@@ -517,7 +526,7 @@ def get_runs(
517526
workflow_phase: AtlanWorkflowPhase,
518527
from_: int = 0,
519528
size: int = 100,
520-
) -> Optional[List[WorkflowSearchResult]]:
529+
) -> Optional[WorkflowSearchResponse]:
521530
"""
522531
Retrieves all workflow runs.
523532
@@ -542,7 +551,7 @@ def get_runs(
542551
filter=[Term(field="status.phase.keyword", value=workflow_phase.value)],
543552
)
544553
response = self._find_runs(query, from_=from_, size=size)
545-
return results if (results := response.hits and response.hits.hits) else None
554+
return response
546555

547556
@validate_arguments
548557
def stop(

pyatlan/model/user.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ def _get_next_page(self):
255255
self._criteria.offset = self._start
256256
self._criteria.limit = self._size
257257
raw_json = self._client._call_api(
258-
api=self._endpoint,
258+
api=self._endpoint.format_path_with_params(),
259259
query_params=self._criteria.query_params,
260260
)
261261
if not raw_json.get("records"):

pyatlan/model/workflow.py

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# Copyright 2022 Atlan Pte. Ltd.
3-
from typing import Any, Dict, List, Optional
3+
from typing import Any, Dict, Generator, List, Optional
44

5-
from pydantic.v1 import Field
5+
from pydantic.v1 import Field, PrivateAttr, ValidationError, parse_obj_as
66

7+
from pyatlan.client.common import ApiCaller
8+
from pyatlan.errors import ErrorCode
79
from pyatlan.model.core import AtlanObject
810
from pyatlan.model.enums import AtlanWorkflowPhase, SortOrder
911
from pyatlan.model.search import Query, SortItem
12+
from pyatlan.utils import API
1013

1114

1215
class PackageParameter(AtlanObject):
@@ -132,12 +135,6 @@ class WorkflowSearchHits(AtlanObject):
132135
hits: Optional[List[WorkflowSearchResult]] = Field(default=None)
133136

134137

135-
class WorkflowSearchResponse(AtlanObject):
136-
took: Optional[int] = Field(default=None)
137-
hits: Optional[WorkflowSearchHits] = Field(default=None)
138-
shards: Optional[Dict[str, Any]] = Field(alias="_shards", default=None)
139-
140-
141138
class ReRunRequest(AtlanObject):
142139
namespace: Optional[str] = Field(default="default")
143140
resource_kind: Optional[str] = Field(default="WorkflowTemplate")
@@ -207,6 +204,7 @@ class WorkflowSearchRequest(AtlanObject):
207204
)
208205
],
209206
)
207+
source: Optional[WorkflowSearchResultDetail] = Field(default=None, alias="_source")
210208

211209
class Config:
212210
json_encoders = {Query: lambda v: v.to_dict(), SortItem: lambda v: v.to_dict()}
@@ -216,3 +214,62 @@ def __init__(__pydantic_self__, **data: Any) -> None:
216214
__pydantic_self__.__fields_set__.update(
217215
["from_", "size", "track_total_hits", "sort"]
218216
)
217+
218+
219+
class WorkflowSearchResponse(AtlanObject):
220+
_size: int = PrivateAttr()
221+
_start: int = PrivateAttr()
222+
_endpoint: API = PrivateAttr()
223+
_client: ApiCaller = PrivateAttr()
224+
_criteria: WorkflowSearchRequest = PrivateAttr()
225+
took: Optional[int] = Field(default=None)
226+
hits: Optional[WorkflowSearchHits] = Field(default=None)
227+
shards: Optional[Dict[str, Any]] = Field(alias="_shards", default=None)
228+
229+
def __init__(self, **data: Any):
230+
super().__init__(**data)
231+
self._endpoint = data.get("endpoint") # type: ignore[assignment]
232+
self._client = data.get("client") # type: ignore[assignment]
233+
self._criteria = data.get("criteria") # type: ignore[assignment]
234+
self._size = data.get("size") # type: ignore[assignment]
235+
self._start = data.get("start") # type: ignore[assignment]
236+
237+
@property
238+
def count(self):
239+
return self.hits.total.get("value", 0) if self.hits and self.hits.total else 0
240+
241+
def current_page(self) -> Optional[List[WorkflowSearchResult]]:
242+
return self.hits.hits # type: ignore
243+
244+
def next_page(self, start=None, size=None) -> bool:
245+
self._start = start or self._start + self._size
246+
if size:
247+
self._size = size
248+
return self._get_next_page() if self.hits.hits else False # type: ignore
249+
250+
def _get_next_page(self):
251+
request = WorkflowSearchRequest(
252+
query=self._criteria, from_=self._start, size=self._size
253+
)
254+
raw_json = self._client._call_api(
255+
api=self._endpoint,
256+
request_obj=request,
257+
)
258+
if not raw_json.get("hits", {}).get("hits"):
259+
self.hits.hits = []
260+
return False
261+
try:
262+
self.hits.hits = parse_obj_as(
263+
List[WorkflowSearchResult], raw_json["hits"]["hits"]
264+
)
265+
except ValidationError as err:
266+
raise ErrorCode.JSON_ERROR.exception_with_parameters(
267+
raw_json, 200, str(err)
268+
) from err
269+
return True
270+
271+
def __iter__(self) -> Generator[WorkflowSearchResult, None, None]: # type: ignore[override]
272+
while True:
273+
yield from self.current_page() or []
274+
if not self.next_page():
275+
break

0 commit comments

Comments
 (0)