Skip to content

Commit d1401c5

Browse files
committed
Added new method find_by_status_and_interval() along with it's unit and integration tests
1 parent c784abf commit d1401c5

File tree

3 files changed

+75
-1
lines changed

3 files changed

+75
-1
lines changed

pyatlan/client/workflow.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,17 @@
2424
)
2525
from pyatlan.errors import ErrorCode
2626
from pyatlan.model.enums import AtlanWorkflowPhase, WorkflowPackage
27-
from pyatlan.model.search import Bool, NestedQuery, Prefix, Query, Regexp, Term
27+
from pyatlan.model.search import (
28+
Bool,
29+
Exists,
30+
NestedQuery,
31+
Prefix,
32+
Query,
33+
Range,
34+
Regexp,
35+
Term,
36+
Terms,
37+
)
2838
from pyatlan.model.workflow import (
2939
ReRunRequest,
3040
ScheduleQueriesSearchRequest,
@@ -149,6 +159,43 @@ def find_run_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
149159
response = self._find_runs(query, size=1)
150160
return results[0] if (results := response.hits and response.hits.hits) else None
151161

162+
@validate_arguments
163+
def find_by_status_and_interval(
164+
self, status: List[AtlanWorkflowPhase], interval: int
165+
) -> List[WorkflowSearchResult]:
166+
"""
167+
Find workflows based on their status and interval
168+
169+
:status: list of the status of the workflows to filter with
170+
:interval: time interval in hours to search for workflows
171+
:returns: the list of workflows of the provided type, with the most-recently created first
172+
:raises ValidationError: If the provided status is an invalid AtlanWorkflowPhase
173+
:raises AtlanError: on any API communication issue
174+
175+
"""
176+
177+
run_lookup_query = Bool(
178+
must=[
179+
NestedQuery(
180+
query=Terms(
181+
field="metadata.labels.workflows.argoproj.io/phase.keyword",
182+
values=[state.value for state in status],
183+
),
184+
path="metadata",
185+
),
186+
Range(field="status.finishedAt", gt=f"now-{interval}h"),
187+
NestedQuery(
188+
query=Exists(field="metadata.labels.workflows.argoproj.io/creator"),
189+
path="metadata",
190+
),
191+
],
192+
must_not=[NestedQuery(query=Exists(field="spec.shutdown"), path="spec")],
193+
)
194+
195+
run_lookup_results = self._find_runs(run_lookup_query)
196+
197+
return run_lookup_results.hits and run_lookup_results.hits.hits or []
198+
152199
@validate_arguments
153200
def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
154201
"""

tests/integration/test_workflow_client.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,19 @@ def test_workflow_get_runs_and_stop(client: AtlanClient, workflow: WorkflowRespo
162162
workflow_status = client.workflow.monitor(workflow_name=workflow_name)
163163
assert workflow_status == AtlanWorkflowPhase.FAILED
164164

165+
# Test find run by status and interval
166+
runs_status = client.workflow.find_by_status_and_interval(
167+
[AtlanWorkflowPhase.FAILED], 1
168+
)
169+
assert runs_status
170+
workflow_run_status = runs_status[0]
171+
assert (
172+
workflow_run_status
173+
and workflow_run_status.source
174+
and workflow_run_status.source.status
175+
and workflow_run_status.source.status.phase == AtlanWorkflowPhase.FAILED
176+
)
177+
165178
# Test find run by id
166179
workflow_run = client.workflow.find_run_by_id(id=run.id)
167180
assert (

tests/unit/test_workflow_client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,20 @@ def test_find_by_type(client: WorkflowClient, mock_api_caller):
272272
)
273273

274274

275+
def test_find_by_status_and_interval(client: WorkflowClient, mock_api_caller):
276+
raw_json = {"shards": {"dummy": None}, "hits": {"total": {"dummy": None}}}
277+
mock_api_caller._call_api.return_value = raw_json
278+
279+
status = [AtlanWorkflowPhase.SUCCESS, AtlanWorkflowPhase.FAILED]
280+
interval = 50
281+
282+
assert client.find_by_status_and_interval(status, interval) == []
283+
mock_api_caller._call_api.assert_called_once()
284+
assert isinstance(
285+
mock_api_caller._call_api.call_args.kwargs["request_obj"], WorkflowSearchRequest
286+
)
287+
288+
275289
def test_find_by_id(
276290
client: WorkflowClient, search_response: WorkflowSearchResponse, mock_api_caller
277291
):

0 commit comments

Comments
 (0)