Skip to content

Commit bd93ef7

Browse files
author
Andrei Neagu
committed
added required fields
1 parent 700426b commit bd93ef7

File tree

3 files changed

+49
-7
lines changed

3 files changed

+49
-7
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from abc import ABC, abstractmethod
2+
from datetime import timedelta
23
from typing import Annotated, ClassVar, Final, TypeAlias, TypedDict
34

45
from fastapi import FastAPI
56
from pydantic import Field, NonNegativeInt, TypeAdapter, validate_call
7+
from servicelib.deferred_tasks import DeferredContext
68

79
from ._errors import (
810
OperationAlreadyRegisteredError,
@@ -17,13 +19,31 @@ class BaseStep(ABC):
1719
def get_step_name(cls) -> StepName:
1820
return cls.__name__
1921

22+
### CREATE
23+
2024
@classmethod
2125
@abstractmethod
2226
async def create(cls, app: FastAPI) -> None:
2327
"""
2428
[mandatory] handler to be implemented with the code resposible for achieving a goal
29+
NOTE: Ensure this is successful if:
30+
- `create` is called multiple times and does not cause duplicate resources
2531
"""
2632

33+
@classmethod
34+
async def get_create_retries(cls, context: DeferredContext) -> int:
35+
"""[optional] amount of retires in case of creation"""
36+
assert context # nosec
37+
return 0
38+
39+
@classmethod
40+
async def get_create_timeout(cls, context: DeferredContext) -> timedelta:
41+
"""[optional] timeout between retires case of creation"""
42+
assert context # nosec
43+
return timedelta(seconds=0)
44+
45+
### DESTROY
46+
2747
@classmethod
2848
async def destroy(cls, app: FastAPI) -> None:
2949
"""
@@ -35,6 +55,18 @@ async def destroy(cls, app: FastAPI) -> None:
3555
"""
3656
_ = app
3757

58+
@classmethod
59+
async def get_destroy_retries(cls, context: DeferredContext) -> int:
60+
"""[optional] amount of retires in case of failure"""
61+
assert context # nosec
62+
return 0
63+
64+
@classmethod
65+
async def get_destroy_timeout(cls, context: DeferredContext) -> timedelta:
66+
"""[optional] timeout between retires in case of failure"""
67+
assert context # nosec
68+
return timedelta(seconds=0)
69+
3870

3971
StepsSubGroup: TypeAlias = Annotated[tuple[type[BaseStep], ...], Field(min_length=1)]
4072

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_store.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,10 @@ async def delete(self, *keys: _DeleteScheduleDataKeys) -> None:
175175
class _StepDict(TypedDict):
176176
status: NotRequired[StepStatus]
177177
deferred_task_uid: NotRequired[TaskUID]
178+
error_traceback: NotRequired[str]
178179

179180

180-
_DeleteStepKeys = Literal["status", "deferred_task_uid"]
181+
_DeleteStepKeys = Literal["status", "deferred_task_uid", "error_traceback"]
181182

182183

183184
class StepStoreProxy:
@@ -211,6 +212,8 @@ def _get_hash_key(self) -> str:
211212
async def get(self, key: Literal["status"]) -> StepStatus: ...
212213
@overload
213214
async def get(self, key: Literal["deferred_task_uid"]) -> TaskUID: ...
215+
@overload
216+
async def get(self, key: Literal["error_traceback"]) -> str: ...
214217
async def get(self, key: str) -> Any:
215218
"""raises KeyNotFoundInHashError if the key is not present in the hash"""
216219
hash_key = self._get_hash_key()
@@ -225,6 +228,8 @@ async def get(self, key: str) -> Any:
225228
async def set(self, key: Literal["status"], value: StepStatus) -> None: ...
226229
@overload
227230
async def set(self, key: Literal["deferred_task_uid"], value: TaskUID) -> None: ...
231+
@overload
232+
async def set(self, key: Literal["error_traceback"], value: str) -> None: ...
228233
async def set(self, key: str, value: Any) -> None:
229234
await self._store.set(self._get_hash_key(), key, value)
230235

services/dynamic-scheduler/tests/unit/service_generic_scheduler/test__store.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,17 +163,16 @@ async def test_schedule_data_store_proxy_workflow(
163163
async def test_step_store_proxy_workflow(
164164
store: Store, schedule_id: ScheduleId, is_creating: bool
165165
):
166-
step_name = "MyStep"
167166
proxy = StepStoreProxy(
168167
store=store,
169168
schedule_id=schedule_id,
170169
operation_name="op1",
171170
step_group_name="sg1",
172-
step_name=step_name,
171+
step_name="step",
173172
is_creating=is_creating,
174173
)
175174
is_creating_str = "C" if is_creating else "D"
176-
hash_key = f"SCH:{schedule_id}:STEPS:op1:sg1:{is_creating_str}:{step_name}"
175+
hash_key = f"SCH:{schedule_id}:STEPS:op1:sg1:{is_creating_str}:step"
177176

178177
# set
179178
await proxy.set("status", StepStatus.RUNNING)
@@ -190,12 +189,18 @@ async def test_step_store_proxy_workflow(
190189

191190
# set multiple
192191
await proxy.set_multiple(
193-
{"status": StepStatus.SUCCESS, "deferred_task_uid": TaskUID("mytask")}
192+
{
193+
"status": StepStatus.SUCCESS,
194+
"deferred_task_uid": TaskUID("mytask"),
195+
"error_traceback": "mock_traceback",
196+
}
194197
)
195198
await _assert_keys(store, {hash_key})
196-
await _assert_keys_in_hash(store, hash_key, {"status", "deferred_task_uid"})
199+
await _assert_keys_in_hash(
200+
store, hash_key, {"status", "deferred_task_uid", "error_traceback"}
201+
)
197202

198203
# remove all keys an even missing ones
199-
await proxy.delete("status", "deferred_task_uid")
204+
await proxy.delete("status", "deferred_task_uid", "error_traceback")
200205
await _assert_keys(store, set())
201206
await _assert_keys_in_hash(store, hash_key, set())

0 commit comments

Comments
 (0)