diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_filter.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_filter.py index 227358f4960e..919d777921ef 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_filter.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_filter.py @@ -24,7 +24,7 @@ PortEvent: TypeAlias = str | None -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) _1_MB: Final[PositiveInt] = TypeAdapter(ByteSize).validate_python("1mib") @@ -164,14 +164,14 @@ async def _worker_upload_events(self) -> None: if port_key is None: break - logger.debug("Request upload for port_key %s", port_key) + _logger.debug("Request upload for port_key %s", port_key) await self.outputs_manager.port_key_content_changed(port_key) async def enqueue(self, port_key: str) -> None: await self._incoming_events_queue.put(port_key) async def start(self) -> None: - with log_context(logger, logging.INFO, f"{EventFilter.__name__} start"): + with log_context(_logger, logging.INFO, f"{EventFilter.__name__} start"): self._task_incoming_event_ingestion = create_task( self._worker_incoming_event_ingestion(), name=self._worker_incoming_event_ingestion.__name__, @@ -195,7 +195,7 @@ async def _cancel_task(task: Task | None) -> None: with suppress(CancelledError): await task - with log_context(logger, logging.INFO, f"{EventFilter.__name__} shutdown"): + with log_context(_logger, logging.INFO, f"{EventFilter.__name__} shutdown"): await self._incoming_events_queue.put(None) await _cancel_task(self._task_incoming_event_ingestion) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py index 0a275d1d70bb..52c573bc86c2 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py @@ -3,7 +3,7 @@ import asyncio from pathlib import Path -from typing import AsyncIterator, Iterator +from typing import AsyncIterator from unittest.mock import AsyncMock import pytest @@ -24,12 +24,12 @@ from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed -_TENACITY_RETRY_PARAMS = dict( - reraise=True, - retry=retry_if_exception_type(AssertionError), - stop=stop_after_delay(10), - wait=wait_fixed(0.01), -) +_TENACITY_RETRY_PARAMS = { + "reraise": True, + "retry": retry_if_exception_type(AssertionError), + "stop": stop_after_delay(10), + "wait": wait_fixed(0.01), +} # FIXTURES @@ -75,11 +75,11 @@ async def outputs_manager( @pytest.fixture def mocked_port_key_content_changed( mocker: MockerFixture, outputs_manager: OutputsManager -) -> Iterator[AsyncMock]: +) -> AsyncMock: async def _mock_upload_outputs(*args, **kwargs) -> None: pass - yield mocker.patch.object( + return mocker.patch.object( outputs_manager, "port_key_content_changed", side_effect=_mock_upload_outputs ) @@ -101,8 +101,8 @@ def get_wait_interval(self, dir_size: NonNegativeInt) -> NonNegativeFloat: @pytest.fixture -def mock_get_directory_total_size(mocker: MockerFixture) -> Iterator[AsyncMock]: - yield mocker.patch( +def mock_get_directory_total_size(mocker: MockerFixture) -> AsyncMock: + return mocker.patch( "simcore_service_dynamic_sidecar.modules.outputs._event_filter.get_directory_total_size", return_value=1, ) @@ -127,10 +127,6 @@ async def _wait_for_event_to_trigger(event_filter: EventFilter) -> None: await asyncio.sleep(event_filter.delay_policy.get_min_interval() * 5) -async def _wait_for_event_to_trigger_big_directory(event_filter: EventFilter) -> None: - await asyncio.sleep(event_filter.delay_policy.get_wait_interval(1) * 2) - - # TESTS @@ -170,8 +166,9 @@ async def test_always_trigger_after_delay( # event trigger after correct interval delay correctly for expected_call_count in range(1, 10): await event_filter.enqueue(port_key_1) - await _wait_for_event_to_trigger_big_directory(event_filter) - assert mocked_port_key_content_changed.call_count == expected_call_count + async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): + with attempt: + assert mocked_port_key_content_changed.call_count == expected_call_count async def test_minimum_amount_of_get_directory_total_size_calls( @@ -190,7 +187,6 @@ async def test_minimum_amount_of_get_directory_total_size_calls( assert mocked_port_key_content_changed.call_count == 0 # event finished processing and was dispatched - await _wait_for_event_to_trigger_big_directory(event_filter) async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): with attempt: assert mock_get_directory_total_size.call_count == 2 @@ -221,9 +217,10 @@ async def test_minimum_amount_of_get_directory_total_size_calls_with_continuous_ assert mocked_port_key_content_changed.call_count == 0 # event finished processing and was dispatched - await _wait_for_event_to_trigger_big_directory(event_filter) - assert mock_get_directory_total_size.call_count == 2 - assert mocked_port_key_content_changed.call_count == 1 + async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS): + with attempt: + assert mock_get_directory_total_size.call_count == 2 + assert mocked_port_key_content_changed.call_count == 1 def test_default_delay_policy():