Skip to content

Commit cac0f24

Browse files
committed
removed wake_up callback from api interface
1 parent 9e940f6 commit cac0f24

File tree

4 files changed

+21
-69
lines changed

4 files changed

+21
-69
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,22 @@
7676
_MAX_WAITING_FOR_CLUSTER_TIMEOUT_IN_MIN: Final[int] = 10
7777

7878

79+
def _temporary_empty_wake_up_callack(
80+
user_id: UserID, project_id: ProjectID, iteration: Iteration
81+
) -> Callable[[], None]:
82+
def _cb() -> None:
83+
...
84+
85+
# async def _async_cb():
86+
# db_engine = get_db_engine(app)
87+
# rabbit_mq_client = get_rabbitmq_client(app)
88+
# comp_run = await CompRunsRepository.instance(db_engine).get(
89+
# user_id=user_id, project_id=project_id, iteration=iteration
90+
# )
91+
# await request_pipeline_scheduling(comp_run, rabbit_mq_client, db_engine)
92+
return _cb
93+
94+
7995
@dataclass(frozen=True, slots=True)
8096
class SortedTasks:
8197
started: list[CompTaskAtDB]
@@ -496,9 +512,6 @@ async def schedule_pipeline(
496512
user_id: UserID,
497513
project_id: ProjectID,
498514
iteration: Iteration,
499-
wake_up_callback: Callable[
500-
[], None
501-
], # TODO: this should not be in the interface
502515
) -> None:
503516
"""schedules a pipeline for a given user, project and iteration.
504517
@@ -537,7 +550,9 @@ async def schedule_pipeline(
537550
comp_tasks=comp_tasks,
538551
dag=dag,
539552
comp_run=comp_run,
540-
wake_up_callback=wake_up_callback,
553+
wake_up_callback=_temporary_empty_wake_up_callack(
554+
user_id, project_id, iteration
555+
),
541556
)
542557
# 4. timeout if waiting for cluster has been there for more than X minutes
543558
comp_tasks = await self._timeout_if_waiting_for_cluster_too_long(

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,18 @@
11
import functools
22
import logging
3-
from typing import Callable, cast
3+
from typing import cast
44

55
from fastapi import FastAPI
6-
from models_library.projects import ProjectID
7-
from models_library.users import UserID
86
from servicelib.logging_utils import log_context
97

108
from ..rabbitmq import get_rabbitmq_client
11-
from ._models import Iteration, SchedulePipelineRabbitMessage
9+
from ._models import SchedulePipelineRabbitMessage
1210
from ._scheduler_base import BaseCompScheduler
1311
from ._scheduler_factory import create_scheduler
1412

1513
_logger = logging.getLogger(__name__)
1614

1715

18-
def _empty_wake_up_callack(
19-
app: FastAPI, user_id: UserID, project_id: ProjectID, iteration: Iteration
20-
) -> Callable[[], None]:
21-
def _cb() -> None:
22-
...
23-
24-
# async def _async_cb():
25-
# db_engine = get_db_engine(app)
26-
# rabbit_mq_client = get_rabbitmq_client(app)
27-
# comp_run = await CompRunsRepository.instance(db_engine).get(
28-
# user_id=user_id, project_id=project_id, iteration=iteration
29-
# )
30-
# await request_pipeline_scheduling(comp_run, rabbit_mq_client, db_engine)
31-
return _cb
32-
33-
3416
def _get_scheduler_worker(app: FastAPI) -> BaseCompScheduler:
3517
return cast(BaseCompScheduler, app.state.scheduler_worker)
3618

@@ -43,12 +25,6 @@ async def _handle_distributed_pipeline(app: FastAPI, data: bytes) -> bool:
4325
user_id=to_schedule_pipeline.user_id,
4426
project_id=to_schedule_pipeline.project_id,
4527
iteration=to_schedule_pipeline.iteration,
46-
wake_up_callback=_empty_wake_up_callack(
47-
app,
48-
to_schedule_pipeline.user_id,
49-
to_schedule_pipeline.project_id,
50-
to_schedule_pipeline.iteration,
51-
),
5228
)
5329
return True
5430

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
173173
fake_workbench_adjacency: dict[str, Any],
174174
sqlalchemy_async_engine: AsyncEngine,
175175
run_metadata: RunMetadataDict,
176-
mocked_wake_up_callback: mock.Mock,
177176
):
178177
"""A pipeline which comp_tasks are missing should not be scheduled.
179178
It shall be aborted and shown as such in the comp_runs db"""
@@ -211,10 +210,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
211210
user_id=run_entry.user_id,
212211
project_id=run_entry.project_uuid,
213212
iteration=run_entry.iteration,
214-
wake_up_callback=mocked_wake_up_callback,
215213
)
216-
# the pipeline is misconfigured, so the callback will NOT be called since nothing ran
217-
mocked_wake_up_callback.assert_not_called()
218214

219215
# check the database entry is correctly updated
220216
await assert_comp_runs(
@@ -270,7 +266,6 @@ async def _assert_schedule_pipeline_PENDING( # noqa: N802
270266
published_tasks: list[CompTaskAtDB],
271267
mocked_dask_client: mock.MagicMock,
272268
scheduler: BaseCompScheduler,
273-
wake_up_callback: Callable[[], None],
274269
) -> list[CompTaskAtDB]:
275270
expected_pending_tasks = [
276271
published_tasks[1],
@@ -288,7 +283,6 @@ async def _return_tasks_pending(job_ids: list[str]) -> list[DaskClientTaskState]
288283
user_id=published_project.project.prj_owner,
289284
project_id=published_project.project.uuid,
290285
iteration=1,
291-
wake_up_callback=wake_up_callback,
292286
)
293287
_assert_dask_client_correctly_initialized(mocked_dask_client, scheduler)
294288
await assert_comp_runs(
@@ -340,7 +334,6 @@ async def _return_tasks_pending(job_ids: list[str]) -> list[DaskClientTaskState]
340334
user_id=published_project.project.prj_owner,
341335
project_id=published_project.project.uuid,
342336
iteration=1,
343-
wake_up_callback=wake_up_callback,
344337
)
345338
await assert_comp_runs(
346339
sqlalchemy_async_engine,
@@ -485,7 +478,6 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915
485478
instrumentation_rabbit_client_parser: mock.AsyncMock,
486479
resource_tracking_rabbit_client_parser: mock.AsyncMock,
487480
run_metadata: RunMetadataDict,
488-
mocked_wake_up_callback: mock.Mock,
489481
):
490482
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
491483

@@ -504,7 +496,6 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915
504496
expected_published_tasks,
505497
mocked_dask_client,
506498
scheduler,
507-
mocked_wake_up_callback,
508499
)
509500

510501
# -------------------------------------------------------------------------------
@@ -528,7 +519,6 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[DaskClientTaskSta
528519
user_id=run_in_db.user_id,
529520
project_id=run_in_db.project_uuid,
530521
iteration=run_in_db.iteration,
531-
wake_up_callback=mocked_wake_up_callback,
532522
)
533523
await assert_comp_runs(
534524
sqlalchemy_async_engine,
@@ -586,7 +576,6 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[DaskClientTaskSta
586576
user_id=run_in_db.user_id,
587577
project_id=run_in_db.project_uuid,
588578
iteration=run_in_db.iteration,
589-
wake_up_callback=mocked_wake_up_callback,
590579
)
591580
# comp_run, the comp_task switch to STARTED
592581
await assert_comp_runs(
@@ -663,7 +652,6 @@ async def _return_random_task_result(job_id) -> TaskOutputData:
663652
user_id=run_in_db.user_id,
664653
project_id=run_in_db.project_uuid,
665654
iteration=run_in_db.iteration,
666-
wake_up_callback=mocked_wake_up_callback,
667655
)
668656
await assert_comp_runs(
669657
sqlalchemy_async_engine,
@@ -771,7 +759,6 @@ async def _return_2nd_task_running(job_ids: list[str]) -> list[DaskClientTaskSta
771759
user_id=run_in_db.user_id,
772760
project_id=run_in_db.project_uuid,
773761
iteration=run_in_db.iteration,
774-
wake_up_callback=mocked_wake_up_callback,
775762
)
776763
await assert_comp_runs(
777764
sqlalchemy_async_engine,
@@ -826,7 +813,6 @@ async def _return_2nd_task_failed(job_ids: list[str]) -> list[DaskClientTaskStat
826813
user_id=run_in_db.user_id,
827814
project_id=run_in_db.project_uuid,
828815
iteration=run_in_db.iteration,
829-
wake_up_callback=mocked_wake_up_callback,
830816
)
831817
await assert_comp_runs(
832818
sqlalchemy_async_engine,
@@ -886,7 +872,6 @@ async def _return_3rd_task_success(job_ids: list[str]) -> list[DaskClientTaskSta
886872
user_id=run_in_db.user_id,
887873
project_id=run_in_db.project_uuid,
888874
iteration=run_in_db.iteration,
889-
wake_up_callback=mocked_wake_up_callback,
890875
)
891876
await assert_comp_runs(
892877
sqlalchemy_async_engine,
@@ -938,7 +923,6 @@ async def test_task_progress_triggers(
938923
mocked_parse_output_data_fct: None,
939924
mocked_clean_task_output_and_log_files_if_invalid: None,
940925
run_metadata: RunMetadataDict,
941-
mocked_wake_up_callback: mock.Mock,
942926
):
943927
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
944928
_run_in_db, expected_published_tasks = await _assert_start_pipeline(
@@ -955,7 +939,6 @@ async def test_task_progress_triggers(
955939
expected_published_tasks,
956940
mocked_dask_client,
957941
scheduler,
958-
mocked_wake_up_callback,
959942
)
960943

961944
# send some progress
@@ -1008,7 +991,6 @@ async def test_handling_of_disconnected_scheduler_dask(
1008991
published_project: PublishedProject,
1009992
backend_error: SchedulerError,
1010993
run_metadata: RunMetadataDict,
1011-
mocked_wake_up_callback: mock.Mock,
1012994
):
1013995
# this will create a non connected backend issue that will trigger re-connection
1014996
mocked_dask_client_send_task = mocker.patch(
@@ -1060,7 +1042,6 @@ async def test_handling_of_disconnected_scheduler_dask(
10601042
user_id=run_in_db.user_id,
10611043
project_id=run_in_db.project_uuid,
10621044
iteration=run_in_db.iteration,
1063-
wake_up_callback=mocked_wake_up_callback,
10641045
)
10651046
# after this step the tasks are marked as ABORTED
10661047
await assert_comp_tasks(
@@ -1079,7 +1060,6 @@ async def test_handling_of_disconnected_scheduler_dask(
10791060
user_id=run_in_db.user_id,
10801061
project_id=run_in_db.project_uuid,
10811062
iteration=run_in_db.iteration,
1082-
wake_up_callback=mocked_wake_up_callback,
10831063
)
10841064
# now the run should be ABORTED
10851065
await assert_comp_runs(
@@ -1181,7 +1161,6 @@ async def test_handling_scheduling_after_reboot(
11811161
mocked_parse_output_data_fct: mock.MagicMock,
11821162
mocked_clean_task_output_fct: mock.MagicMock,
11831163
reboot_state: RebootState,
1184-
mocked_wake_up_callback: mock.Mock,
11851164
):
11861165
"""After the dask client is rebooted, or that the director-v2 reboots the dv-2 internal scheduler
11871166
shall continue scheduling correctly. Even though the task might have continued to run
@@ -1203,7 +1182,6 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData:
12031182
user_id=running_project.project.prj_owner,
12041183
project_id=running_project.project.uuid,
12051184
iteration=1,
1206-
wake_up_callback=mocked_wake_up_callback,
12071185
)
12081186
# the status will be called once for all RUNNING tasks
12091187
mocked_dask_client.get_tasks_status.assert_called_once()
@@ -1275,7 +1253,6 @@ async def test_handling_cancellation_of_jobs_after_reboot(
12751253
scheduler: BaseCompScheduler,
12761254
mocked_parse_output_data_fct: mock.MagicMock,
12771255
mocked_clean_task_output_fct: mock.MagicMock,
1278-
mocked_wake_up_callback: mock.Mock,
12791256
):
12801257
"""A running pipeline was cancelled by a user and the DV-2 was restarted BEFORE
12811258
It could actually cancel the task. On reboot the DV-2 shall recover
@@ -1314,7 +1291,6 @@ async def mocked_get_tasks_status(job_ids: list[str]) -> list[DaskClientTaskStat
13141291
user_id=run_in_db.user_id,
13151292
project_id=run_in_db.project_uuid,
13161293
iteration=run_in_db.iteration,
1317-
wake_up_callback=mocked_wake_up_callback,
13181294
)
13191295
mocked_dask_client.abort_computation_task.assert_called()
13201296
assert mocked_dask_client.abort_computation_task.call_count == len(
@@ -1364,7 +1340,6 @@ async def _return_random_task_result(job_id) -> TaskOutputData:
13641340
user_id=run_in_db.user_id,
13651341
project_id=run_in_db.project_uuid,
13661342
iteration=run_in_db.iteration,
1367-
wake_up_callback=mocked_wake_up_callback,
13681343
)
13691344
# now should be stopped
13701345
await assert_comp_tasks(
@@ -1410,7 +1385,6 @@ async def test_running_pipeline_triggers_heartbeat(
14101385
published_project: PublishedProject,
14111386
resource_tracking_rabbit_client_parser: mock.AsyncMock,
14121387
run_metadata: RunMetadataDict,
1413-
mocked_wake_up_callback: mock.Mock,
14141388
):
14151389
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
14161390
run_in_db, expected_published_tasks = await _assert_start_pipeline(
@@ -1427,7 +1401,6 @@ async def test_running_pipeline_triggers_heartbeat(
14271401
expected_published_tasks,
14281402
mocked_dask_client,
14291403
scheduler,
1430-
mocked_wake_up_callback,
14311404
)
14321405
# -------------------------------------------------------------------------------
14331406
# 2. the "worker" starts processing a task
@@ -1458,7 +1431,6 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[DaskClientTaskSta
14581431
user_id=run_in_db.user_id,
14591432
project_id=run_in_db.project_uuid,
14601433
iteration=run_in_db.iteration,
1461-
wake_up_callback=mocked_wake_up_callback,
14621434
)
14631435

14641436
messages = await _assert_message_received(
@@ -1475,13 +1447,11 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[DaskClientTaskSta
14751447
user_id=run_in_db.user_id,
14761448
project_id=run_in_db.project_uuid,
14771449
iteration=run_in_db.iteration,
1478-
wake_up_callback=mocked_wake_up_callback,
14791450
)
14801451
await scheduler.schedule_pipeline(
14811452
user_id=run_in_db.user_id,
14821453
project_id=run_in_db.project_uuid,
14831454
iteration=run_in_db.iteration,
1484-
wake_up_callback=mocked_wake_up_callback,
14851455
)
14861456
messages = await _assert_message_received(
14871457
resource_tracking_rabbit_client_parser,
@@ -1497,13 +1467,11 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[DaskClientTaskSta
14971467
user_id=run_in_db.user_id,
14981468
project_id=run_in_db.project_uuid,
14991469
iteration=run_in_db.iteration,
1500-
wake_up_callback=mocked_wake_up_callback,
15011470
)
15021471
await scheduler.schedule_pipeline(
15031472
user_id=run_in_db.user_id,
15041473
project_id=run_in_db.project_uuid,
15051474
iteration=run_in_db.iteration,
1506-
wake_up_callback=mocked_wake_up_callback,
15071475
)
15081476
messages = await _assert_message_received(
15091477
resource_tracking_rabbit_client_parser,
@@ -1531,7 +1499,6 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
15311499
run_metadata: RunMetadataDict,
15321500
mocked_get_or_create_cluster: mock.Mock,
15331501
faker: Faker,
1534-
mocked_wake_up_callback: mock.Mock,
15351502
):
15361503
mocked_get_or_create_cluster.side_effect = (
15371504
ComputationalBackendOnDemandNotReadyError(
@@ -1578,7 +1545,6 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
15781545
user_id=run_in_db.user_id,
15791546
project_id=run_in_db.project_uuid,
15801547
iteration=run_in_db.iteration,
1581-
wake_up_callback=mocked_wake_up_callback,
15821548
)
15831549
mocked_get_or_create_cluster.assert_called()
15841550
assert mocked_get_or_create_cluster.call_count == 1
@@ -1604,7 +1570,6 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
16041570
user_id=run_in_db.user_id,
16051571
project_id=run_in_db.project_uuid,
16061572
iteration=run_in_db.iteration,
1607-
wake_up_callback=mocked_wake_up_callback,
16081573
)
16091574
mocked_get_or_create_cluster.assert_called()
16101575
assert mocked_get_or_create_cluster.call_count == 1
@@ -1641,7 +1606,6 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
16411606
run_metadata: RunMetadataDict,
16421607
mocked_get_or_create_cluster: mock.Mock,
16431608
get_or_create_exception: Exception,
1644-
mocked_wake_up_callback: mock.Mock,
16451609
):
16461610
mocked_get_or_create_cluster.side_effect = get_or_create_exception
16471611
# running the pipeline will trigger a call to the clusters-keeper
@@ -1683,7 +1647,6 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
16831647
user_id=run_in_db.user_id,
16841648
project_id=run_in_db.project_uuid,
16851649
iteration=run_in_db.iteration,
1686-
wake_up_callback=mocked_wake_up_callback,
16871650
)
16881651
mocked_get_or_create_cluster.assert_called()
16891652
assert mocked_get_or_create_cluster.call_count == 1
@@ -1709,7 +1672,6 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
17091672
user_id=run_in_db.user_id,
17101673
project_id=run_in_db.project_uuid,
17111674
iteration=run_in_db.iteration,
1712-
wake_up_callback=mocked_wake_up_callback,
17131675
)
17141676
mocked_get_or_create_cluster.assert_not_called()
17151677
await assert_comp_runs(

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,5 +67,4 @@ async def test_worker_properly_calls_scheduler_api(
6767
user_id=published_project.project.prj_owner,
6868
project_id=published_project.project.uuid,
6969
iteration=1,
70-
wake_up_callback=mock.ANY,
7170
)

0 commit comments

Comments
 (0)