Skip to content

Commit 1c5627b

Browse files
committed
handling of processed and scheduled
1 parent 5f3b2f5 commit 1c5627b

File tree

5 files changed

+92
-23
lines changed

5 files changed

+92
-23
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
22
from typing import Final
33

4-
import arrow
54
import networkx as nx
65
from aiopg.sa import Engine
76
from fastapi import FastAPI
@@ -126,13 +125,13 @@ async def schedule_all_pipelines(app: FastAPI) -> None:
126125
runs_to_schedule = await CompRunsRepository.instance(db_engine).list(
127126
filter_by_state=SCHEDULED_STATES,
128127
never_scheduled=True,
129-
processed_before=arrow.utcnow().datetime - SCHEDULER_INTERVAL,
128+
processed_since=SCHEDULER_INTERVAL,
130129
)
131130
possibly_lost_scheduled_pipelines = await CompRunsRepository.instance(
132131
db_engine
133132
).list(
134133
filter_by_state=SCHEDULED_STATES,
135-
scheduled_after=SCHEDULER_INTERVAL * _LOST_TASKS_FACTOR,
134+
scheduled_since=SCHEDULER_INTERVAL * _LOST_TASKS_FACTOR,
136135
)
137136
if possibly_lost_scheduled_pipelines:
138137
_logger.error(

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async def list(
6565
filter_by_state: set[RunningState] | None = None,
6666
never_scheduled: bool = False,
6767
processed_since: datetime.timedelta | None = None,
68-
scheduled_after: datetime.timedelta | None = None,
68+
scheduled_since: datetime.timedelta | None = None,
6969
) -> list[CompRunsAtDB]:
7070
conditions = []
7171
if filter_by_state:
@@ -81,13 +81,31 @@ async def list(
8181
scheduling_or_conditions = []
8282
if never_scheduled:
8383
scheduling_or_conditions.append(comp_runs.c.scheduled.is_(None))
84-
if scheduled_after is not None:
85-
scheduled_cutoff = arrow.utcnow().datetime - scheduled_after
86-
scheduling_or_conditions.append(comp_runs.c.scheduled <= scheduled_cutoff)
84+
if scheduled_since is not None:
85+
# a scheduled run is a run that has been scheduled but not processed yet
86+
# i.e. the processing timepoint is either null or before the scheduling timepoint
87+
scheduled_cutoff = arrow.utcnow().datetime - scheduled_since
88+
scheduling_filter = (
89+
comp_runs.c.scheduled.is_not(None)
90+
& (
91+
comp_runs.c.processed.is_(None)
92+
| (comp_runs.c.scheduled > comp_runs.c.processed)
93+
)
94+
& (comp_runs.c.scheduled <= scheduled_cutoff)
95+
)
96+
scheduling_or_conditions.append(scheduling_filter)
8797

8898
if processed_since is not None:
99+
# a processed run is a run that has been scheduled and processed
100+
# and the processing timepoint is after the scheduling timepoint
89101
processed_cutoff = arrow.utcnow().datetime - processed_since
90-
scheduling_or_conditions.append(comp_runs.c.processed <= processed_cutoff)
102+
processed_filter = (
103+
comp_runs.c.processed.is_not(None)
104+
& (comp_runs.c.processed > comp_runs.c.scheduled)
105+
& (comp_runs.c.processed <= processed_cutoff)
106+
)
107+
108+
scheduling_or_conditions.append(processed_filter)
91109

92110
if scheduling_or_conditions:
93111
conditions.append(sa.or_(*scheduling_or_conditions))

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -181,17 +181,25 @@ async def test_list(
181181
)
182182
== []
183183
)
184-
# now we artificially change the processed time and set it 2x the scheduler interval
185-
fake_processed_time = arrow.utcnow().datetime - 2 * SCHEDULER_INTERVAL
186-
comp_runs_marked_as_processed = await asyncio.gather(
187-
*(
188-
CompRunsRepository(aiopg_engine).update(
189-
user_id=comp_run.user_id,
190-
project_id=comp_run.project_uuid,
191-
iteration=comp_run.iteration,
192-
processed=fake_processed_time,
193-
)
194-
for comp_run in comp_runs_marked_as_processed
184+
# now we artificially change the scheduled/processed time and set it 2x the scheduler interval
185+
# these are correctly processed ones, so we should get them back
186+
fake_scheduled_time = arrow.utcnow().datetime - 2 * SCHEDULER_INTERVAL
187+
fake_processed_time = fake_scheduled_time + 0.5 * SCHEDULER_INTERVAL
188+
comp_runs_marked_as_processed = (
189+
cast( # NOTE: the cast here is ok since gather will raise if there is an error
190+
list[CompRunsAtDB],
191+
await asyncio.gather(
192+
*(
193+
CompRunsRepository(aiopg_engine).update(
194+
user_id=comp_run.user_id,
195+
project_id=comp_run.project_uuid,
196+
iteration=comp_run.iteration,
197+
scheduled=fake_scheduled_time,
198+
processed=fake_processed_time,
199+
)
200+
for comp_run in comp_runs_marked_as_processed
201+
)
202+
),
195203
)
196204
)
197205
# now we should get them
@@ -200,8 +208,48 @@ async def test_list(
200208
never_scheduled=False, processed_since=SCHEDULER_INTERVAL
201209
),
202210
key=lambda x: x.iteration,
211+
) == sorted(comp_runs_marked_as_processed, key=lambda x: x.iteration)
212+
213+
# now some of them were never processed (i.e. processed time is either null or before the scheduled time)
214+
comp_runs_waiting_for_processing_or_never_processed = random.sample(
215+
comp_runs_marked_as_processed, k=6
216+
)
217+
comp_runs_marked_as_processed = [
218+
r
219+
for r in comp_runs_marked_as_processed
220+
if r not in comp_runs_waiting_for_processing_or_never_processed
221+
]
222+
# now we artificially change the processed time to be before the scheduled time
223+
comp_runs_waiting_for_processing_or_never_processed = cast(
224+
list[CompRunsAtDB],
225+
await asyncio.gather(
226+
*(
227+
CompRunsRepository(aiopg_engine).update(
228+
user_id=comp_run.user_id,
229+
project_id=comp_run.project_uuid,
230+
iteration=comp_run.iteration,
231+
scheduled=fake_processed_time, # NOTE: we invert here the timings
232+
processed=random.choice([fake_scheduled_time, None]), # noqa: S311
233+
)
234+
for comp_run in comp_runs_waiting_for_processing_or_never_processed
235+
)
236+
),
237+
)
238+
# so the processed ones shall remain
239+
assert sorted(
240+
await CompRunsRepository(aiopg_engine).list(
241+
never_scheduled=False, processed_since=SCHEDULER_INTERVAL
242+
),
243+
key=lambda x: x.iteration,
244+
) == sorted(comp_runs_marked_as_processed, key=lambda x: x.iteration)
245+
# the ones scheduled but not yet processed are now returned by scheduled_since
246+
assert sorted(
247+
await CompRunsRepository(aiopg_engine).list(
248+
never_scheduled=False, scheduled_since=SCHEDULER_INTERVAL
249+
),
250+
key=lambda x: x.iteration,
203251
) == sorted(
204-
comp_runs_marked_as_processed, key=lambda x: cast(CompRunsAtDB, x).iteration
252+
comp_runs_waiting_for_processing_or_never_processed, key=lambda x: x.iteration
205253
)
206254

207255

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ async def test_schedule_all_pipelines(
178178
assert comp_run.metadata == run_metadata
179179
assert comp_run.result is RunningState.PUBLISHED
180180
assert comp_run.scheduled is not None
181+
assert comp_run.processed is None
181182
start_schedule_time = comp_run.scheduled
182183
start_modified_time = comp_run.modified
183184

@@ -186,15 +187,18 @@ async def test_schedule_all_pipelines(
186187
scheduler_rabbit_client_parser.assert_not_called()
187188
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
188189
comp_run = comp_runs[0]
190+
assert comp_run.scheduled
189191
assert comp_run.scheduled == start_schedule_time, "scheduled time changed!"
190192
assert comp_run.cancelled is None
191193
assert comp_run.modified == start_modified_time
192194

193-
# once the worker is done, the schedule time is set back to None
194-
await CompRunsRepository(aiopg_engine).mark_as_processed(
195+
# to simulate that the worker did its job we will set times in the past
196+
await CompRunsRepository(aiopg_engine).update(
195197
user_id=comp_run.user_id,
196198
project_id=comp_run.project_uuid,
197199
iteration=comp_run.iteration,
200+
scheduled=comp_run.scheduled - 1.5 * SCHEDULER_INTERVAL,
201+
processed=comp_run.scheduled - 1.1 * SCHEDULER_INTERVAL,
198202
)
199203

200204
# now we schedule a pipeline again, but we wait for the scheduler interval to pass

services/director-v2/tests/unit/with_dbs/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ async def _(
123123
),
124124
"node_class": to_node_class(node_data.key),
125125
"internal_id": internal_id + 1,
126-
"submit": datetime.datetime.now(),
126+
"submit": datetime.datetime.now(datetime.UTC),
127127
"job_id": generate_dask_job_id(
128128
service_key=node_data.key,
129129
service_version=node_data.version,

0 commit comments

Comments
 (0)