Skip to content

Commit d717200

Browse files
committed
manager checks for properly scheduled tasks and lost ones
1 parent fb59f2e commit d717200

File tree

3 files changed

+137
-24
lines changed

3 files changed

+137
-24
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from typing import Final
23

34
import networkx as nx
45
from aiopg.sa import Engine
@@ -111,6 +112,9 @@ async def _get_pipeline_dag(project_id: ProjectID, db_engine: Engine) -> nx.DiGr
111112
return pipeline_at_db.get_graph()
112113

113114

115+
_LOST_TASKS_FACTOR: Final[int] = 10
116+
117+
114118
@exclusive(
115119
get_redis_client_from_app,
116120
lock_key=get_redis_lock_key(MODULE_NAME_SCHEDULER, unique_lock_key_builder=None),
@@ -119,23 +123,35 @@ async def schedule_all_pipelines(app: FastAPI) -> None:
119123
with log_context(_logger, logging.DEBUG, msg="scheduling pipelines"):
120124
db_engine = get_db_engine(app)
121125
runs_to_schedule = await CompRunsRepository.instance(db_engine).list(
122-
filter_by_state=SCHEDULED_STATES, scheduled_since=SCHEDULER_INTERVAL
126+
filter_by_state=SCHEDULED_STATES, need_scheduling=True
123127
)
128+
possibly_lost_scheduled_pipelines = await CompRunsRepository.instance(
129+
db_engine
130+
).list(
131+
filter_by_state=SCHEDULED_STATES,
132+
scheduled_since=SCHEDULER_INTERVAL * _LOST_TASKS_FACTOR,
133+
)
134+
if possibly_lost_scheduled_pipelines:
135+
_logger.error(
136+
"found %d lost pipelines, they will be re-scheduled now",
137+
len(possibly_lost_scheduled_pipelines),
138+
)
124139

125140
rabbitmq_client = get_rabbitmq_client(app)
126-
await limited_gather(
127-
*(
128-
request_pipeline_scheduling(
129-
rabbitmq_client,
130-
db_engine,
131-
user_id=run.user_id,
132-
project_id=run.project_uuid,
133-
iteration=run.iteration,
134-
)
135-
for run in runs_to_schedule
136-
),
137-
limit=MAX_CONCURRENT_PIPELINE_SCHEDULING,
138-
)
141+
with log_context(_logger, logging.DEBUG, msg="distributing pipelines"):
142+
await limited_gather(
143+
*(
144+
request_pipeline_scheduling(
145+
rabbitmq_client,
146+
db_engine,
147+
user_id=run.user_id,
148+
project_id=run.project_uuid,
149+
iteration=run.iteration,
150+
)
151+
for run in runs_to_schedule + possibly_lost_scheduled_pipelines
152+
),
153+
limit=MAX_CONCURRENT_PIPELINE_SCHEDULING,
154+
)
139155
if runs_to_schedule:
140156
_logger.debug("distributed %d pipelines", len(runs_to_schedule))
141157

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ async def get(
5656
async def list(
5757
self,
5858
filter_by_state: set[RunningState] | None = None,
59+
need_scheduling: bool | None = None,
5960
scheduled_since: datetime.timedelta | None = None,
6061
) -> list[CompRunsAtDB]:
6162

@@ -69,15 +70,19 @@ async def list(
6970
]
7071
)
7172
)
73+
74+
scheduling_or_conditions = []
75+
if need_scheduling is not None:
76+
scheduling_or_conditions.append(comp_runs.c.last_scheduled.is_(None))
7277
if scheduled_since is not None:
7378
scheduled_cutoff = arrow.utcnow().datetime - scheduled_since
74-
conditions.append(
75-
or_(
76-
comp_runs.c.last_scheduled.is_(None),
77-
comp_runs.c.last_scheduled <= scheduled_cutoff,
78-
)
79+
scheduling_or_conditions.append(
80+
comp_runs.c.last_scheduled <= scheduled_cutoff
7981
)
8082

83+
if scheduling_or_conditions:
84+
conditions.append(sa.or_(*scheduling_or_conditions))
85+
8186
async with self.db_engine.acquire() as conn:
8287
return [
8388
CompRunsAtDB.model_validate(row)

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB
3030
from simcore_service_director_v2.models.comp_runs import RunMetadataDict
3131
from simcore_service_director_v2.modules.comp_scheduler._manager import (
32+
_LOST_TASKS_FACTOR,
3233
SCHEDULER_INTERVAL,
3334
run_new_pipeline,
3435
schedule_all_pipelines,
@@ -37,6 +38,9 @@
3738
from simcore_service_director_v2.modules.comp_scheduler._models import (
3839
SchedulePipelineRabbitMessage,
3940
)
41+
from simcore_service_director_v2.modules.db.repositories.comp_runs import (
42+
CompRunsRepository,
43+
)
4044
from sqlalchemy.ext.asyncio import AsyncEngine
4145

4246
pytest_simcore_core_services_selection = ["postgres", "rabbit", "redis"]
@@ -141,6 +145,7 @@ async def test_schedule_all_pipelines(
141145
initialized_app: FastAPI,
142146
published_project: PublishedProject,
143147
sqlalchemy_async_engine: AsyncEngine,
148+
aiopg_engine,
144149
run_metadata: RunMetadataDict,
145150
scheduler_rabbit_client_parser: mock.AsyncMock,
146151
):
@@ -164,8 +169,7 @@ async def test_schedule_all_pipelines(
164169
).body()
165170
)
166171
scheduler_rabbit_client_parser.reset_mock()
167-
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
168-
comp_run = comp_runs[0]
172+
comp_run = (await assert_comp_runs(sqlalchemy_async_engine, expected_total=1))[0]
169173
assert comp_run.project_uuid == published_project.project.uuid
170174
assert comp_run.user_id == published_project.project.prj_owner
171175
assert comp_run.iteration == 1
@@ -177,7 +181,7 @@ async def test_schedule_all_pipelines(
177181
start_schedule_time = comp_run.last_scheduled
178182
start_modified_time = comp_run.modified
179183

180-
# this will now not schedule the pipeline since it was last scheduled
184+
# this will now not schedule the pipeline since it was already scheduled
181185
await schedule_all_pipelines(initialized_app)
182186
scheduler_rabbit_client_parser.assert_not_called()
183187
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
@@ -186,8 +190,15 @@ async def test_schedule_all_pipelines(
186190
assert comp_run.cancelled is None
187191
assert comp_run.modified == start_modified_time
188192

189-
# this will now schedule the pipeline since the time passed
190-
await asyncio.sleep(SCHEDULER_INTERVAL.total_seconds() + 1)
193+
# once the worker is done, the schedule time is set back to None
194+
await CompRunsRepository(aiopg_engine).mark_as_scheduled_done(
195+
user_id=comp_run.user_id,
196+
project_id=comp_run.project_uuid,
197+
iteration=comp_run.iteration,
198+
)
199+
200+
# now we schedule the pipeline again; since last_scheduled was reset to None,
201+
# this will trigger a new schedule
191202
await schedule_all_pipelines(initialized_app)
192203
scheduler_rabbit_client_parser.assert_called_once_with(
193204
SchedulePipelineRabbitMessage(
@@ -227,6 +238,87 @@ async def test_schedule_all_pipelines(
227238
assert comp_run.cancelled is not None
228239

229240

241+
async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
242+
with_disabled_auto_scheduling: mock.Mock,
243+
with_disabled_scheduler_worker: mock.Mock,
244+
initialized_app: FastAPI,
245+
published_project: PublishedProject,
246+
sqlalchemy_async_engine: AsyncEngine,
247+
aiopg_engine,
248+
run_metadata: RunMetadataDict,
249+
scheduler_rabbit_client_parser: mock.AsyncMock,
250+
caplog: pytest.LogCaptureFixture,
251+
):
252+
await assert_comp_runs_empty(sqlalchemy_async_engine)
253+
assert published_project.project.prj_owner
254+
# now we schedule a pipeline
255+
await run_new_pipeline(
256+
initialized_app,
257+
user_id=published_project.project.prj_owner,
258+
project_id=published_project.project.uuid,
259+
cluster_id=DEFAULT_CLUSTER_ID,
260+
run_metadata=run_metadata,
261+
use_on_demand_clusters=False,
262+
)
263+
# this directly schedules a new pipeline
264+
scheduler_rabbit_client_parser.assert_called_once_with(
265+
SchedulePipelineRabbitMessage(
266+
user_id=published_project.project.prj_owner,
267+
project_id=published_project.project.uuid,
268+
iteration=1,
269+
).body()
270+
)
271+
scheduler_rabbit_client_parser.reset_mock()
272+
comp_run = (await assert_comp_runs(sqlalchemy_async_engine, expected_total=1))[0]
273+
assert comp_run.project_uuid == published_project.project.uuid
274+
assert comp_run.user_id == published_project.project.prj_owner
275+
assert comp_run.iteration == 1
276+
assert comp_run.cancelled is None
277+
assert comp_run.cluster_id == DEFAULT_CLUSTER_ID
278+
assert comp_run.metadata == run_metadata
279+
assert comp_run.result is RunningState.PUBLISHED
280+
assert comp_run.last_scheduled is not None
281+
start_schedule_time = comp_run.last_scheduled
282+
start_modified_time = comp_run.modified
283+
284+
# this will now not schedule the pipeline since it was already scheduled
285+
await schedule_all_pipelines(initialized_app)
286+
scheduler_rabbit_client_parser.assert_not_called()
287+
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
288+
comp_run = comp_runs[0]
289+
assert comp_run.last_scheduled == start_schedule_time, "scheduled time changed!"
290+
assert comp_run.cancelled is None
291+
assert comp_run.modified == start_modified_time
292+
293+
# now we artificially set the last_scheduled time well in the past
294+
await CompRunsRepository(aiopg_engine).update(
295+
comp_run.user_id,
296+
comp_run.project_uuid,
297+
comp_run.iteration,
298+
last_scheduled=datetime.datetime.now(tz=datetime.UTC)
299+
- SCHEDULER_INTERVAL * (_LOST_TASKS_FACTOR + 1),
300+
)
301+
with caplog.at_level(logging.ERROR):
302+
await schedule_all_pipelines(initialized_app)
303+
assert (
304+
"found 1 lost pipelines, they will be re-scheduled now" in caplog.messages
305+
)
306+
scheduler_rabbit_client_parser.assert_called_once_with(
307+
SchedulePipelineRabbitMessage(
308+
user_id=published_project.project.prj_owner,
309+
project_id=published_project.project.uuid,
310+
iteration=1,
311+
).body()
312+
)
313+
scheduler_rabbit_client_parser.reset_mock()
314+
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
315+
comp_run = comp_runs[0]
316+
assert comp_run.last_scheduled is not None
317+
assert comp_run.last_scheduled > start_schedule_time
318+
assert comp_run.cancelled is None
319+
assert comp_run.modified > start_modified_time
320+
321+
230322
async def test_empty_pipeline_is_not_scheduled(
231323
with_disabled_auto_scheduling: mock.Mock,
232324
with_disabled_scheduler_worker: mock.Mock,

0 commit comments

Comments
 (0)