From 899379cd580c6c76a39ff3b33ecdb18083971a19 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 4 Mar 2025 10:08:39 +0100 Subject: [PATCH 1/4] fixed issue with not detecing copy move to folder --- .../modules/outputs/_event_handler.py | 47 ++++++++++++------- .../modules/outputs/_watchdog_extensions.py | 4 +- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py index 8d347d75e109..d7754824fca7 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py @@ -25,6 +25,10 @@ _logger = logging.getLogger(__name__) +def _get_first_entry_or_none(data: list[str]) -> str | None: + return data[0] if len(data) > 0 else None + + class _PortKeysEventHandler(SafeFileSystemEventHandler): # NOTE: runs in the created process @@ -42,6 +46,15 @@ def handle_set_outputs_port_keys(self, *, outputs_port_keys: set[str]) -> None: def handle_toggle_event_propagation(self, *, is_enabled: bool) -> None: self._is_event_propagation_enabled = is_enabled + def _get_relative_path_parents(self, path: bytes | str) -> list[str]: + try: + spath_relative_to_outputs = Path( + path.decode() if isinstance(path, bytes) else path + ).relative_to(self.outputs_path) + except ValueError: + return [] + return [f"{x}" for x in spath_relative_to_outputs.parents] + def event_handler(self, event: FileSystemEvent) -> None: if not self._is_event_propagation_enabled: return @@ -49,25 +62,29 @@ def event_handler(self, event: FileSystemEvent) -> None: # NOTE: ignoring all events which are not relative to modifying # the contents of the `port_key` folders from the outputs directory - path_relative_to_outputs = Path( - event.src_path.decode() - if isinstance(event.src_path, bytes) - else event.src_path - ).relative_to(self.outputs_path) + # NOTE: the `port_key` will be present in either the src_path or the dest_path + # depending on the type of event + + src_relative_path_parents = self._get_relative_path_parents(event.src_path) + dst_relative_path_parents = self._get_relative_path_parents(event.dest_path) # discard event if not part of a subfolder - relative_path_parents = path_relative_to_outputs.parents - event_in_subdirs = len(relative_path_parents) > 0 + event_in_subdirs = ( + len(src_relative_path_parents) > 0 or len(dst_relative_path_parents) > 0 + ) if not event_in_subdirs: return # only accept events generated inside `port_key` subfolder - port_key_candidate = f"{relative_path_parents[0]}" - - if port_key_candidate in self._outputs_port_keys: - # messages in this queue (part of the process), - # will be consumed by the asyncio thread - self.port_key_events_queue.put(port_key_candidate) + src_port_key_candidate = _get_first_entry_or_none(src_relative_path_parents) + dst_port_key_candidate = _get_first_entry_or_none(dst_relative_path_parents) + + for port_key_candidate in (src_port_key_candidate, dst_port_key_candidate): + if port_key_candidate in self._outputs_port_keys: + # messages in this queue (part of the process), + # will be consumed by the asyncio thread + self.port_key_events_queue.put(port_key_candidate) + break class _EventHandlerProcess: @@ -137,9 +154,7 @@ def _thread_worker_update_outputs_port_keys(self) -> None: # Propagate `outputs_port_keys` changes to the `_PortKeysEventHandler`. while True: - message: dict[ - str, Any - ] | None = ( + message: dict[str, Any] | None = ( self.outputs_context.file_system_event_handler_queue.get() # pylint:disable=no-member ) _logger.debug("received message %s", message) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watchdog_extensions.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watchdog_extensions.py index 0d4d4c08be2a..2b77249e355c 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watchdog_extensions.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watchdog_extensions.py @@ -27,7 +27,7 @@ ], ) -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) class _ExtendedInotifyBuffer(InotifyBuffer): @@ -89,5 +89,5 @@ def on_any_event(self, event: FileSystemEvent) -> None: # which is running in the context of the # ExtendedInotifyObserver will cause the # observer to stop working. - with log_catch(logger, reraise=False): + with log_catch(_logger, reraise=False): self.event_handler(event) From ce239574b5b53249c9ee27884883650eeb846177 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 4 Mar 2025 11:11:08 +0100 Subject: [PATCH 2/4] added tests --- .../test_modules_outputs_event_handler.py | 70 ++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py index 35ccc7d72df7..646041288cd0 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py @@ -2,8 +2,9 @@ # pylint: disable=protected-access import asyncio +from collections.abc import AsyncIterable from pathlib import Path -from typing import AsyncIterable +from typing import Any, Final from unittest.mock import Mock import aioprocessing @@ -17,8 +18,16 @@ from simcore_service_dynamic_sidecar.modules.outputs._event_handler import ( EventHandlerObserver, _EventHandlerProcess, + _PortKeysEventHandler, ) from simcore_service_dynamic_sidecar.modules.outputs._manager import OutputsManager +from watchdog.events import ( + DirModifiedEvent, + FileClosedEvent, + FileCreatedEvent, + FileMovedEvent, + FileSystemEvent, +) @pytest.fixture @@ -124,3 +133,62 @@ async def test_event_handler_observer_health_degraded( await asyncio.sleep(observer_monitor.wait_for_heart_beat_interval_s * 3) await observer_monitor.stop() assert outputs_manager.set_all_ports_for_upload.call_count >= 1 + + +_STATE_PATH: Final[Path] = Path("/some/random/fake/path/for/state/") + + +@pytest.fixture +def mock_state_path() -> Path: + return _STATE_PATH + + +class _MockAioQueue: + def __init__(self) -> None: + self.items: list[Any] = [] + + def put(self, item: Any) -> None: + self.items.append(item) + + def get(self) -> Any | None: + try: + return self.items.pop() + except IndexError: + return None + + +@pytest.mark.parametrize( + "event, expected_port_key", + [ + (FileCreatedEvent(src_path=f"{_STATE_PATH}/untitled.txt", dest_path=""), None), + (DirModifiedEvent(src_path=f"{_STATE_PATH}", dest_path=""), None), + (FileClosedEvent(src_path=f"{_STATE_PATH}/untitled.txt", dest_path=""), None), + ( + FileMovedEvent( + src_path=f"{_STATE_PATH}/untitled.txt", + dest_path=f"{_STATE_PATH}/asdsadsasad.txt", + ), + None, + ), + ( + FileMovedEvent( + src_path=f"{_STATE_PATH}/asdsadsasad.txt", + dest_path=f"{_STATE_PATH}/output_1/asdsadsasad.txt", + ), + "output_1", + ), + (DirModifiedEvent(src_path=f"{_STATE_PATH}/output_1", dest_path=""), None), + ], +) +def test_port_keys_event_handler_triggers_for_events( + mock_state_path: Path, event: FileSystemEvent, expected_port_key: str | None +) -> None: + + queue = _MockAioQueue() + + event_handler = _PortKeysEventHandler(mock_state_path, queue) + event_handler.handle_set_outputs_port_keys(outputs_port_keys={"output_1"}) + event_handler.handle_toggle_event_propagation(is_enabled=True) + + event_handler.event_handler(event) + assert queue.get() == expected_port_key From 2c507a39696470b67f47db2caa0d8e80a9ffd12f Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 4 Mar 2025 11:19:31 +0100 Subject: [PATCH 3/4] refactor tests --- .../test_modules_outputs_event_handler.py | 61 +++++++++++++++++-- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py index 646041288cd0..49d38d946eaf 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py @@ -160,24 +160,73 @@ def get(self) -> Any | None: @pytest.mark.parametrize( "event, expected_port_key", [ - (FileCreatedEvent(src_path=f"{_STATE_PATH}/untitled.txt", dest_path=""), None), - (DirModifiedEvent(src_path=f"{_STATE_PATH}", dest_path=""), None), - (FileClosedEvent(src_path=f"{_STATE_PATH}/untitled.txt", dest_path=""), None), - ( + pytest.param( + FileCreatedEvent(src_path=f"{_STATE_PATH}/untitled.txt", dest_path=""), + None, + id="file_create_outside", + ), + pytest.param( + FileCreatedEvent( + src_path=f"{_STATE_PATH}/output_1/untitled1.txt", + dest_path="", + ), + "output_1", + id="file_create_inside_monitored_port", + ), + pytest.param( + FileCreatedEvent( + src_path=f"{_STATE_PATH}/output_9/untitled1.txt", + dest_path="", + ), + None, + id="file_create_inside_not_monitored_port", + ), + pytest.param( FileMovedEvent( src_path=f"{_STATE_PATH}/untitled.txt", dest_path=f"{_STATE_PATH}/asdsadsasad.txt", ), None, + id="move_outside_any_port", ), - ( + pytest.param( FileMovedEvent( src_path=f"{_STATE_PATH}/asdsadsasad.txt", dest_path=f"{_STATE_PATH}/output_1/asdsadsasad.txt", ), "output_1", + id="move_to_monitored_port", + ), + pytest.param( + FileMovedEvent( + src_path=f"{_STATE_PATH}/asdsadsasad.txt", + dest_path=f"{_STATE_PATH}/output_9/asdsadsasad.txt", + ), + None, + id="move_outside_monitored_port", + ), + pytest.param( + DirModifiedEvent(src_path=f"{_STATE_PATH}/output_1", dest_path=""), + None, + id="modified_port_dir_does_nothing", + ), + pytest.param( + DirModifiedEvent(src_path=f"{_STATE_PATH}", dest_path=""), + None, + id="modified_outer_dir_does_nothing", + ), + pytest.param( + FileClosedEvent(src_path=f"{_STATE_PATH}/untitled.txt", dest_path=""), + None, + id="close_file_outside_does_nothing", + ), + pytest.param( + FileClosedEvent( + src_path=f"{_STATE_PATH}/output_1/asdsadsasad.txt", dest_path="" + ), + "output_1", + id="close_file_inside_triggers_event", ), - (DirModifiedEvent(src_path=f"{_STATE_PATH}/output_1", dest_path=""), None), ], ) def test_port_keys_event_handler_triggers_for_events( From 1efbd0c8402671b94dcbcdeb728c7c081dabd44e Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 4 Mar 2025 13:30:59 +0100 Subject: [PATCH 4/4] minor tip --- .../modules/outputs/_event_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py index d7754824fca7..a61ea375286c 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py @@ -26,7 +26,7 @@ def _get_first_entry_or_none(data: list[str]) -> str | None: - return data[0] if len(data) > 0 else None + return next(iter(data), None) class _PortKeysEventHandler(SafeFileSystemEventHandler):