Skip to content

Commit 1b872c5

Browse files
committed
the callback is run in a separate thread
1 parent 89fb6a2 commit 1b872c5

File tree

2 files changed

+17
-12
lines changed
  • services/director-v2

2 files changed

+17
-12
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,34 @@
11
import functools
22
import logging
3-
from typing import cast
3+
from typing import Callable, cast
44

55
from fastapi import FastAPI
6+
from models_library.projects import ProjectID
7+
from models_library.users import UserID
68
from servicelib.logging_utils import log_context
79

810
from ..rabbitmq import get_rabbitmq_client
9-
from ._models import SchedulePipelineRabbitMessage
11+
from ._models import Iteration, SchedulePipelineRabbitMessage
1012
from ._scheduler_base import BaseCompScheduler
1113
from ._scheduler_factory import create_scheduler
1214

1315
_logger = logging.getLogger(__name__)
1416

1517

1618
def _empty_wake_up_callack(
17-
# app: FastAPI, user_id: UserID, project_id: ProjectID, iteration: Iteration
18-
) -> None:
19+
app: FastAPI, user_id: UserID, project_id: ProjectID, iteration: Iteration
20+
) -> Callable[[], None]:
21+
def _cb() -> None:
22+
...
23+
1924
# async def _async_cb():
2025
# db_engine = get_db_engine(app)
2126
# rabbit_mq_client = get_rabbitmq_client(app)
2227
# comp_run = await CompRunsRepository.instance(db_engine).get(
2328
# user_id=user_id, project_id=project_id, iteration=iteration
2429
# )
2530
# await request_pipeline_scheduling(comp_run, rabbit_mq_client, db_engine)
26-
...
31+
return _cb
2732

2833

2934
def _get_scheduler_worker(app: FastAPI) -> BaseCompScheduler:
@@ -34,15 +39,16 @@ async def _handle_distributed_pipeline(app: FastAPI, data: bytes) -> bool:
3439

3540
with log_context(_logger, logging.DEBUG, msg="handling scheduling"):
3641
to_schedule_pipeline = SchedulePipelineRabbitMessage.parse_raw(data)
37-
# get_rabbitmq_client(app).publish(
38-
# SchedulePipelineRabbitMessage.get_channel_name(),
39-
# to_schedule_pipeline,
40-
# )
4142
await _get_scheduler_worker(app).schedule_pipeline(
4243
user_id=to_schedule_pipeline.user_id,
4344
project_id=to_schedule_pipeline.project_id,
4445
iteration=to_schedule_pipeline.iteration,
45-
wake_up_callback=_empty_wake_up_callack,
46+
wake_up_callback=_empty_wake_up_callack(
47+
app,
48+
to_schedule_pipeline.user_id,
49+
to_schedule_pipeline.project_id,
50+
to_schedule_pipeline.iteration,
51+
),
4652
)
4753
return True
4854

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from simcore_service_director_v2.models.comp_runs import RunMetadataDict
1818
from simcore_service_director_v2.modules.comp_scheduler._manager import run_new_pipeline
1919
from simcore_service_director_v2.modules.comp_scheduler._worker import (
20-
_empty_wake_up_callack,
2120
_get_scheduler_worker,
2221
)
2322

@@ -68,5 +67,5 @@ async def test_worker_properly_calls_scheduler_api(
6867
user_id=published_project.project.prj_owner,
6968
project_id=published_project.project.uuid,
7069
iteration=1,
71-
wake_up_callback=_empty_wake_up_callack,
70+
wake_up_callback=mock.ANY,
7271
)

0 commit comments

Comments
 (0)