Skip to content

Commit 99d70df

Browse files
authored
🐛♻️ Bugfix/handle ever running tasks - refactor director-v2 workflow scheduler (ITISFoundation#2798)
* refactored dask client: use publish/unpublish on dask-scheduler instead of fire_and_forget * dask-client now returns task status/results * improved testing to replay issue * stop raising asyncio.CancelledError from dask-worker as not supported * cancellation is now handled using distributed.Event * set up log level when run as a scheduler * added test for dask compatibility * upgraded dask to latest * added blosc/lz4 to dask-distributed such that director-v2 also has the required libraries
1 parent 8fd4e45 commit 99d70df

File tree

48 files changed

+1610
-911
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1610
-911
lines changed

packages/dask-task-models-library/src/dask_task_models_library/container_tasks/errors.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@
55

66

77
class TaskValueError(PydanticErrorMixin, ValueError):
8-
pass
8+
code = "task.value_error"
9+
10+
11+
class TaskCancelledError(PydanticErrorMixin, RuntimeError):
12+
code = "task.cancelled_error"
13+
msg_template = "The task was cancelled"
914

1015

1116
class ServiceRuntimeError(PydanticErrorMixin, RuntimeError):
12-
code = "service_runtime_error"
17+
code = "service.runtime_error"
1318
msg_template = (
1419
"The service {service_key}:{service_version}"
1520
" in container {container_id} failed with code"

packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABC, abstractmethod
2-
from typing import Optional, Union
2+
from typing import Optional, Type, Union
33

44
from distributed.worker import get_worker
55
from models_library.projects_state import RunningState
@@ -19,21 +19,6 @@ class Config:
1919
extra = Extra.forbid
2020

2121

22-
class TaskCancelEvent(BaseTaskEvent):
23-
@staticmethod
24-
def topic_name() -> str:
25-
return "task_cancel"
26-
27-
class Config(BaseTaskEvent.Config):
28-
schema_extra = {
29-
"examples": [
30-
{
31-
"job_id": "simcore/services/comp/sleeper:1.1.0:projectid_ec7e595a-63ee-46a1-a04a-901b11b649f8:nodeid_39467d89-b659-4914-9359-c40b1b6d1d6d:uuid_5ee5c655-450d-4711-a3ec-32ffe16bc580",
32-
}
33-
]
34-
}
35-
36-
3722
class TaskStateEvent(BaseTaskEvent):
3823
state: RunningState
3924

@@ -111,4 +96,4 @@ class Config(BaseTaskEvent.Config):
11196
}
11297

11398

114-
DaskTaskEvents = Union[TaskLogEvent, TaskProgressEvent, TaskStateEvent]
99+
DaskTaskEvents = Type[Union[TaskLogEvent, TaskProgressEvent, TaskStateEvent]]

packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
)
1818
from pydantic.types import constr
1919

20+
TaskCancelEventName = "cancel_event_{}"
21+
2022

2123
class PortSchema(BaseModel):
2224
required: bool

packages/dask-task-models-library/tests/container_tasks/test_events.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import pytest
99
from dask_task_models_library.container_tasks.events import (
1010
BaseTaskEvent,
11-
TaskCancelEvent,
1211
TaskLogEvent,
1312
TaskProgressEvent,
1413
TaskStateEvent,
@@ -23,9 +22,7 @@ def test_task_event_abstract():
2322
BaseTaskEvent(job_id="some_fake") # type: ignore
2423

2524

26-
@pytest.mark.parametrize(
27-
"model_cls", [TaskStateEvent, TaskProgressEvent, TaskLogEvent, TaskCancelEvent]
28-
)
25+
@pytest.mark.parametrize("model_cls", [TaskStateEvent, TaskProgressEvent, TaskLogEvent])
2926
def test_events_models_examples(model_cls):
3027
examples = model_cls.Config.schema_extra["examples"]
3128

packages/dask-task-models-library/tests/container_tasks/test_io.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Optional
55

66
import pytest
7+
from cloudpickle import dumps, loads
78
from dask_task_models_library.container_tasks.io import (
89
FilePortSchema,
910
FileUrl,
@@ -167,3 +168,21 @@ def test_create_task_output_from_task_does_not_throw_when_there_are_optional_ent
167168
output_file_ext=faker.file_name(),
168169
)
169170
assert len(task_output_data) == 0
171+
172+
173+
@pytest.mark.parametrize(
174+
"model_cls",
175+
(
176+
TaskInputData,
177+
TaskOutputDataSchema,
178+
TaskOutputData,
179+
),
180+
)
181+
def test_objects_are_compatible_with_dask_requirements(model_cls, model_cls_examples):
182+
# NOTE: fcts could also be passed through the same test
183+
for name, example in model_cls_examples.items():
184+
print(name, ":", pformat(example))
185+
186+
model_instance = model_cls.parse_obj(example)
187+
reloaded_instance = loads(dumps(model_instance))
188+
assert reloaded_instance == model_instance

services/catalog/src/simcore_service_catalog/core/application.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
def init_app(settings: Optional[AppSettings] = None) -> FastAPI:
3434
if settings is None:
3535
settings = AppSettings.create_from_envs()
36-
36+
assert settings # nosec
3737
logging.basicConfig(level=settings.CATALOG_LOG_LEVEL.value)
3838
logging.root.setLevel(settings.CATALOG_LOG_LEVEL.value)
3939
logger.debug(settings.json(indent=2))
@@ -50,7 +50,6 @@ def init_app(settings: Optional[AppSettings] = None) -> FastAPI:
5050
)
5151
override_fastapi_openapi_method(app)
5252

53-
logger.debug(settings)
5453
app.state.settings = settings
5554

5655
setup_function_services(app)

services/dask-sidecar/docker/boot.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ fi
3535

3636
if [ ${DASK_START_AS_SCHEDULER+x} ]; then
3737
scheduler_version=$(dask-scheduler --version)
38+
mkdir --parents /home/scu/.config/dask
39+
dask_logging=$(printf "logging:\n distributed: %s\n distributed.scheduler: %s" "${LOG_LEVEL:-warning}" "${LOG_LEVEL:-warning}")
40+
echo "$dask_logging" >> /home/scu/.config/dask/distributed.yaml
41+
3842
echo "$INFO" "Starting as dask-scheduler:${scheduler_version}..."
3943
if [ "${SC_BOOT_MODE}" = "debug-ptvsd" ]; then
4044
exec watchmedo auto-restart \

services/dask-sidecar/requirements/_base.in

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ pydantic[email,dotenv]
2929
blosc
3030
lz4
3131

32+
33+
3234
# Cython implementation of Toolz: A set of utility functions for iterators, functions, and dictionaries.
3335
cytoolz
3436

services/dask-sidecar/requirements/_base.txt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ attrs==20.2.0
3939
# jsonschema
4040
bleach==3.3.0
4141
# via nbconvert
42-
blosc==1.10.4
42+
blosc==1.10.6
4343
# via -r requirements/_base.in
4444
bokeh==2.4.2
4545
# via dask
@@ -59,7 +59,7 @@ cloudpickle==2.0.0
5959
# distributed
6060
cytoolz==0.11.0
6161
# via -r requirements/_base.in
62-
dask==2021.12.0
62+
dask==2022.2.0
6363
# via
6464
# -c requirements/../../../packages/dask-task-models-library/requirements/_base.in
6565
# -r requirements/_base.in
@@ -69,7 +69,7 @@ dask-gateway==0.9.0
6969
# via -r requirements/_base.in
7070
defusedxml==0.7.1
7171
# via nbconvert
72-
distributed==2021.12.0
72+
distributed==2022.2.0
7373
# via
7474
# dask
7575
# dask-gateway
@@ -140,7 +140,7 @@ jupyterlab-pygments==0.1.2
140140
# via nbconvert
141141
locket==0.2.1
142142
# via partd
143-
lz4==3.1.3
143+
lz4==4.0.0
144144
# via -r requirements/_base.in
145145
markupsafe==2.0.1
146146
# via jinja2
@@ -174,6 +174,7 @@ packaging==20.4
174174
# bleach
175175
# bokeh
176176
# dask
177+
# distributed
177178
pandas==1.2.4
178179
# via
179180
# -r requirements/_base.in

services/dask-sidecar/requirements/_dask-complete.in

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,7 @@
88

99
# SEE https://github.com/dask/dask/blob/main/setup.py#L12 for extra reqs
1010
dask[complete]
11+
12+
# compression
13+
blosc
14+
lz4

0 commit comments

Comments
 (0)