Skip to content

Commit 597bd5f

Browse files
committed
createing tests for repository
1 parent f793456 commit 597bd5f

File tree

5 files changed

+88
-31
lines changed

5 files changed

+88
-31
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
from typing import Final
33

4+
import arrow
45
import networkx as nx
56
from aiopg.sa import Engine
67
from fastapi import FastAPI
@@ -123,13 +124,15 @@ async def schedule_all_pipelines(app: FastAPI) -> None:
123124
with log_context(_logger, logging.DEBUG, msg="scheduling pipelines"):
124125
db_engine = get_db_engine(app)
125126
runs_to_schedule = await CompRunsRepository.instance(db_engine).list(
126-
filter_by_state=SCHEDULED_STATES, need_scheduling=True
127+
filter_by_state=SCHEDULED_STATES,
128+
never_scheduled=True,
129+
processed_before=arrow.utcnow().datetime - SCHEDULER_INTERVAL,
127130
)
128131
possibly_lost_scheduled_pipelines = await CompRunsRepository.instance(
129132
db_engine
130133
).list(
131134
filter_by_state=SCHEDULED_STATES,
132-
scheduled_since=SCHEDULER_INTERVAL * _LOST_TASKS_FACTOR,
135+
scheduled_after=SCHEDULER_INTERVAL * _LOST_TASKS_FACTOR,
133136
)
134137
if possibly_lost_scheduled_pipelines:
135138
_logger.error(

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ async def _set_schedule_done(
233233
project_id: ProjectID,
234234
iteration: Iteration,
235235
) -> None:
236-
await CompRunsRepository.instance(self.db_engine).mark_as_scheduled_done(
236+
await CompRunsRepository.instance(self.db_engine).mark_scheduling_done(
237237
user_id=user_id,
238238
project_id=project_id,
239239
iteration=iteration,

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,12 @@ async def get(
5555

5656
async def list(
5757
self,
58+
*,
5859
filter_by_state: set[RunningState] | None = None,
59-
need_scheduling: bool | None = None,
60-
scheduled_since: datetime.timedelta | None = None,
60+
never_scheduled: bool = False,
61+
processed_before: datetime.datetime | None = None,
62+
scheduled_after: datetime.timedelta | None = None,
6163
) -> list[CompRunsAtDB]:
62-
6364
conditions = []
6465
if filter_by_state:
6566
conditions.append(
@@ -72,13 +73,14 @@ async def list(
7273
)
7374

7475
scheduling_or_conditions = []
75-
if need_scheduling is not None:
76-
scheduling_or_conditions.append(comp_runs.c.last_scheduled.is_(None))
77-
if scheduled_since is not None:
78-
scheduled_cutoff = arrow.utcnow().datetime - scheduled_since
79-
scheduling_or_conditions.append(
80-
comp_runs.c.last_scheduled <= scheduled_cutoff
81-
)
76+
if never_scheduled:
77+
scheduling_or_conditions.append(comp_runs.c.scheduled.is_(None))
78+
if scheduled_after is not None:
79+
scheduled_cutoff = arrow.utcnow().datetime - scheduled_after
80+
scheduling_or_conditions.append(comp_runs.c.scheduled <= scheduled_cutoff)
81+
82+
if processed_before is not None:
83+
scheduling_or_conditions.append(comp_runs.c.processed <= processed_before)
8284

8385
if scheduling_or_conditions:
8486
conditions.append(sa.or_(*scheduling_or_conditions))
@@ -189,15 +191,16 @@ async def mark_for_scheduling(
189191
user_id,
190192
project_id,
191193
iteration,
192-
last_scheduled=arrow.utcnow().datetime,
194+
scheduled=arrow.utcnow().datetime,
195+
processed=None,
193196
)
194197

195-
async def mark_as_scheduled_done(
198+
async def mark_scheduling_done(
196199
self, *, user_id: UserID, project_id: ProjectID, iteration: PositiveInt
197200
) -> CompRunsAtDB | None:
198201
return await self.update(
199202
user_id,
200203
project_id,
201204
iteration,
202-
last_scheduled=None,
205+
processed=arrow.utcnow().datetime,
203206
)
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import pytest
2+
from models_library.projects import ProjectID
3+
from models_library.users import UserID
4+
from simcore_service_director_v2.core.errors import ComputationalRunNotFoundError
5+
from simcore_service_director_v2.modules.db.repositories.comp_runs import (
6+
CompRunsRepository,
7+
)
8+
9+
pytest_simcore_core_services_selection = [
10+
"postgres",
11+
]
12+
pytest_simcore_ops_services_selection = [
13+
"adminer",
14+
]
15+
16+
17+
async def test_get(aiopg_engine, user_id: UserID, project_id: ProjectID):
18+
with pytest.raises(ComputationalRunNotFoundError):
19+
await CompRunsRepository(aiopg_engine).get(user_id, project_id)
20+
21+
22+
async def test_list():
23+
...
24+
25+
26+
async def test_create():
27+
...
28+
29+
30+
async def test_update():
31+
...
32+
33+
34+
async def test_delete():
35+
...
36+
37+
38+
async def test_set_run_result():
39+
...
40+
41+
42+
async def test_mark_for_cancellation():
43+
...
44+
45+
46+
async def test_mark_for_scheduling():
47+
...
48+
49+
50+
async def test_mark_scheduling_done():
51+
...

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -177,21 +177,21 @@ async def test_schedule_all_pipelines(
177177
assert comp_run.cluster_id == DEFAULT_CLUSTER_ID
178178
assert comp_run.metadata == run_metadata
179179
assert comp_run.result is RunningState.PUBLISHED
180-
assert comp_run.last_scheduled is not None
181-
start_schedule_time = comp_run.last_scheduled
180+
assert comp_run.scheduled is not None
181+
start_schedule_time = comp_run.scheduled
182182
start_modified_time = comp_run.modified
183183

184184
# this will now not schedule the pipeline since it was already scheduled
185185
await schedule_all_pipelines(initialized_app)
186186
scheduler_rabbit_client_parser.assert_not_called()
187187
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
188188
comp_run = comp_runs[0]
189-
assert comp_run.last_scheduled == start_schedule_time, "scheduled time changed!"
189+
assert comp_run.scheduled == start_schedule_time, "scheduled time changed!"
190190
assert comp_run.cancelled is None
191191
assert comp_run.modified == start_modified_time
192192

193193
# once the worker is done, the schedule time is set back to None
194-
await CompRunsRepository(aiopg_engine).mark_as_scheduled_done(
194+
await CompRunsRepository(aiopg_engine).mark_scheduling_done(
195195
user_id=comp_run.user_id,
196196
project_id=comp_run.project_uuid,
197197
iteration=comp_run.iteration,
@@ -210,9 +210,9 @@ async def test_schedule_all_pipelines(
210210
scheduler_rabbit_client_parser.reset_mock()
211211
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
212212
comp_run = comp_runs[0]
213-
assert comp_run.last_scheduled is not None
214-
assert comp_run.last_scheduled > start_schedule_time
215-
last_schedule_time = comp_run.last_scheduled
213+
assert comp_run.scheduled is not None
214+
assert comp_run.scheduled > start_schedule_time
215+
last_schedule_time = comp_run.scheduled
216216
assert comp_run.cancelled is None
217217
assert comp_run.modified > start_modified_time
218218

@@ -233,8 +233,8 @@ async def test_schedule_all_pipelines(
233233
scheduler_rabbit_client_parser.reset_mock()
234234
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
235235
comp_run = comp_runs[0]
236-
assert comp_run.last_scheduled is not None
237-
assert comp_run.last_scheduled > last_schedule_time
236+
assert comp_run.scheduled is not None
237+
assert comp_run.scheduled > last_schedule_time
238238
assert comp_run.cancelled is not None
239239

240240

@@ -277,16 +277,16 @@ async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
277277
assert comp_run.cluster_id == DEFAULT_CLUSTER_ID
278278
assert comp_run.metadata == run_metadata
279279
assert comp_run.result is RunningState.PUBLISHED
280-
assert comp_run.last_scheduled is not None
281-
start_schedule_time = comp_run.last_scheduled
280+
assert comp_run.scheduled is not None
281+
start_schedule_time = comp_run.scheduled
282282
start_modified_time = comp_run.modified
283283

284284
# this will now not schedule the pipeline since it was already scheduled
285285
await schedule_all_pipelines(initialized_app)
286286
scheduler_rabbit_client_parser.assert_not_called()
287287
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
288288
comp_run = comp_runs[0]
289-
assert comp_run.last_scheduled == start_schedule_time, "scheduled time changed!"
289+
assert comp_run.scheduled == start_schedule_time, "scheduled time changed!"
290290
assert comp_run.cancelled is None
291291
assert comp_run.modified == start_modified_time
292292

@@ -295,7 +295,7 @@ async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
295295
comp_run.user_id,
296296
comp_run.project_uuid,
297297
comp_run.iteration,
298-
last_scheduled=datetime.datetime.now(tz=datetime.UTC)
298+
scheduled=datetime.datetime.now(tz=datetime.UTC)
299299
- SCHEDULER_INTERVAL * (_LOST_TASKS_FACTOR + 1),
300300
)
301301
with caplog.at_level(logging.ERROR):
@@ -313,8 +313,8 @@ async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
313313
scheduler_rabbit_client_parser.reset_mock()
314314
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
315315
comp_run = comp_runs[0]
316-
assert comp_run.last_scheduled is not None
317-
assert comp_run.last_scheduled > start_schedule_time
316+
assert comp_run.scheduled is not None
317+
assert comp_run.scheduled > start_schedule_time
318318
assert comp_run.cancelled is None
319319
assert comp_run.modified > start_modified_time
320320

0 commit comments

Comments
 (0)