Skip to content

Commit 3cd1dd6

Browse files
committed
add waiting for calls
1 parent f3c18b1 commit 3cd1dd6

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from unittest import mock
1313

1414
import pytest
15+
from tenacity import retry, stop_after_delay, wait_fixed
1516
from _helpers import PublishedProject
1617
from fastapi import FastAPI
1718
from pytest_mock import MockerFixture
@@ -25,6 +26,7 @@
2526
from simcore_service_director_v2.modules.comp_scheduler._worker import (
2627
_get_scheduler_worker,
2728
)
29+
from settings_library.rabbit import RabbitSettings
2830

2931
pytest_simcore_core_services_selection = ["postgres", "rabbit", "redis"]
3032
pytest_simcore_ops_services_selection = ["adminer"]
@@ -93,19 +95,21 @@ def with_scheduling_concurrency(
9395
)
9496

9597

98+
@pytest.mark.testit
9699
@pytest.mark.parametrize("scheduling_concurrency", [1, 50, 100])
97100
@pytest.mark.parametrize(
98101
"queue_name", [SchedulePipelineRabbitMessage.get_channel_name()]
99102
)
100103
async def test_worker_scheduling_parallelism(
104+
rabbit_service: RabbitSettings,
105+
ensure_parametrized_queue_is_empty: None,
101106
scheduling_concurrency: int,
102107
with_scheduling_concurrency: EnvVarsDict,
103108
with_disabled_auto_scheduling: mock.Mock,
104109
mocked_scheduler_api: mock.Mock,
105110
initialized_app: FastAPI,
106111
publish_project: Callable[[], Awaitable[PublishedProject]],
107112
run_metadata: RunMetadataDict,
108-
ensure_parametrized_queue_is_empty: None,
109113
):
110114
with_disabled_auto_scheduling.assert_called_once()
111115

@@ -125,8 +129,15 @@ async def _project_pipeline_creation_workflow() -> None:
125129
use_on_demand_clusters=False,
126130
)
127131

132+
# whatever scheduling concurrency we call in here, we shall always see the same number of calls to the scheduler
128133
await asyncio.gather(
129134
*(_project_pipeline_creation_workflow() for _ in range(scheduling_concurrency))
130135
)
136+
# the call to run the pipeline is async so we need to wait here
131137
mocked_scheduler_api.assert_called()
132-
assert mocked_scheduler_api.call_count == scheduling_concurrency
138+
139+
@retry(stop=stop_after_delay(5), reraise=True, wait=wait_fixed(0.5))
140+
def _assert_expected_called():
141+
assert mocked_scheduler_api.call_count == scheduling_concurrency
142+
143+
_assert_expected_called()

0 commit comments

Comments
 (0)