Skip to content

Commit 89faa8b

Browse files
vaibhavatlanAryamanz29
authored andcommitted
Made major changes to workflow iterative pagination
1 parent 0a91791 commit 89faa8b

File tree

3 files changed

+30
-30
lines changed

3 files changed

+30
-30
lines changed

pyatlan/client/workflow.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ def find_runs_by_status_and_time_range(
167167
finished_at: Optional[str] = None,
168168
from_: int = 0,
169169
size: int = 100,
170-
) -> List[WorkflowSearchResult]:
170+
) -> WorkflowSearchResponse:
171171
"""
172172
Find workflow runs based on their status and time range.
173173
@@ -205,7 +205,7 @@ def find_runs_by_status_and_time_range(
205205
run_lookup_results = self._find_runs(
206206
query=run_lookup_query, from_=from_, size=size
207207
)
208-
return run_lookup_results.hits and run_lookup_results.hits.hits or []
208+
return run_lookup_results
209209

210210
@validate_arguments
211211
def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
@@ -280,7 +280,16 @@ def _find_runs(
280280
WORKFLOW_INDEX_RUN_SEARCH,
281281
request_obj=request,
282282
)
283-
return WorkflowSearchResponse(**raw_json)
283+
return WorkflowSearchResponse(
284+
client=self._client,
285+
endpoint=WORKFLOW_INDEX_RUN_SEARCH,
286+
criteria=query,
287+
start=request.from_,
288+
size=request.size,
289+
took=raw_json.get("took"),
290+
hits=raw_json.get("hits"),
291+
shards=raw_json.get("_shards"),
292+
)
284293

285294
def _add_schedule(
286295
self,
@@ -517,7 +526,7 @@ def get_runs(
517526
workflow_phase: AtlanWorkflowPhase,
518527
from_: int = 0,
519528
size: int = 100,
520-
) -> Optional[List[WorkflowSearchResult]]:
529+
) -> Optional[WorkflowSearchResponse]:
521530
"""
522531
Retrieves all workflow runs.
523532
@@ -542,7 +551,7 @@ def get_runs(
542551
filter=[Term(field="status.phase.keyword", value=workflow_phase.value)],
543552
)
544553
response = self._find_runs(query, from_=from_, size=size)
545-
return results if (results := response.hits and response.hits.hits) else None
554+
return response
546555

547556
@validate_arguments
548557
def stop(

pyatlan/model/workflow.py

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -215,17 +215,6 @@ def __init__(__pydantic_self__, **data: Any) -> None:
215215
["from_", "size", "track_total_hits", "sort"]
216216
)
217217

218-
@property
219-
def query_params(self) -> dict:
220-
qp: Dict[str, object] = {}
221-
if self.post_filter:
222-
qp["filter"] = self.post_filter
223-
if self.sort:
224-
qp["sort"] = self.sort
225-
qp["from_"] = self.from_
226-
qp["size"] = self.size
227-
return qp
228-
229218

230219
class WorkflowSearchResponse(AtlanObject):
231220
_size: int = PrivateAttr()
@@ -243,30 +232,31 @@ def __init__(self, **data: Any):
243232
self._client = data.get("client") # type: ignore[assignment]
244233
self._criteria = data.get("criteria") # type: ignore[assignment]
245234
self._size = data.get("size") # type: ignore[assignment]
246-
self._start = data.get("from") # type: ignore[assignment]
235+
self._start = data.get("start") # type: ignore[assignment]
247236

248237
def current_page(self) -> Optional[List[WorkflowSearchResult]]:
249-
return self.records
238+
return self.hits.hits # type: ignore
250239

251240
def next_page(self, start=None, size=None) -> bool:
252241
self._start = start or self._start + self._size
253242
if size:
254243
self._size = size
255-
return self._get_next_page() if self.records else False
244+
return self._get_next_page() if self.hits.hits else False # type: ignore
256245

257246
def _get_next_page(self):
258-
self._criteria.from_ = self._start
259-
self._criteria.size = self._size
247+
request = WorkflowSearchRequest(
248+
query=self._criteria, from_=self._start, size=self._size
249+
)
260250
raw_json = self._client._call_api(
261-
api=self._endpoint.format_path_with_params(),
262-
query_params=self._criteria.query_params,
251+
api=self._endpoint,
252+
request_obj=request,
263253
)
264-
if not raw_json.get("records"):
265-
self.records = []
254+
if not raw_json.get("hits", {}).get("hits"):
255+
self.hits.hits = []
266256
return False
267257
try:
268-
self.records = parse_obj_as(
269-
List[WorkflowSearchResult], raw_json.get("records")
258+
self.hits.hits = parse_obj_as(
259+
List[WorkflowSearchResult], raw_json["hits"]["hits"]
270260
)
271261
except ValidationError as err:
272262
raise ErrorCode.JSON_ERROR.exception_with_parameters(

tests/integration/test_workflow_client.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,9 @@ def test_workflow_get_runs_and_stop(client: AtlanClient, workflow: WorkflowRespo
135135
workflow_name=workflow.metadata.name, workflow_phase=AtlanWorkflowPhase.RUNNING
136136
)
137137
assert runs
138-
assert len(runs) == 1
139-
run = runs[0]
138+
total_runs = runs.hits.hits # type: ignore
139+
assert len(total_runs) == 1 # type: ignore
140+
run = total_runs[0] # type: ignore
140141
assert run and run.id
141142
assert workflow.metadata.name and (workflow.metadata.name in run.id)
142143

@@ -177,7 +178,7 @@ def test_workflow_get_runs_and_stop(client: AtlanClient, workflow: WorkflowRespo
177178
[AtlanWorkflowPhase.FAILED], started_at="now-1h"
178179
)
179180
assert runs_status
180-
workflow_run_status = runs_status[0]
181+
workflow_run_status = runs_status.hits.hits[0] # type: ignore
181182
start_time = workflow_run_status.source.status.startedAt # type: ignore
182183
start_datetime = datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ") # type: ignore
183184
start_datetime = start_datetime.replace(tzinfo=timezone.utc)

0 commit comments

Comments
 (0)