Skip to content

Commit f38f089

Browse files
[flyteagent] FileSensor Timeout (Remote Execution Only) (#2745)
* Add timeout for task Signed-off-by: Future-Outlier <eric901201@gmail.com> * new interface Signed-off-by: Future-Outlier <eric901201@gmail.com> * upload pingsu discussion Signed-off-by: Future-Outlier <eric901201@gmail.com> * Sensor TimeOut Signed-off-by: Future-Outlier <eric901201@gmail.com> * update Signed-off-by: Future-Outlier <eric901201@gmail.com> * update Signed-off-by: Future-Outlier <eric901201@gmail.com> --------- Signed-off-by: Future-Outlier <eric901201@gmail.com>
1 parent 2f667e9 commit f38f089

File tree

4 files changed

+70
-5
lines changed

4 files changed

+70
-5
lines changed

flytekit/extend/backend/base_agent.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from rich.logging import RichHandler
2222
from rich.progress import Progress
2323

24-
from flytekit import FlyteContext, PythonFunctionTask
24+
from flytekit import FlyteContext, PythonFunctionTask, logger
2525
from flytekit.configuration import ImageConfig, SerializationSettings
2626
from flytekit.core import utils
2727
from flytekit.core.base_task import PythonTask
@@ -285,6 +285,8 @@ def execute(self: PythonTask, **kwargs) -> LiteralMap:
285285
ctx = FlyteContext.current_context()
286286
ss = ctx.serialization_settings or SerializationSettings(ImageConfig())
287287
task_template = get_serializable(OrderedDict(), ss, self).template
288+
if task_template.metadata.timeout:
289+
logger.info("Timeout is not supported for local execution.\n" "Ignoring the timeout.")
288290
output_prefix = ctx.file_access.get_random_remote_directory()
289291

290292
agent = AgentRegistry.get_agent(task_template.type, task_template.task_type_version)
@@ -336,6 +338,8 @@ def execute(self: PythonTask, **kwargs) -> LiteralMap:
336338
from flytekit.tools.translator import get_serializable
337339

338340
task_template = get_serializable(OrderedDict(), ss, self).template
341+
if task_template.metadata.timeout:
342+
logger.info("Timeout is not supported for local execution.\n" "Ignoring the timeout.")
339343
self._agent = AgentRegistry.get_agent(task_template.type, task_template.task_type_version)
340344

341345
resource_meta = local_agent_loop.run_until_complete(

flytekit/sensor/base_sensor.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import collections
2+
import datetime
23
import inspect
34
import typing
45
from abc import abstractmethod
56
from dataclasses import asdict, dataclass
6-
from typing import Any, Dict, Optional, TypeVar
7+
from typing import Any, Dict, Optional, TypeVar, Union
78

89
from typing_extensions import Protocol, get_type_hints, runtime_checkable
910

1011
from flytekit.configuration import SerializationSettings
11-
from flytekit.core.base_task import PythonTask
12+
from flytekit.core.base_task import PythonTask, TaskMetadata
1213
from flytekit.core.interface import Interface
1314
from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin, ResourceMeta
1415

@@ -50,6 +51,7 @@ class BaseSensor(AsyncAgentExecutorMixin, PythonTask):
5051
def __init__(
5152
self,
5253
name: str,
54+
timeout: Optional[Union[datetime.timedelta, int]] = None,
5355
sensor_config: Optional[T] = None,
5456
task_type: str = "sensor",
5557
**kwargs,
@@ -61,11 +63,17 @@ def __init__(
6163
annotation = type_hints.get(k, None)
6264
inputs[k] = annotation
6365

66+
if kwargs.get("metadata", None) and timeout:
67+
raise ValueError("You cannot set timeout and metadata at the same time in the sensor")
68+
69+
metadata = TaskMetadata(timeout=timeout)
70+
6471
super().__init__(
6572
task_type=task_type,
6673
name=name,
6774
task_config=None,
6875
interface=Interface(inputs=inputs),
76+
metadata=metadata,
6977
**kwargs,
7078
)
7179
self._sensor_config = sensor_config

flytekit/sensor/file_sensor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
import datetime
2+
from typing import Optional, Union
3+
14
from flytekit import FlyteContextManager
25
from flytekit.sensor.base_sensor import BaseSensor
36

47

58
class FileSensor(BaseSensor):
6-
def __init__(self, name: str, **kwargs):
7-
super().__init__(name=name, **kwargs)
9+
def __init__(self, name: str, timeout: Optional[Union[datetime.timedelta, int]] = None, **kwargs):
10+
super().__init__(name=name, timeout=timeout, **kwargs)
811

912
async def poke(self, path: str) -> bool:
1013
file_access = FlyteContextManager.current_context().file_access

tests/flytekit/unit/sensor/test_file_sensor.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
import datetime
12
import tempfile
23

4+
import pytest
5+
36
from flytekit import task, workflow
47
from flytekit.configuration import ImageConfig, SerializationSettings
8+
from flytekit.core.task import TaskMetadata
9+
from flytekit.sensor.base_sensor import BaseSensor
510
from flytekit.sensor.file_sensor import FileSensor
611
from tests.flytekit.unit.test_translator import default_img
712

@@ -34,3 +39,48 @@ def wf():
3439

3540
if __name__ == "__main__":
3641
wf()
42+
43+
44+
def test_base_sensor_timeout_initialization():
45+
# Test with no timeout
46+
sensor = BaseSensor(name="test_sensor")
47+
assert sensor.metadata.timeout is None
48+
49+
# Test with integer timeout (converted to timedelta internally)
50+
sensor = BaseSensor(name="test_sensor", timeout=60)
51+
assert sensor.metadata.timeout == datetime.timedelta(seconds=60)
52+
53+
# Test with timedelta timeout
54+
timeout = datetime.timedelta(minutes=5)
55+
sensor = BaseSensor(name="test_sensor", timeout=timeout)
56+
assert sensor.metadata.timeout == timeout
57+
58+
59+
def test_file_sensor_timeout_initialization():
60+
# Test with no timeout
61+
sensor = FileSensor(name="test_file_sensor")
62+
assert sensor.metadata.timeout is None
63+
64+
# Test with integer timeout (converted to timedelta internally)
65+
sensor = FileSensor(name="test_file_sensor", timeout=60)
66+
assert sensor.metadata.timeout == datetime.timedelta(seconds=60)
67+
68+
# Test with timedelta timeout
69+
timeout = datetime.timedelta(minutes=5)
70+
sensor = FileSensor(name="test_file_sensor", timeout=timeout)
71+
assert sensor.metadata.timeout == timeout
72+
73+
74+
def test_agent_executor_timeout_logging():
75+
# Create a sensor with timeout
76+
sensor = BaseSensor(name="test_sensor", timeout=60)
77+
78+
# Verify the timeout is set correctly (converted to timedelta internally)
79+
assert sensor.metadata.timeout == datetime.timedelta(seconds=60)
80+
81+
def test_file_sensor_set_timeout_and_metadata_at_the_same_time():
82+
timeout = datetime.timedelta(seconds=60)
83+
metadata = TaskMetadata(timeout=timeout)
84+
85+
with pytest.raises(ValueError, match="You cannot set timeout and metadata at the same time in the sensor"):
86+
FileSensor(name="test_sensor", timeout=60, metadata=metadata)

0 commit comments

Comments
 (0)