Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
_logger = logging.getLogger(__name__)


def _get_first_entry_or_none(data: list[str]) -> str | None:
return next(iter(data), None)


class _PortKeysEventHandler(SafeFileSystemEventHandler):
# NOTE: runs in the created process

Expand All @@ -42,32 +46,45 @@ def handle_set_outputs_port_keys(self, *, outputs_port_keys: set[str]) -> None:
def handle_toggle_event_propagation(self, *, is_enabled: bool) -> None:
self._is_event_propagation_enabled = is_enabled

def _get_relative_path_parents(self, path: bytes | str) -> list[str]:
try:
spath_relative_to_outputs = Path(
path.decode() if isinstance(path, bytes) else path
).relative_to(self.outputs_path)
except ValueError:
return []
return [f"{x}" for x in spath_relative_to_outputs.parents]

def event_handler(self, event: FileSystemEvent) -> None:
if not self._is_event_propagation_enabled:
return

# NOTE: ignoring all events which are not relative to modifying
# the contents of the `port_key` folders from the outputs directory

path_relative_to_outputs = Path(
event.src_path.decode()
if isinstance(event.src_path, bytes)
else event.src_path
).relative_to(self.outputs_path)
# NOTE: the `port_key` will be present in either the src_path or the dest_path
# depending on the type of event

src_relative_path_parents = self._get_relative_path_parents(event.src_path)
dst_relative_path_parents = self._get_relative_path_parents(event.dest_path)

# discard event if not part of a subfolder
relative_path_parents = path_relative_to_outputs.parents
event_in_subdirs = len(relative_path_parents) > 0
event_in_subdirs = (
len(src_relative_path_parents) > 0 or len(dst_relative_path_parents) > 0
)
if not event_in_subdirs:
return

# only accept events generated inside `port_key` subfolder
port_key_candidate = f"{relative_path_parents[0]}"

if port_key_candidate in self._outputs_port_keys:
# messages in this queue (part of the process),
# will be consumed by the asyncio thread
self.port_key_events_queue.put(port_key_candidate)
src_port_key_candidate = _get_first_entry_or_none(src_relative_path_parents)
dst_port_key_candidate = _get_first_entry_or_none(dst_relative_path_parents)

for port_key_candidate in (src_port_key_candidate, dst_port_key_candidate):
if port_key_candidate in self._outputs_port_keys:
# messages in this queue (part of the process),
# will be consumed by the asyncio thread
self.port_key_events_queue.put(port_key_candidate)
break


class _EventHandlerProcess:
Expand Down Expand Up @@ -137,9 +154,7 @@ def _thread_worker_update_outputs_port_keys(self) -> None:

# Propagate `outputs_port_keys` changes to the `_PortKeysEventHandler`.
while True:
message: dict[
str, Any
] | None = (
message: dict[str, Any] | None = (
self.outputs_context.file_system_event_handler_queue.get() # pylint:disable=no-member
)
_logger.debug("received message %s", message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
],
)

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)


class _ExtendedInotifyBuffer(InotifyBuffer):
Expand Down Expand Up @@ -89,5 +89,5 @@ def on_any_event(self, event: FileSystemEvent) -> None:
# which is running in the context of the
# ExtendedInotifyObserver will cause the
# observer to stop working.
with log_catch(logger, reraise=False):
with log_catch(_logger, reraise=False):
self.event_handler(event)
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
# pylint: disable=protected-access

import asyncio
from collections.abc import AsyncIterable
from pathlib import Path
from typing import AsyncIterable
from typing import Any, Final
from unittest.mock import Mock

import aioprocessing
Expand All @@ -17,8 +18,16 @@
from simcore_service_dynamic_sidecar.modules.outputs._event_handler import (
EventHandlerObserver,
_EventHandlerProcess,
_PortKeysEventHandler,
)
from simcore_service_dynamic_sidecar.modules.outputs._manager import OutputsManager
from watchdog.events import (
DirModifiedEvent,
FileClosedEvent,
FileCreatedEvent,
FileMovedEvent,
FileSystemEvent,
)


@pytest.fixture
Expand Down Expand Up @@ -124,3 +133,111 @@ async def test_event_handler_observer_health_degraded(
await asyncio.sleep(observer_monitor.wait_for_heart_beat_interval_s * 3)
await observer_monitor.stop()
assert outputs_manager.set_all_ports_for_upload.call_count >= 1


_STATE_PATH: Final[Path] = Path("/some/random/fake/path/for/state/")


@pytest.fixture
def mock_state_path() -> Path:
return _STATE_PATH


class _MockAioQueue:
def __init__(self) -> None:
self.items: list[Any] = []

def put(self, item: Any) -> None:
self.items.append(item)

def get(self) -> Any | None:
try:
return self.items.pop()
except IndexError:
return None


@pytest.mark.parametrize(
"event, expected_port_key",
[
pytest.param(
FileCreatedEvent(src_path=f"{_STATE_PATH}/untitled.txt", dest_path=""),
None,
id="file_create_outside",
),
pytest.param(
FileCreatedEvent(
src_path=f"{_STATE_PATH}/output_1/untitled1.txt",
dest_path="",
),
"output_1",
id="file_create_inside_monitored_port",
),
pytest.param(
FileCreatedEvent(
src_path=f"{_STATE_PATH}/output_9/untitled1.txt",
dest_path="",
),
None,
id="file_create_inside_not_monitored_port",
),
pytest.param(
FileMovedEvent(
src_path=f"{_STATE_PATH}/untitled.txt",
dest_path=f"{_STATE_PATH}/asdsadsasad.txt",
),
None,
id="move_outside_any_port",
),
pytest.param(
FileMovedEvent(
src_path=f"{_STATE_PATH}/asdsadsasad.txt",
dest_path=f"{_STATE_PATH}/output_1/asdsadsasad.txt",
),
"output_1",
id="move_to_monitored_port",
),
pytest.param(
FileMovedEvent(
src_path=f"{_STATE_PATH}/asdsadsasad.txt",
dest_path=f"{_STATE_PATH}/output_9/asdsadsasad.txt",
),
None,
id="move_outside_monitored_port",
),
pytest.param(
DirModifiedEvent(src_path=f"{_STATE_PATH}/output_1", dest_path=""),
None,
id="modified_port_dir_does_nothing",
),
pytest.param(
DirModifiedEvent(src_path=f"{_STATE_PATH}", dest_path=""),
None,
id="modified_outer_dir_does_nothing",
),
pytest.param(
FileClosedEvent(src_path=f"{_STATE_PATH}/untitled.txt", dest_path=""),
None,
id="close_file_outside_does_nothing",
),
pytest.param(
FileClosedEvent(
src_path=f"{_STATE_PATH}/output_1/asdsadsasad.txt", dest_path=""
),
"output_1",
id="close_file_inside_triggers_event",
),
],
)
def test_port_keys_event_handler_triggers_for_events(
mock_state_path: Path, event: FileSystemEvent, expected_port_key: str | None
) -> None:

queue = _MockAioQueue()

event_handler = _PortKeysEventHandler(mock_state_path, queue)
event_handler.handle_set_outputs_port_keys(outputs_port_keys={"output_1"})
event_handler.handle_toggle_event_propagation(is_enabled=True)

event_handler.event_handler(event)
assert queue.get() == expected_port_key
Loading