
Commit 60c042a

skeleton for distributed scheduler
1 parent: 555a52f

1 file changed: +10 -10 lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_distributed_scheduler.py

Lines changed: 10 additions & 10 deletions
@@ -13,11 +13,10 @@
 from servicelib.redis_utils import exclusive
 from servicelib.utils import limited_gather
 from settings_library.redis import RedisDatabase
-from simcore_service_director_v2.utils.rabbitmq import publish_project_log
 
-from ...core.settings import get_application_settings
 from ...models.comp_runs import CompRunsAtDB, RunMetadataDict
 from ...utils.comp_scheduler import SCHEDULED_STATES
+from ...utils.rabbitmq import publish_project_log
 from ..comp_scheduler._models import SchedulePipelineRabbitMessage
 from ..db import get_db_engine
 from ..db.repositories.comp_pipelines import CompPipelinesRepository
@@ -63,7 +62,6 @@ async def _get_pipeline_dag(project_id: ProjectID, db_engine: Engine) -> nx.DiGr
 
 @exclusive(_redis_client_getter, lock_key="computational-distributed-scheduler")
 async def schedule_pipelines(app: FastAPI) -> None:
-    app_settings = get_application_settings(app)
     db_engine = get_db_engine(app)
     runs_to_schedule = await CompRunsRepository.instance(db_engine).list(
         filter_by_state=SCHEDULED_STATES, scheduled_since=_SCHEDULER_INTERVAL
@@ -107,9 +105,11 @@ async def run_new_pipeline(
         metadata=run_metadata,
         use_on_demand_clusters=use_on_demand_clusters,
     )
-    await _distribute_pipeline(new_run, get_rabbitmq_client(app), db_engine)
+
+    rabbitmq_client = get_rabbitmq_client(app)
+    await _distribute_pipeline(new_run, rabbitmq_client, db_engine)
     await publish_project_log(
-        get_rabbitmq_client(app),
+        rabbitmq_client,
         user_id,
         project_id,
         log=f"Project pipeline scheduled using {'on-demand clusters' if use_on_demand_clusters else 'pre-defined clusters'}, starting soon...",
@@ -124,18 +124,18 @@ async def stop_pipeline(
     project_id: ProjectID,
     iteration: int | None = None,
 ) -> None:
-    comp_run = await CompRunsRepository.instance(get_db_engine(app)).get(
+    db_engine = get_db_engine(app)
+    comp_run = await CompRunsRepository.instance(db_engine).get(
         user_id, project_id, iteration
     )
 
     # mark the scheduled pipeline for stopping
     updated_comp_run = await CompRunsRepository.instance(
-        get_db_engine(app)
+        db_engine
    ).mark_for_cancellation(
         user_id=user_id, project_id=project_id, iteration=comp_run.iteration
     )
     if updated_comp_run:
         # ensure the scheduler starts right away
-        await _distribute_pipeline(
-            updated_comp_run, get_rabbitmq_client(app), get_db_engine(app)
-        )
+        rabbitmq_client = get_rabbitmq_client(app)
+        await _distribute_pipeline(updated_comp_run, rabbitmq_client, db_engine)
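
Taken together, the diff hoists the get_rabbitmq_client(app) and get_db_engine(app) lookups into locals that are fetched once per function and then reused, instead of repeating the getter call at every use site. Below is a minimal sketch of that pattern; AppLike, the getter stubs, and _distribute_pipeline here are illustrative stand-ins, not the actual director-v2 implementations.

# Sketch of the "fetch once, reuse the local" refactor applied in this commit.
# All names below are illustrative stand-ins, not the real director-v2 code.
import asyncio
from dataclasses import dataclass, field


@dataclass
class AppLike:
    # stand-in for the FastAPI app that keeps shared clients on app.state
    state: dict = field(default_factory=lambda: {"rabbitmq": "rmq-client", "db": "db-engine"})


def get_rabbitmq_client(app: AppLike) -> str:
    return app.state["rabbitmq"]  # hypothetical getter


def get_db_engine(app: AppLike) -> str:
    return app.state["db"]  # hypothetical getter


async def _distribute_pipeline(run: str, rabbitmq_client: str, db_engine: str) -> None:
    # stand-in for publishing a SchedulePipelineRabbitMessage for the run
    print(f"distribute {run} via {rabbitmq_client} / {db_engine}")


async def stop_pipeline(app: AppLike) -> None:
    # fetch each dependency once, then pass the same locals everywhere,
    # mirroring what the diff does in run_new_pipeline and stop_pipeline
    db_engine = get_db_engine(app)
    rabbitmq_client = get_rabbitmq_client(app)
    await _distribute_pipeline("updated_run", rabbitmq_client, db_engine)


if __name__ == "__main__":
    asyncio.run(stop_pipeline(AppLike()))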
