Skip to content

Commit eca010e

Browse files
committed
Major changes to the method
1 parent b23c3f2 commit eca010e

File tree

3 files changed

+35
-17
lines changed

3 files changed

+35
-17
lines changed

pyatlan/client/workflow.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -160,35 +160,44 @@ def find_run_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
160160
return results[0] if (results := response.hits and response.hits.hits) else None
161161

162162
@validate_arguments
163-
def find_by_status_and_interval(
164-
self, status: List[AtlanWorkflowPhase], interval: int
163+
def find_runs_by_status_and_time_range(
164+
self,
165+
status: List[AtlanWorkflowPhase],
166+
started_at: Optional[str] = None,
167+
finished_at: Optional[str] = None,
165168
) -> List[WorkflowSearchResult]:
166169
"""
167-
Find workflows based on their status and interval
170+
Find workflows by status and optional time filters on startedAt and/or finishedAt.
168171
169-
:param status: list of the status of the workflows to filter with
170-
:param interval: time interval in hours to search for workflows
171-
:returns: the list of workflows of the provided type, with the most-recently created first
172-
:raises ValidationError: If the provided status is an invalid AtlanWorkflowPhase
172+
:param status: list of the workflow statuses to filter
173+
:param started_at: (optional) lower bound on 'status.startedAt' (e.g 'now-2h')
174+
:param finished_at: (optional) lower bound on 'status.finishedAt' (e.g 'now-1h')
175+
:returns: list of workflows matching the filters
176+
:raises ValidationError: if inputs are invalid
173177
:raises AtlanError: on any API communication issue
174178
"""
179+
time_filters = []
180+
181+
if started_at:
182+
time_filters.append(Range(field="status.startedAt", gte=started_at))
183+
if finished_at:
184+
time_filters.append(Range(field="status.finishedAt", gte=finished_at))
175185

176186
run_lookup_query = Bool(
177187
must=[
178188
NestedQuery(
179189
query=Terms(
180190
field="metadata.labels.workflows.argoproj.io/phase.keyword",
181-
values=[state.value for state in status],
191+
values=[s.value for s in status],
182192
),
183193
path="metadata",
184194
),
185-
Range(field="status.finishedAt", gt=f"now-{interval}h"),
195+
*time_filters,
186196
NestedQuery(
187197
query=Exists(field="metadata.labels.workflows.argoproj.io/creator"),
188198
path="metadata",
189199
),
190200
],
191-
must_not=[NestedQuery(query=Exists(field="spec.shutdown"), path="spec")],
192201
)
193202

194203
run_lookup_results = self._find_runs(run_lookup_query)

tests/integration/test_workflow_client.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# Copyright 2024 Atlan Pte. Ltd.
33
import time
4+
from datetime import datetime, timedelta, timezone
45
from typing import Generator
56

67
import pytest
@@ -171,18 +172,23 @@ def test_workflow_get_runs_and_stop(client: AtlanClient, workflow: WorkflowRespo
171172
and workflow_run.source.status.phase == AtlanWorkflowPhase.FAILED
172173
)
173174

174-
175-
def test_workflow_find_by_status_and_interval(client: AtlanClient):
176-
runs_status = client.workflow.find_by_status_and_interval(
177-
[AtlanWorkflowPhase.FAILED], 1
175+
# Test find run by status and time range
176+
runs_status = client.workflow.find_runs_by_status_and_time_range(
177+
[AtlanWorkflowPhase.FAILED], started_at="now-1h"
178178
)
179179
assert runs_status
180180
workflow_run_status = runs_status[0]
181+
start_time = workflow_run_status.source.status.startedAt # type: ignore
182+
start_datetime = datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ") # type: ignore
183+
start_datetime = start_datetime.replace(tzinfo=timezone.utc)
184+
current_time = datetime.now(timezone.utc)
185+
time_diff = current_time - start_datetime
181186
assert (
182187
workflow_run_status
183188
and workflow_run_status.source
184189
and workflow_run_status.source.status
185190
and workflow_run_status.source.status.phase == AtlanWorkflowPhase.FAILED
191+
and time_diff < timedelta(hours=1)
186192
)
187193

188194

tests/unit/test_workflow_client.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -272,14 +272,17 @@ def test_find_by_type(client: WorkflowClient, mock_api_caller):
272272
)
273273

274274

275-
def test_find_by_status_and_interval(client: WorkflowClient, mock_api_caller):
275+
def test_find_runs_by_status_and_time_range(client: WorkflowClient, mock_api_caller):
276276
raw_json = {"shards": {"dummy": None}, "hits": {"total": {"dummy": None}}}
277277
mock_api_caller._call_api.return_value = raw_json
278278

279279
status = [AtlanWorkflowPhase.SUCCESS, AtlanWorkflowPhase.FAILED]
280-
interval = 50
280+
started_at = "now-2h"
281+
finished_at = "now-1h"
281282

282-
assert client.find_by_status_and_interval(status, interval) == []
283+
assert (
284+
client.find_runs_by_status_and_time_range(status, started_at, finished_at) == []
285+
)
283286
mock_api_caller._call_api.assert_called_once()
284287
assert isinstance(
285288
mock_api_caller._call_api.call_args.kwargs["request_obj"], WorkflowSearchRequest

0 commit comments

Comments
 (0)