
Commit 8d8f781

Merge branch 'master' into add-functions-locust-test

2 parents d902093 + 6c43a0d

File tree: 10 files changed (+859, -750 lines)


packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py

Lines changed: 3 additions & 1 deletion
@@ -195,7 +195,9 @@ def from_task_output(
         with suppress(json.JSONDecodeError):
             # NOTE: The suppression here is ok, since if the data is empty,
             # there will be a validation error anyway
-            data = json_loads(output_data_file.read_text())
+            loaded_data = json_loads(output_data_file.read_text())
+            # ignore what is not in the schema
+            data = {k: v for k, v in loaded_data.items() if k in schema}
 
         for output_key, output_params in schema.items():
             if isinstance(output_params, FilePortSchema):
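
The behavioral change in isolation: unknown keys in a task's output file are now dropped before validation instead of being passed through. A minimal, runnable sketch of that filtering (the schema dict and file below are made up for illustration; only the final comprehension mirrors the patched line):

    import json
    import tempfile
    from pathlib import Path

    # hypothetical schema/output pair, standing in for TaskOutputDataSchema
    schema = {"some_output_1": {"required": True}, "some_output_2": {"required": True}}

    output_data_file = Path(tempfile.mkdtemp()) / "outputs.json"
    output_data_file.write_text(
        json.dumps({"some_output_1": 1, "some_output_2": 2, "extra_key": "dropped"})
    )

    loaded_data = json.loads(output_data_file.read_text())
    # same filtering as the patch: ignore what is not in the schema
    data = {k: v for k, v in loaded_data.items() if k in schema}
    assert set(data) == {"some_output_1", "some_output_2"}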

packages/dask-task-models-library/tests/container_tasks/test_io.py

Lines changed: 36 additions & 0 deletions
@@ -184,3 +184,39 @@ def test_objects_are_compatible_with_dask_requirements(model_cls, model_cls_exam
     model_instance = model_cls.model_validate(example)
     reloaded_instance = loads(dumps(model_instance))
     assert reloaded_instance == model_instance
+
+
+def test_create_task_output_from_task_ignores_additional_entries(
+    tmp_path: Path, faker: Faker
+):
+    task_output_schema = TaskOutputDataSchema.model_validate(
+        {
+            "some_output_1": {
+                "required": True,
+            },
+            "some_output_2": {
+                "required": True,
+            },
+        }
+    )
+    output_file = _create_fake_outputs(task_output_schema, tmp_path, False, faker)
+    assert output_file
+    # Add more data to the output file to simulate additional entries
+    file_path = tmp_path / output_file
+    data = json.loads(file_path.read_text())
+    # Ensure the file contains the expected keys first
+    for key in task_output_schema:
+        assert key in data
+    # Add an extra key
+    data["extra_key"] = "extra_value"
+    file_path.write_text(json.dumps(data))
+
+    task_output_data = TaskOutputData.from_task_output(
+        schema=task_output_schema,
+        output_folder=tmp_path,
+        output_file_ext=output_file,
+    )
+    # Only keys defined in the schema should be present
+    assert set(task_output_data.keys()) == set(
+        task_output_schema.keys()
+    ), "Should only contain the expected keys"

scripts/maintenance/computational-clusters/pyproject.toml

Lines changed: 3 additions & 3 deletions
@@ -6,8 +6,8 @@ dependencies = [
     "black",
     "boto3",
     # NOTE: these must be in sync with ospar
-    "cloudpickle==3.1.0",
-    "dask[distributed]==2024.12.0",
+    "cloudpickle",
+    "dask[distributed]",
     "mypy_boto3_ec2",
     "types-boto3",
     "parse",
@@ -20,7 +20,7 @@ dependencies = [
     "sqlalchemy[asyncio]",
     "sshtunnel",
     "ansible>=10.7.0",
-    "lz4==4.3.3",
+    "lz4",
 ]
 name = "autoscaled-monitor"
 version = "1.0.0"

scripts/maintenance/computational-clusters/uv.lock

Lines changed: 766 additions & 726 deletions
(Generated file; diff not rendered.)

services/director-v2/src/simcore_service_director_v2/core/errors.py

Lines changed: 3 additions & 2 deletions
@@ -1,4 +1,4 @@
-""" Defines the different exceptions that may arise in the director
+"""Defines the different exceptions that may arise in the director
 
 
 TODO: Exceptions should provide all info to create Error instances of the API model
@@ -88,13 +88,14 @@ def __init__(self, project_id: ProjectID, node_id: NodeID, **ctx: Any) -> None:
 
     def get_errors(self) -> list[ErrorDict]:
         # default implementation
+
         return [
             {
                 "loc": (
                     f"{self.project_id}",
                     f"{self.node_id}",
                 ),
-                "msg": f"{self.args[0]}",
+                "msg": f"{self}",
                 "type": self.code,
             },
         ]
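
A plausible motivation for the `msg` change, assuming these exceptions may be built with keyword-only context (`**ctx`) and therefore with no positional args: `self.args[0]` raises `IndexError` on an args-less exception, while `f"{self}"` always yields the formatted message. A sketch with a bare stand-in class, not the real director hierarchy:

    # stand-in, for illustration only
    class StandInError(Exception):
        def __init__(self, **ctx):
            super().__init__()  # no positional args, mirroring kwargs-only construction
            self.ctx = ctx

    err = StandInError(project_id="p1", node_id="n1")
    print(f"{err}")  # empty string at worst, never raises
    try:
        _ = f"{err.args[0]}"
    except IndexError:
        print("args[0] is unsafe when the exception carries no positional args")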

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 20 additions & 14 deletions
@@ -115,48 +115,54 @@ class SortedTasks:
     potentially_lost: list[CompTaskAtDB]
 
 
+_MAX_WAITING_TIME_FOR_UNKNOWN_TASKS: Final[datetime.timedelta] = datetime.timedelta(
+    seconds=30
+)
+
+
 async def _triage_changed_tasks(
-    changed_tasks_or_executing: list[tuple[_Previous, _Current]],
+    changed_tasks: list[tuple[_Previous, _Current]],
 ) -> SortedTasks:
     started_tasks = [
         current
-        for previous, current in changed_tasks_or_executing
+        for previous, current in changed_tasks
        if current.state in RUNNING_STATES
        or (
            previous.state in WAITING_FOR_START_STATES
            and current.state in COMPLETED_STATES
        )
    ]
 
-    # NOTE: some tasks can be both started and completed since we might have the time they were running
     completed_tasks = [
-        current
-        for _, current in changed_tasks_or_executing
-        if current.state in COMPLETED_STATES
+        current for _, current in changed_tasks if current.state in COMPLETED_STATES
     ]
 
     waiting_for_resources_tasks = [
         current
-        for previous, current in changed_tasks_or_executing
+        for previous, current in changed_tasks
         if current.state in WAITING_FOR_START_STATES
     ]
 
-    lost_or_momentarily_lost_tasks = [
+    lost_tasks = [
         current
-        for _, current in changed_tasks_or_executing
-        if current.state is RunningState.UNKNOWN
+        for previous, current in changed_tasks
+        if (current.state is RunningState.UNKNOWN)
+        and (
+            (arrow.utcnow().datetime - previous.modified)
+            > _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS
+        )
     ]
-    if lost_or_momentarily_lost_tasks:
+    if lost_tasks:
         _logger.warning(
             "%s are currently in unknown state. TIP: If they are running in an external cluster and it is not yet ready, that might explain it. But inform @sanderegg nevertheless!",
-            [t.node_id for t in lost_or_momentarily_lost_tasks],
+            [t.node_id for t in lost_tasks],
         )
 
     return SortedTasks(
         started_tasks,
         completed_tasks,
         waiting_for_resources_tasks,
-        lost_or_momentarily_lost_tasks,
+        lost_tasks,
     )
 
 
@@ -500,7 +506,7 @@ async def _update_states_from_comp_backend(
     # PENDING -> WAITING_FOR_RESOURCES (workers creation or missing) -> PENDING -> STARTED (worker started processing the task) -> SUCCESS/FAILED
     # or ABORTED (user cancelled) or UNKNOWN (lost task - it might be transient, be careful with this one)
     sorted_tasks = await _triage_changed_tasks(tasks_with_changed_states)
-
+    _logger.debug("found the following %s tasks with changed states", sorted_tasks)
    # now process the tasks
    if sorted_tasks.started:
        # NOTE: the dask-scheduler cannot differentiate between tasks that are effectively computing and
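
The substantive change here is the 30-second grace period: a task is only reported lost once it has sat in `RunningState.UNKNOWN` longer than `_MAX_WAITING_TIME_FOR_UNKNOWN_TASKS`, measured against the previous row's `modified` timestamp. A reduced, runnable sketch of that predicate (the task class is a made-up stand-in for `CompTaskAtDB`):

    import datetime
    from dataclasses import dataclass
    from typing import Final

    import arrow  # the patch uses arrow.utcnow() for the UTC reference time

    _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS: Final[datetime.timedelta] = datetime.timedelta(
        seconds=30
    )

    @dataclass(frozen=True)
    class _StandInTask:  # stand-in for CompTaskAtDB, illustration only
        state: str
        modified: datetime.datetime

    now = arrow.utcnow().datetime
    fresh = _StandInTask(state="UNKNOWN", modified=now - datetime.timedelta(seconds=5))
    stale = _StandInTask(state="UNKNOWN", modified=now - datetime.timedelta(minutes=5))

    lost_tasks = [
        task
        for task in (fresh, stale)
        # same predicate as the patch: UNKNOWN *and* unmodified past the grace period
        if task.state == "UNKNOWN"
        and (arrow.utcnow().datetime - task.modified) > _MAX_WAITING_TIME_FOR_UNKNOWN_TASKS
    ]
    assert lost_tasks == [stale]  # the fresh task gets another scheduling cycle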

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 3 additions & 3 deletions
@@ -23,9 +23,6 @@
 from models_library.users import UserID
 from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
 from servicelib.logging_utils import log_catch
-from simcore_service_director_v2.modules.comp_scheduler._utils import (
-    WAITING_FOR_START_STATES,
-)
 
 from ...core.errors import (
     ComputationalBackendNotConnectedError,
@@ -52,6 +49,9 @@
 )
 from ..db.repositories.comp_tasks import CompTasksRepository
 from ._scheduler_base import BaseCompScheduler
+from ._utils import (
+    WAITING_FOR_START_STATES,
+)
 
 _logger = logging.getLogger(__name__)

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 1 addition & 1 deletion
@@ -517,7 +517,7 @@ async def get_task_result(self, job_id: str) -> TaskOutputData:
         except KeyError as exc:
             raise ComputationalBackendTaskNotFoundError(job_id=job_id) from exc
         except distributed.TimeoutError as exc:
-            raise ComputationalBackendTaskResultsNotReadyError from exc
+            raise ComputationalBackendTaskResultsNotReadyError(job_id=job_id) from exc
 
     async def release_task_result(self, job_id: str) -> None:
         _logger.debug("releasing results for %s", f"{job_id=}")
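
The timeout path now carries `job_id`, like the `KeyError` path above it, so the raised error can say which job's results were not ready. A hedged sketch of the effect with a stand-in class (the real one lives in `core/errors.py`; its constructor and message template are assumptions here):

    # stand-in only; name reused for illustration, not the real implementation
    class ComputationalBackendTaskResultsNotReadyError(Exception):
        def __init__(self, job_id: str | None = None) -> None:
            self.job_id = job_id
            super().__init__(f"results not ready for job {job_id!r}")

    try:
        raise ComputationalBackendTaskResultsNotReadyError(job_id="dask-job-123")
    except ComputationalBackendTaskResultsNotReadyError as err:
        print(err)         # results not ready for job 'dask-job-123'
        print(err.job_id)  # available to handlers upstream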

services/director-v2/src/simcore_service_director_v2/utils/dask.py

Lines changed: 15 additions & 0 deletions
@@ -34,6 +34,7 @@
     NodeportsException,
     S3InvalidPathError,
     StorageInvalidCall,
+    UnboundPortError,
 )
 from simcore_sdk.node_ports_v2 import FileLinkType, Port, links, port_utils
 from simcore_sdk.node_ports_v2.links import ItemValue as _NPItemValue
@@ -147,6 +148,20 @@ async def parse_output_data(
             await (await ports.outputs)[port_key].set_value(value_to_transfer)
         except ValidationError as err:
             ports_errors.extend(_get_port_validation_errors(port_key, err))
+        except UnboundPortError as err:
+            ports_errors.extend(
+                [
+                    {
+                        "loc": (
+                            f"{project_id}",
+                            f"{node_id}",
+                            f"{port_key}",
+                        ),
+                        "msg": str(err),
+                        "type": "unbound_port",
+                    }
+                ]
+            )
 
     if ports_errors:
         raise PortsValidationError(
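
The new branch keeps `parse_output_data` on its accumulate-then-raise path: an unbound port becomes one more `ErrorDict`-shaped entry instead of an exception that aborts the loop. A reduced sketch of the pattern (the error class, identifiers, and loop below are stand-ins, not the real nodeports machinery):

    class UnboundPortError(Exception):  # stand-in; the real one comes from simcore_sdk
        ...

    project_id, node_id = "project-1", "node-1"  # hypothetical identifiers
    ports_errors: list[dict] = []

    for port_key in ("output_1", "output_2"):
        try:
            if port_key == "output_2":  # simulate one port failing to bind
                raise UnboundPortError(f"port {port_key!r} is not bound")
        except UnboundPortError as err:
            ports_errors.append(
                {
                    "loc": (f"{project_id}", f"{node_id}", f"{port_key}"),
                    "msg": str(err),
                    "type": "unbound_port",
                }
            )

    # mirrors the end of parse_output_data: raise once, with all errors collected
    if ports_errors:
        print(f"would raise PortsValidationError with {len(ports_errors)} error(s)")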

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py

Lines changed: 9 additions & 0 deletions
@@ -1316,6 +1316,14 @@ async def test_handling_of_disconnected_scheduler_dask(
     )
 
 
+@pytest.fixture
+def with_disabled_unknown_max_time(mocker: MockerFixture) -> None:
+    mocker.patch(
+        "simcore_service_director_v2.modules.comp_scheduler._scheduler_base._MAX_WAITING_TIME_FOR_UNKNOWN_TASKS",
+        new=datetime.timedelta(0),
+    )
+
+
 @dataclass(frozen=True, kw_only=True)
 class RebootState:
     dask_task_status: RunningState
@@ -1397,6 +1405,7 @@ class RebootState:
 async def test_handling_scheduled_tasks_after_director_reboots(
     with_disabled_auto_scheduling: mock.Mock,
     with_disabled_scheduler_publisher: mock.Mock,
+    with_disabled_unknown_max_time: None,
     mocked_dask_client: mock.MagicMock,
     sqlalchemy_async_engine: AsyncEngine,
     running_project: RunningProject,
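
This fixture keeps the reboot test from being slowed or made flaky by the new grace period: with the constant patched to `timedelta(0)`, UNKNOWN tasks are triaged as lost immediately. Patching the attribute on `_scheduler_base` works because `_triage_changed_tasks` reads the constant from that module's globals at call time. A self-contained sketch of the mechanism (the fake module below is illustrative, not the real scheduler):

    import datetime
    import types

    # stand-in module: a constant plus a function that reads it at call time
    mod = types.ModuleType("fake_scheduler_base")
    mod.GRACE = datetime.timedelta(seconds=30)
    mod.is_lost = lambda age: age > mod.GRACE  # looks GRACE up on each call

    assert not mod.is_lost(datetime.timedelta(seconds=1))
    mod.GRACE = datetime.timedelta(0)  # what mocker.patch(..., new=...) effectively does
    assert mod.is_lost(datetime.timedelta(seconds=1))  # now even 1s counts as lost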
