Skip to content

Commit 01fa10d

Browse files
fix: propagate run_id through comp scheduler heartbeat and task-progress updates
1 parent 673f113 commit 01fa10d

File tree

6 files changed: +17 −6 lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ async def _send_running_tasks_heartbeat(
284284
self,
285285
user_id: UserID,
286286
project_id: ProjectID,
287+
run_id: PositiveInt,
287288
iteration: Iteration,
288289
dag: nx.DiGraph,
289290
) -> None:
@@ -322,7 +323,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
322323
await asyncio.gather(
323324
*(
324325
comp_tasks_repo.update_project_task_last_heartbeat(
325-
t.project_id, t.node_id, utc_now
326+
t.project_id, t.node_id, run_id, utc_now
326327
)
327328
for t in running_tasks
328329
)
@@ -643,7 +644,7 @@ async def apply(
643644
)
644645
# 5. send a heartbeat
645646
await self._send_running_tasks_heartbeat(
646-
user_id, project_id, iteration, dag
647+
user_id, project_id, comp_run.run_id, iteration, dag
647648
)
648649

649650
# 6. Update the run result

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,10 +391,10 @@ async def _task_progress_change_handler(
391391
node_id = task_progress_event.task_owner.node_id
392392
comp_tasks_repo = CompTasksRepository(self.db_engine)
393393
task = await comp_tasks_repo.get_task(project_id, node_id)
394+
run = await CompRunsRepository(self.db_engine).get(user_id, project_id)
394395
if task.state in WAITING_FOR_START_STATES:
395396
task.state = RunningState.STARTED
396397
task.progress = task_progress_event.progress
397-
run = await CompRunsRepository(self.db_engine).get(user_id, project_id)
398398
await self._process_started_tasks(
399399
[task],
400400
user_id=user_id,
@@ -405,7 +405,7 @@ async def _task_progress_change_handler(
405405
)
406406
else:
407407
await comp_tasks_repo.update_project_task_progress(
408-
project_id, node_id, task_progress_event.progress
408+
project_id, node_id, run.run_id, task_progress_event.progress
409409
)
410410
await publish_service_progress(
411411
self.rabbitmq_client,

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
from models_library.resource_tracker import HardwareInfo
5858
from models_library.services import ServiceRunID
5959
from models_library.users import UserID
60-
from pydantic import PositiveInt, TypeAdapter, ValidationError
60+
from pydantic import TypeAdapter, ValidationError
6161
from pydantic.networks import AnyUrl
6262
from servicelib.logging_utils import log_catch, log_context
6363
from settings_library.s3 import S3Settings
@@ -267,7 +267,6 @@ async def send_computation_tasks(
267267
*,
268268
user_id: UserID,
269269
project_id: ProjectID,
270-
run_id: PositiveInt,
271270
tasks: dict[NodeID, Image],
272271
callback: _UserCallbackInSepThread,
273272
remote_fct: ContainerRemoteFct | None = None,

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ async def test_schedule_all_pipelines(
156156
project_id=published_project.project.uuid,
157157
run_metadata=run_metadata,
158158
use_on_demand_clusters=False,
159+
filtered_comp_tasks_in_db=[],
159160
)
160161
# this directly schedule a new pipeline
161162
scheduler_rabbit_client_parser.assert_called_once_with(
@@ -257,6 +258,7 @@ async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
257258
project_id=published_project.project.uuid,
258259
run_metadata=run_metadata,
259260
use_on_demand_clusters=False,
261+
filtered_comp_tasks_in_db=[],
260262
)
261263
# this directly schedule a new pipeline
262264
scheduler_rabbit_client_parser.assert_called_once_with(
@@ -340,6 +342,7 @@ async def test_empty_pipeline_is_not_scheduled(
340342
project_id=empty_project.uuid,
341343
run_metadata=run_metadata,
342344
use_on_demand_clusters=False,
345+
filtered_comp_tasks_in_db=[],
343346
)
344347
await assert_comp_runs_empty(sqlalchemy_async_engine)
345348
scheduler_rabbit_client_parser.assert_not_called()
@@ -355,6 +358,7 @@ async def test_empty_pipeline_is_not_scheduled(
355358
project_id=empty_project.uuid,
356359
run_metadata=run_metadata,
357360
use_on_demand_clusters=False,
361+
filtered_comp_tasks_in_db=[],
358362
)
359363
assert len(caplog.records) == 1
360364
assert "no computational dag defined" in caplog.records[0].message

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ async def _assert_start_pipeline(
169169
project_id=published_project.project.uuid,
170170
run_metadata=run_metadata,
171171
use_on_demand_clusters=False,
172+
filtered_comp_tasks_in_db=[],
172173
)
173174

174175
# check the database is correctly updated, the run is published
@@ -1124,6 +1125,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
11241125
project_id=sleepers_project.uuid,
11251126
run_metadata=run_metadata,
11261127
use_on_demand_clusters=False,
1128+
filtered_comp_tasks_in_db=[],
11271129
)
11281130
with_disabled_scheduler_publisher.assert_called_once()
11291131
# we shall have a a new comp_runs row with the new pipeline job
@@ -1251,6 +1253,7 @@ async def test_handling_of_disconnected_scheduler_dask(
12511253
project_id=published_project.project.uuid,
12521254
run_metadata=run_metadata,
12531255
use_on_demand_clusters=False,
1256+
filtered_comp_tasks_in_db=[],
12541257
)
12551258

12561259
# since there is no cluster, there is no dask-scheduler,
@@ -1766,6 +1769,7 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
17661769
project_id=published_project.project.uuid,
17671770
run_metadata=run_metadata,
17681771
use_on_demand_clusters=True,
1772+
filtered_comp_tasks_in_db=[],
17691773
)
17701774

17711775
# we ask to use an on-demand cluster, therefore the tasks are published first
@@ -1870,6 +1874,7 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
18701874
project_id=published_project.project.uuid,
18711875
run_metadata=run_metadata,
18721876
use_on_demand_clusters=True,
1877+
filtered_comp_tasks_in_db=[],
18731878
)
18741879

18751880
# we ask to use an on-demand cluster, therefore the tasks are published first

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ async def test_worker_properly_autocalls_scheduler_api(
6969
project_id=published_project.project.uuid,
7070
run_metadata=run_metadata,
7171
use_on_demand_clusters=False,
72+
filtered_comp_tasks_in_db=[],
7273
)
7374
mocked_get_scheduler_worker.assert_called_once_with(initialized_app)
7475
mocked_get_scheduler_worker.return_value.apply.assert_called_once_with(
@@ -126,6 +127,7 @@ async def _project_pipeline_creation_workflow() -> None:
126127
project_id=published_project.project.uuid,
127128
run_metadata=run_metadata,
128129
use_on_demand_clusters=False,
130+
filtered_comp_tasks_in_db=[],
129131
)
130132

131133
# whatever scheduling concurrency we call in here, we shall always see the same number of calls to the scheduler

0 commit comments

Comments (0)