Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

PortEvent: TypeAlias = str | None

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)


_1_MB: Final[PositiveInt] = TypeAdapter(ByteSize).validate_python("1mib")
Expand Down Expand Up @@ -164,14 +164,14 @@ async def _worker_upload_events(self) -> None:
if port_key is None:
break

logger.debug("Request upload for port_key %s", port_key)
_logger.debug("Request upload for port_key %s", port_key)
await self.outputs_manager.port_key_content_changed(port_key)

async def enqueue(self, port_key: str) -> None:
await self._incoming_events_queue.put(port_key)

async def start(self) -> None:
with log_context(logger, logging.INFO, f"{EventFilter.__name__} start"):
with log_context(_logger, logging.INFO, f"{EventFilter.__name__} start"):
self._task_incoming_event_ingestion = create_task(
self._worker_incoming_event_ingestion(),
name=self._worker_incoming_event_ingestion.__name__,
Expand All @@ -195,7 +195,7 @@ async def _cancel_task(task: Task | None) -> None:
with suppress(CancelledError):
await task

with log_context(logger, logging.INFO, f"{EventFilter.__name__} shutdown"):
with log_context(_logger, logging.INFO, f"{EventFilter.__name__} shutdown"):
await self._incoming_events_queue.put(None)
await _cancel_task(self._task_incoming_event_ingestion)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import asyncio
from pathlib import Path
from typing import AsyncIterator, Iterator
from typing import AsyncIterator
from unittest.mock import AsyncMock

import pytest
Expand All @@ -24,12 +24,12 @@
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed

_TENACITY_RETRY_PARAMS = dict(
reraise=True,
retry=retry_if_exception_type(AssertionError),
stop=stop_after_delay(10),
wait=wait_fixed(0.01),
)
_TENACITY_RETRY_PARAMS = {
"reraise": True,
"retry": retry_if_exception_type(AssertionError),
"stop": stop_after_delay(10),
"wait": wait_fixed(0.01),
}

# FIXTURES

Expand Down Expand Up @@ -75,11 +75,11 @@ async def outputs_manager(
@pytest.fixture
def mocked_port_key_content_changed(
mocker: MockerFixture, outputs_manager: OutputsManager
) -> Iterator[AsyncMock]:
) -> AsyncMock:
async def _mock_upload_outputs(*args, **kwargs) -> None:
pass

yield mocker.patch.object(
return mocker.patch.object(
outputs_manager, "port_key_content_changed", side_effect=_mock_upload_outputs
)

Expand All @@ -101,8 +101,8 @@ def get_wait_interval(self, dir_size: NonNegativeInt) -> NonNegativeFloat:


@pytest.fixture
def mock_get_directory_total_size(mocker: MockerFixture) -> Iterator[AsyncMock]:
yield mocker.patch(
def mock_get_directory_total_size(mocker: MockerFixture) -> AsyncMock:
return mocker.patch(
"simcore_service_dynamic_sidecar.modules.outputs._event_filter.get_directory_total_size",
return_value=1,
)
Expand All @@ -127,10 +127,6 @@ async def _wait_for_event_to_trigger(event_filter: EventFilter) -> None:
await asyncio.sleep(event_filter.delay_policy.get_min_interval() * 5)


async def _wait_for_event_to_trigger_big_directory(event_filter: EventFilter) -> None:
await asyncio.sleep(event_filter.delay_policy.get_wait_interval(1) * 2)


# TESTS


Expand Down Expand Up @@ -170,8 +166,9 @@ async def test_always_trigger_after_delay(
# event trigger after correct interval delay correctly
for expected_call_count in range(1, 10):
await event_filter.enqueue(port_key_1)
await _wait_for_event_to_trigger_big_directory(event_filter)
assert mocked_port_key_content_changed.call_count == expected_call_count
async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS):
with attempt:
assert mocked_port_key_content_changed.call_count == expected_call_count


async def test_minimum_amount_of_get_directory_total_size_calls(
Expand All @@ -190,7 +187,6 @@ async def test_minimum_amount_of_get_directory_total_size_calls(
assert mocked_port_key_content_changed.call_count == 0

# event finished processing and was dispatched
await _wait_for_event_to_trigger_big_directory(event_filter)
async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS):
with attempt:
assert mock_get_directory_total_size.call_count == 2
Expand Down Expand Up @@ -221,9 +217,10 @@ async def test_minimum_amount_of_get_directory_total_size_calls_with_continuous_
assert mocked_port_key_content_changed.call_count == 0

# event finished processing and was dispatched
await _wait_for_event_to_trigger_big_directory(event_filter)
assert mock_get_directory_total_size.call_count == 2
assert mocked_port_key_content_changed.call_count == 1
async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS):
with attempt:
assert mock_get_directory_total_size.call_count == 2
assert mocked_port_key_content_changed.call_count == 1


def test_default_delay_policy():
Expand Down
Loading