Skip to content

Commit 5f3b2f5

Browse files
committed
listing with processed since works
1 parent 83a8613 commit 5f3b2f5

File tree

4 files changed

+88
-8
lines changed

4 files changed

+88
-8
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ async def _set_schedule_done(
233233
project_id: ProjectID,
234234
iteration: Iteration,
235235
) -> None:
236-
await CompRunsRepository.instance(self.db_engine).mark_scheduling_done(
236+
await CompRunsRepository.instance(self.db_engine).mark_as_processed(
237237
user_id=user_id,
238238
project_id=project_id,
239239
iteration=iteration,

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ async def list(
6464
*,
6565
filter_by_state: set[RunningState] | None = None,
6666
never_scheduled: bool = False,
67-
processed_before: datetime.datetime | None = None,
67+
processed_since: datetime.timedelta | None = None,
6868
scheduled_after: datetime.timedelta | None = None,
6969
) -> list[CompRunsAtDB]:
7070
conditions = []
@@ -85,8 +85,9 @@ async def list(
8585
scheduled_cutoff = arrow.utcnow().datetime - scheduled_after
8686
scheduling_or_conditions.append(comp_runs.c.scheduled <= scheduled_cutoff)
8787

88-
if processed_before is not None:
89-
scheduling_or_conditions.append(comp_runs.c.processed <= processed_before)
88+
if processed_since is not None:
89+
processed_cutoff = arrow.utcnow().datetime - processed_since
90+
scheduling_or_conditions.append(comp_runs.c.processed <= processed_cutoff)
9091

9192
if scheduling_or_conditions:
9293
conditions.append(sa.or_(*scheduling_or_conditions))
@@ -210,7 +211,7 @@ async def mark_for_scheduling(
210211
processed=None,
211212
)
212213

213-
async def mark_scheduling_done(
214+
async def mark_as_processed(
214215
self, *, user_id: UserID, project_id: ProjectID, iteration: PositiveInt
215216
) -> CompRunsAtDB | None:
216217
return await self.update(

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_db_repositories_comp_runs.py

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77

88
import asyncio
99
import datetime
10+
import random
1011
from collections.abc import Awaitable, Callable
12+
from typing import cast
1113

14+
import arrow
1215
import pytest
1316
from _helpers import PublishedProject
1417
from faker import Faker
@@ -23,6 +26,9 @@
2326
UserNotFoundError,
2427
)
2528
from simcore_service_director_v2.models.comp_runs import CompRunsAtDB, RunMetadataDict
29+
from simcore_service_director_v2.modules.comp_scheduler._constants import (
30+
SCHEDULER_INTERVAL,
31+
)
2632
from simcore_service_director_v2.modules.db.repositories.comp_runs import (
2733
CompRunsRepository,
2834
)
@@ -117,14 +123,87 @@ async def test_list(
117123
)
118124
== []
119125
)
120-
121126
assert sorted(
122127
await CompRunsRepository(aiopg_engine).list(
123128
filter_by_state={RunningState.PUBLISHED}
124129
),
125130
key=lambda x: x.iteration,
126131
) == sorted(created, key=lambda x: x.iteration)
127132

133+
# test with never scheduled filter, let's create a bunch of scheduled entries,
134+
assert sorted(
135+
await CompRunsRepository(aiopg_engine).list(never_scheduled=True),
136+
key=lambda x: x.iteration,
137+
) == sorted(created, key=lambda x: x.iteration)
138+
comp_runs_marked_for_scheduling = random.sample(created, k=25)
139+
await asyncio.gather(
140+
*(
141+
CompRunsRepository(aiopg_engine).mark_for_scheduling(
142+
user_id=comp_run.user_id,
143+
project_id=comp_run.project_uuid,
144+
iteration=comp_run.iteration,
145+
)
146+
for comp_run in comp_runs_marked_for_scheduling
147+
)
148+
)
149+
# filter them away
150+
created = [r for r in created if r not in comp_runs_marked_for_scheduling]
151+
assert sorted(
152+
await CompRunsRepository(aiopg_engine).list(never_scheduled=True),
153+
key=lambda x: x.iteration,
154+
) == sorted(created, key=lambda x: x.iteration)
155+
156+
# now mark a few of them as processed
157+
comp_runs_marked_as_processed = random.sample(comp_runs_marked_for_scheduling, k=11)
158+
await asyncio.gather(
159+
*(
160+
CompRunsRepository(aiopg_engine).mark_as_processed(
161+
user_id=comp_run.user_id,
162+
project_id=comp_run.project_uuid,
163+
iteration=comp_run.iteration,
164+
)
165+
for comp_run in comp_runs_marked_as_processed
166+
)
167+
)
168+
# filter them away
169+
comp_runs_marked_for_scheduling = [
170+
r
171+
for r in comp_runs_marked_for_scheduling
172+
if r not in comp_runs_marked_as_processed
173+
]
174+
# since they were just marked as processed now, we will get nothing
175+
assert (
176+
sorted(
177+
await CompRunsRepository(aiopg_engine).list(
178+
never_scheduled=False, processed_since=SCHEDULER_INTERVAL
179+
),
180+
key=lambda x: x.iteration,
181+
)
182+
== []
183+
)
184+
# now we artificially change the processed time and set it 2x the scheduler interval
185+
fake_processed_time = arrow.utcnow().datetime - 2 * SCHEDULER_INTERVAL
186+
comp_runs_marked_as_processed = await asyncio.gather(
187+
*(
188+
CompRunsRepository(aiopg_engine).update(
189+
user_id=comp_run.user_id,
190+
project_id=comp_run.project_uuid,
191+
iteration=comp_run.iteration,
192+
processed=fake_processed_time,
193+
)
194+
for comp_run in comp_runs_marked_as_processed
195+
)
196+
)
197+
# now we should get them
198+
assert sorted(
199+
await CompRunsRepository(aiopg_engine).list(
200+
never_scheduled=False, processed_since=SCHEDULER_INTERVAL
201+
),
202+
key=lambda x: x.iteration,
203+
) == sorted(
204+
comp_runs_marked_as_processed, key=lambda x: cast(CompRunsAtDB, x).iteration
205+
)
206+
128207

129208
async def test_create(
130209
aiopg_engine,
@@ -387,7 +466,7 @@ async def test_mark_scheduling_done(
387466
assert created.scheduled is None
388467
assert created.processed is None
389468

390-
updated = await CompRunsRepository(aiopg_engine).mark_scheduling_done(
469+
updated = await CompRunsRepository(aiopg_engine).mark_as_processed(
391470
user_id=created.user_id,
392471
project_id=created.project_uuid,
393472
iteration=created.iteration,

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ async def test_schedule_all_pipelines(
191191
assert comp_run.modified == start_modified_time
192192

193193
# once the worker is done, the schedule time is set back to None
194-
await CompRunsRepository(aiopg_engine).mark_scheduling_done(
194+
await CompRunsRepository(aiopg_engine).mark_as_processed(
195195
user_id=comp_run.user_id,
196196
project_id=comp_run.project_uuid,
197197
iteration=comp_run.iteration,

0 commit comments

Comments
 (0)