Skip to content

Commit 732b23b

Browse files
authored
Merge pull request #604 from atlanhq/APP-5600
APP-5600 : Enhance Python SDK to support workflow-run search by status and interval
2 parents 9bfae6d + eca010e commit 732b23b

File tree

3 files changed

+93
-1
lines changed

3 files changed

+93
-1
lines changed

pyatlan/client/workflow.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,17 @@
2424
)
2525
from pyatlan.errors import ErrorCode
2626
from pyatlan.model.enums import AtlanWorkflowPhase, WorkflowPackage
27-
from pyatlan.model.search import Bool, NestedQuery, Prefix, Query, Regexp, Term
27+
from pyatlan.model.search import (
28+
Bool,
29+
Exists,
30+
NestedQuery,
31+
Prefix,
32+
Query,
33+
Range,
34+
Regexp,
35+
Term,
36+
Terms,
37+
)
2838
from pyatlan.model.workflow import (
2939
ReRunRequest,
3040
ScheduleQueriesSearchRequest,
@@ -149,6 +159,51 @@ def find_run_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
149159
response = self._find_runs(query, size=1)
150160
return results[0] if (results := response.hits and response.hits.hits) else None
151161

162+
@validate_arguments
def find_runs_by_status_and_time_range(
    self,
    status: List[AtlanWorkflowPhase],
    started_at: Optional[str] = None,
    finished_at: Optional[str] = None,
) -> List[WorkflowSearchResult]:
    """
    Find workflow runs by status, optionally restricted by lower bounds on
    their start and/or finish times.

    :param status: list of the workflow statuses (phases) to filter by
    :param started_at: (optional) lower bound on 'status.startedAt' (e.g. 'now-2h')
    :param finished_at: (optional) lower bound on 'status.finishedAt' (e.g. 'now-1h')
    :returns: list of workflow runs matching the filters (empty if there are none)
    :raises ValidationError: if inputs are invalid
    :raises AtlanError: on any API communication issue
    """
    # Only the bounds that were actually supplied become range clauses;
    # each one is a "greater than or equal" (lower-bound) filter.
    time_filters = [
        Range(field=field, gte=bound)
        for field, bound in (
            ("status.startedAt", started_at),
            ("status.finishedAt", finished_at),
        )
        if bound
    ]

    # The run must be in one of the requested phases.
    phase_filter = NestedQuery(
        query=Terms(
            field="metadata.labels.workflows.argoproj.io/phase.keyword",
            values=[s.value for s in status],
        ),
        path="metadata",
    )
    # The run must carry a creator label.
    creator_filter = NestedQuery(
        query=Exists(field="metadata.labels.workflows.argoproj.io/creator"),
        path="metadata",
    )

    run_lookup_query = Bool(must=[phase_filter, *time_filters, creator_filter])
    run_lookup_results = self._find_runs(run_lookup_query)

    # Explicit guard instead of the `a and b or c` idiom: a missing `hits`
    # container or a missing/empty inner `hits` list both yield [].
    outer_hits = run_lookup_results.hits
    if outer_hits and outer_hits.hits:
        return outer_hits.hits
    return []
206+
152207
@validate_arguments
153208
def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
154209
"""

tests/integration/test_workflow_client.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# Copyright 2024 Atlan Pte. Ltd.
33
import time
4+
from datetime import datetime, timedelta, timezone
45
from typing import Generator
56

67
import pytest
@@ -171,6 +172,25 @@ def test_workflow_get_runs_and_stop(client: AtlanClient, workflow: WorkflowRespo
171172
and workflow_run.source.status.phase == AtlanWorkflowPhase.FAILED
172173
)
173174

175+
# Test find run by status and time range
176+
runs_status = client.workflow.find_runs_by_status_and_time_range(
177+
[AtlanWorkflowPhase.FAILED], started_at="now-1h"
178+
)
179+
assert runs_status
180+
workflow_run_status = runs_status[0]
181+
start_time = workflow_run_status.source.status.startedAt # type: ignore
182+
start_datetime = datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ") # type: ignore
183+
start_datetime = start_datetime.replace(tzinfo=timezone.utc)
184+
current_time = datetime.now(timezone.utc)
185+
time_diff = current_time - start_datetime
186+
assert (
187+
workflow_run_status
188+
and workflow_run_status.source
189+
and workflow_run_status.source.status
190+
and workflow_run_status.source.status.phase == AtlanWorkflowPhase.FAILED
191+
and time_diff < timedelta(hours=1)
192+
)
193+
174194

175195
def test_workflow_get_all_scheduled_runs(
176196
client: AtlanClient, workflow: WorkflowResponse

tests/unit/test_workflow_client.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,23 @@ def test_find_by_type(client: WorkflowClient, mock_api_caller):
272272
)
273273

274274

275+
def test_find_runs_by_status_and_time_range(client: WorkflowClient, mock_api_caller):
    """
    Searching runs by status and time range yields an empty list when the
    mocked response carries no hit entries, and issues exactly one request.
    """
    # Minimal response payload: there is no "hits" list inside "hits".
    mock_api_caller._call_api.return_value = {
        "shards": {"dummy": None},
        "hits": {"total": {"dummy": None}},
    }

    phases = [AtlanWorkflowPhase.SUCCESS, AtlanWorkflowPhase.FAILED]
    found = client.find_runs_by_status_and_time_range(phases, "now-2h", "now-1h")

    assert found == []
    mock_api_caller._call_api.assert_called_once()
    sent_request = mock_api_caller._call_api.call_args.kwargs["request_obj"]
    assert isinstance(sent_request, WorkflowSearchRequest)
290+
291+
275292
def test_find_by_id(
276293
client: WorkflowClient, search_response: WorkflowSearchResponse, mock_api_caller
277294
):

0 commit comments

Comments
 (0)