Skip to content

Commit 2d41961

Browse files
GitHKAndrei Neagu
authored andcommitted
🐛 When a file is now moved to the output_x folder it is now detected (ITISFoundation#7301)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent 739f13d commit 2d41961

File tree

3 files changed

+151
-19
lines changed

3 files changed

+151
-19
lines changed

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
_logger = logging.getLogger(__name__)
2626

2727

28+
def _get_first_entry_or_none(data: list[str]) -> str | None:
29+
return next(iter(data), None)
30+
31+
2832
class _PortKeysEventHandler(SafeFileSystemEventHandler):
2933
# NOTE: runs in the created process
3034

@@ -42,32 +46,45 @@ def handle_set_outputs_port_keys(self, *, outputs_port_keys: set[str]) -> None:
4246
def handle_toggle_event_propagation(self, *, is_enabled: bool) -> None:
4347
self._is_event_propagation_enabled = is_enabled
4448

49+
def _get_relative_path_parents(self, path: bytes | str) -> list[str]:
50+
try:
51+
spath_relative_to_outputs = Path(
52+
path.decode() if isinstance(path, bytes) else path
53+
).relative_to(self.outputs_path)
54+
except ValueError:
55+
return []
56+
return [f"{x}" for x in spath_relative_to_outputs.parents]
57+
4558
def event_handler(self, event: FileSystemEvent) -> None:
4659
if not self._is_event_propagation_enabled:
4760
return
4861

4962
# NOTE: ignoring all events which are not relative to modifying
5063
# the contents of the `port_key` folders from the outputs directory
5164

52-
path_relative_to_outputs = Path(
53-
event.src_path.decode()
54-
if isinstance(event.src_path, bytes)
55-
else event.src_path
56-
).relative_to(self.outputs_path)
65+
# NOTE: the `port_key` will be present in either the src_path or the dest_path
66+
# depending on the type of event
67+
68+
src_relative_path_parents = self._get_relative_path_parents(event.src_path)
69+
dst_relative_path_parents = self._get_relative_path_parents(event.dest_path)
5770

5871
# discard event if not part of a subfolder
59-
relative_path_parents = path_relative_to_outputs.parents
60-
event_in_subdirs = len(relative_path_parents) > 0
72+
event_in_subdirs = (
73+
len(src_relative_path_parents) > 0 or len(dst_relative_path_parents) > 0
74+
)
6175
if not event_in_subdirs:
6276
return
6377

6478
# only accept events generated inside `port_key` subfolder
65-
port_key_candidate = f"{relative_path_parents[0]}"
66-
67-
if port_key_candidate in self._outputs_port_keys:
68-
# messages in this queue (part of the process),
69-
# will be consumed by the asyncio thread
70-
self.port_key_events_queue.put(port_key_candidate)
79+
src_port_key_candidate = _get_first_entry_or_none(src_relative_path_parents)
80+
dst_port_key_candidate = _get_first_entry_or_none(dst_relative_path_parents)
81+
82+
for port_key_candidate in (src_port_key_candidate, dst_port_key_candidate):
83+
if port_key_candidate in self._outputs_port_keys:
84+
# messages in this queue (part of the process),
85+
# will be consumed by the asyncio thread
86+
self.port_key_events_queue.put(port_key_candidate)
87+
break
7188

7289

7390
class _EventHandlerProcess:
@@ -137,9 +154,7 @@ def _thread_worker_update_outputs_port_keys(self) -> None:
137154

138155
# Propagate `outputs_port_keys` changes to the `_PortKeysEventHandler`.
139156
while True:
140-
message: dict[
141-
str, Any
142-
] | None = (
157+
message: dict[str, Any] | None = (
143158
self.outputs_context.file_system_event_handler_queue.get() # pylint:disable=no-member
144159
)
145160
_logger.debug("received message %s", message)

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watchdog_extensions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
],
2828
)
2929

30-
logger = logging.getLogger(__name__)
30+
_logger = logging.getLogger(__name__)
3131

3232

3333
class _ExtendedInotifyBuffer(InotifyBuffer):
@@ -89,5 +89,5 @@ def on_any_event(self, event: FileSystemEvent) -> None:
8989
# which is running in the context of the
9090
# ExtendedInotifyObserver will cause the
9191
# observer to stop working.
92-
with log_catch(logger, reraise=False):
92+
with log_catch(_logger, reraise=False):
9393
self.event_handler(event)

services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
# pylint: disable=protected-access
33

44
import asyncio
5+
from collections.abc import AsyncIterable
56
from pathlib import Path
6-
from typing import AsyncIterable
7+
from typing import Any, Final
78
from unittest.mock import Mock
89

910
import aioprocessing
@@ -17,8 +18,16 @@
1718
from simcore_service_dynamic_sidecar.modules.outputs._event_handler import (
1819
EventHandlerObserver,
1920
_EventHandlerProcess,
21+
_PortKeysEventHandler,
2022
)
2123
from simcore_service_dynamic_sidecar.modules.outputs._manager import OutputsManager
24+
from watchdog.events import (
25+
DirModifiedEvent,
26+
FileClosedEvent,
27+
FileCreatedEvent,
28+
FileMovedEvent,
29+
FileSystemEvent,
30+
)
2231

2332

2433
@pytest.fixture
@@ -124,3 +133,111 @@ async def test_event_handler_observer_health_degraded(
124133
await asyncio.sleep(observer_monitor.wait_for_heart_beat_interval_s * 3)
125134
await observer_monitor.stop()
126135
assert outputs_manager.set_all_ports_for_upload.call_count >= 1
136+
137+
138+
_STATE_PATH: Final[Path] = Path("/some/random/fake/path/for/state/")
139+
140+
141+
@pytest.fixture
142+
def mock_state_path() -> Path:
143+
return _STATE_PATH
144+
145+
146+
class _MockAioQueue:
147+
def __init__(self) -> None:
148+
self.items: list[Any] = []
149+
150+
def put(self, item: Any) -> None:
151+
self.items.append(item)
152+
153+
def get(self) -> Any | None:
154+
try:
155+
return self.items.pop()
156+
except IndexError:
157+
return None
158+
159+
160+
@pytest.mark.parametrize(
161+
"event, expected_port_key",
162+
[
163+
pytest.param(
164+
FileCreatedEvent(src_path=f"{_STATE_PATH}/untitled.txt", dest_path=""),
165+
None,
166+
id="file_create_outside",
167+
),
168+
pytest.param(
169+
FileCreatedEvent(
170+
src_path=f"{_STATE_PATH}/output_1/untitled1.txt",
171+
dest_path="",
172+
),
173+
"output_1",
174+
id="file_create_inside_monitored_port",
175+
),
176+
pytest.param(
177+
FileCreatedEvent(
178+
src_path=f"{_STATE_PATH}/output_9/untitled1.txt",
179+
dest_path="",
180+
),
181+
None,
182+
id="file_create_inside_not_monitored_port",
183+
),
184+
pytest.param(
185+
FileMovedEvent(
186+
src_path=f"{_STATE_PATH}/untitled.txt",
187+
dest_path=f"{_STATE_PATH}/asdsadsasad.txt",
188+
),
189+
None,
190+
id="move_outside_any_port",
191+
),
192+
pytest.param(
193+
FileMovedEvent(
194+
src_path=f"{_STATE_PATH}/asdsadsasad.txt",
195+
dest_path=f"{_STATE_PATH}/output_1/asdsadsasad.txt",
196+
),
197+
"output_1",
198+
id="move_to_monitored_port",
199+
),
200+
pytest.param(
201+
FileMovedEvent(
202+
src_path=f"{_STATE_PATH}/asdsadsasad.txt",
203+
dest_path=f"{_STATE_PATH}/output_9/asdsadsasad.txt",
204+
),
205+
None,
206+
id="move_outside_monitored_port",
207+
),
208+
pytest.param(
209+
DirModifiedEvent(src_path=f"{_STATE_PATH}/output_1", dest_path=""),
210+
None,
211+
id="modified_port_dir_does_nothing",
212+
),
213+
pytest.param(
214+
DirModifiedEvent(src_path=f"{_STATE_PATH}", dest_path=""),
215+
None,
216+
id="modified_outer_dir_does_nothing",
217+
),
218+
pytest.param(
219+
FileClosedEvent(src_path=f"{_STATE_PATH}/untitled.txt", dest_path=""),
220+
None,
221+
id="close_file_outside_does_nothing",
222+
),
223+
pytest.param(
224+
FileClosedEvent(
225+
src_path=f"{_STATE_PATH}/output_1/asdsadsasad.txt", dest_path=""
226+
),
227+
"output_1",
228+
id="close_file_inside_triggers_event",
229+
),
230+
],
231+
)
232+
def test_port_keys_event_handler_triggers_for_events(
233+
mock_state_path: Path, event: FileSystemEvent, expected_port_key: str | None
234+
) -> None:
235+
236+
queue = _MockAioQueue()
237+
238+
event_handler = _PortKeysEventHandler(mock_state_path, queue)
239+
event_handler.handle_set_outputs_port_keys(outputs_port_keys={"output_1"})
240+
event_handler.handle_toggle_event_propagation(is_enabled=True)
241+
242+
event_handler.event_handler(event)
243+
assert queue.get() == expected_port_key

0 commit comments

Comments
 (0)