Commit 8ae4186

Fix FileWatcher DELETE event reporting and rearrange tests (#86)
This pull request has 2 different but related main changes:

* The `FileWatcher` was skipping `DELETE` events, except in very unusual cases where the file was deleted and re-created just before the event was triggered by `awatch()`.

  This also extends the integration test to cover both situations (delete events when the file exists and when it doesn't).

  Because there is usually a considerable delay between a change happening in the filesystem and `awatch()` emitting an event, it is difficult to decide if we should filter events based on the file existing or not. For example, if a create event is received but the file was removed before the event was triggered, should we emit the create event or not? Or the other way around, should we emit a delete event if a file was created again before the event was triggered? It is probably best to leave this decision to the user, and let them deal with races.

* `FileWatcher` tests were rearranged and split between unit and integration tests.

  The existing tests were really integration tests, as they test not only `FileWatcher` but also `Select` and `PeriodicTimer`. This is confusing when there is a bug in any of the other classes, as we might get test failures here (this actually happened during the development of #79).

  Because of this the current tests were moved to `tests/utils/test_integration.py`. We also mark the tests with the `pytest` mark `integration` to make it clearer that they are integration tests, and to provide an easy way to run only, or exclude, integration tests.

  New unit tests were added to the current test file, properly mocking `awatch` to avoid interactions with the OS.

  There are many more integration tests posing as unit tests, but this PR only takes care of `FileWatcher`. With time we should move the rest too.
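A minimal sketch of how a consumer might cope with the race described above, assuming the event path arrives as a `pathlib.Path` (as in the tests in this commit). `read_if_present` is a hypothetical helper, not part of the library. It attempts the operation and treats a missing file as a normal outcome, which avoids a second race between an existence check and the read.

```python
from __future__ import annotations

import pathlib
import tempfile


def read_if_present(path: pathlib.Path) -> str | None:
    """Return the file contents, or None if the file is gone (hypothetical helper)."""
    try:
        return path.read_text()
    except FileNotFoundError:
        # The file was removed between the event and our read: an expected race.
        return None


with tempfile.TemporaryDirectory() as tmp_dir:
    existing = pathlib.Path(tmp_dir) / "present.txt"
    existing.write_text("hello")
    # Simulates a path reported by an event whose file no longer exists.
    missing = pathlib.Path(tmp_dir) / "already-deleted.txt"

    present_result = read_if_present(existing)
    missing_result = read_if_present(missing)
```

In a real consumer the path would come from `await file_watcher.receive()`; the handling of the missing-file case is the part this sketch is meant to illustrate.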
2 parents 5126a17 + ebb99e6 commit 8ae4186

5 files changed

+244
-68
lines changed

RELEASE_NOTES.md

Lines changed: 28 additions & 2 deletions
@@ -2,7 +2,7 @@
22

33
## Summary
44

5-
This release adds support to pass `None` values via channels and revamps the `Timer` class to support custom policies for handling missed ticks and use the loop monotonic clock.
5+
This release adds support to pass `None` values via channels and revamps the `Timer` class to support custom policies for handling missed ticks and use the loop monotonic clock. There is also a fix for the `FileWatcher` which includes a change in behavior when reporting changes for deleted files.
66

77
## Upgrading
88

@@ -27,10 +27,32 @@ This release adds support to pass `None` values via channels and revamps the `Ti
2727

2828
They are not **exactly** the same because the `triggered_datetime` in the second case will not be exactly when the timer had triggered, but that shouldn't be relevant, the important part is when your code can actually react to the timer trigger and to know how much drift there was to be able to take corrective actions.
2929

30-
Also the new `Timer` uses the `asyncio` loop monotonic clock and the old one used the wall clock (`datetime.now()`) to track time. This means that when using `async-solipsism` to test, the new `Timer` will always trigger immediately regarless of the state of the wall clock.
30+
Also the new `Timer` uses the `asyncio` loop monotonic clock and the old one used the wall clock (`datetime.now()`) to track time. This means that when using `async-solipsism` to test, the new `Timer` will always trigger immediately regardless of the state of the wall clock. This also means that we no longer need to mock the wall clock with `time-machine` either.
31+
32+
With the previous timer one needed to create a separate task to run the timer, because otherwise it would block as it loops until the wall clock was at a specific time. Now the code will run like this:
33+
34+
```python
35+
timer = Timer.timeout(timedelta(seconds=1.0))
36+
await asyncio.sleep(0.5)  # Advances the loop monotonic clock by 0.5 seconds immediately
37+
drift = await timer.receive()  # Advances the clock by 0.5 immediately too
38+
assert drift == approx(timedelta(0)) # Because it could trigger exactly at the tick time
39+
40+
# Simulates a delay in the timer trigger time
41+
await asyncio.sleep(1.5)  # Advances the loop monotonic clock by 1.5 seconds immediately
42+
drift = await timer.receive()  # The timer should have triggered 0.5 seconds ago, so it doesn't even sleep
43+
assert drift == approx(timedelta(seconds=0.5)) # Now we should observe a drift of 0.5 seconds
44+
```
3145

3246
**Note:** Before replacing this code blindly in all uses of `Timer.timeout()`, please consider using the periodic timer constructor `Timer.periodic()` if you need a timer that triggers reliably in a periodic fashion, as the old `Timer` (and `Timer.timeout()`) accumulates drift, which might not be what you want.
3347

48+
* `FileWatcher` will now emit events even if the file doesn't exist anymore.
49+
50+
Because the underlying library has a considerable delay in triggering filesystem events, it can happen that, for example, a `CREATE` event is received but by the time it is received the file doesn't exist anymore (because it was removed just after creation and before the event was triggered).
51+
52+
Previously the `FileWatcher` would only emit events when the file existed, but this doesn't work for `DELETE` events (clearly). Given that this mechanism can easily lead to races, it is better to just report all events and leave it to the user to decide what to do in these situations.
53+
54+
Therefore, you should now check that a file for which an event was received really exists before trying to operate on it.
55+
3456
## New Features
3557

3658
* `util.Timer` was replaced by a more generic implementation that allows for customizable policies to handle missed ticks.
@@ -40,3 +62,7 @@ This release adds support to pass `None` values via channels and revamps the `Ti
4062
## Bug Fixes
4163

4264
* `util.Select` / `util.Merge` / `util.MergeNamed`: Cancel pending tasks in `__del__` methods only if possible (the loop is not already closed).
65+
66+
* `FileWatcher` will now report `DELETE` events correctly.
67+
68+
Due to a bug, before this release `DELETE` events were only reported if the file was re-created before the event was triggered.

pyproject.toml

Lines changed: 3 additions & 0 deletions
@@ -74,6 +74,9 @@ src_paths = ["src", "examples", "tests"]
7474
[tool.pytest.ini_options]
7575
asyncio_mode = "auto"
7676
required_plugins = ["pytest-asyncio", "pytest-mock"]
77+
markers = [
78+
"integration: integration tests (deselect with '-m \"not integration\"')",
79+
]
7780

7881
[[tool.mypy.overrides]]
7982
module = ["async_solipsism", "async_solipsism.*"]
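As a rough illustration of how the `integration` marker registered above is meant to be used, the sketch below marks a hypothetical test (`test_touches_real_filesystem` is an invented name) and shows, in comments, the standard `pytest -m` selection expressions. It assumes `pytest` is installed.

```python
import pytest


@pytest.mark.integration
def test_touches_real_filesystem(tmp_path):
    """Hypothetical test that needs the real OS, so it is marked as integration."""
    target = tmp_path / "example.txt"
    target.write_text("data")
    assert target.read_text() == "data"


# The decorator only attaches metadata; selection happens on the command line:
#   pytest -m integration          # run only the marked tests
#   pytest -m "not integration"    # skip them
marker_names = [mark.name for mark in test_touches_real_filesystem.pytestmark]
```

Registering the marker in `pyproject.toml` mainly documents it and keeps `pytest` from warning about an unknown mark.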

src/frequenz/channels/util/_file_watcher.py

Lines changed: 17 additions & 6 deletions
@@ -46,16 +46,27 @@ def __init__(
4646
for path in paths
4747
]
4848
self._awatch = awatch(
49-
*self._paths,
50-
stop_event=self._stop_event,
51-
watch_filter=lambda change, path_str: (
52-
change in [event_type.value for event_type in event_types] # type: ignore
53-
and pathlib.Path(path_str).is_file()
54-
),
49+
*self._paths, stop_event=self._stop_event, watch_filter=self._filter_events
5550
)
5651
self._awatch_stopped_exc: Optional[Exception] = None
5752
self._changes: Set[FileChange] = set()
5853

54+
def _filter_events(
55+
self,
56+
change: Change,
57+
path: str, # pylint: disable=unused-argument
58+
) -> bool:
59+
"""Filter events based on the event type and path.
60+
61+
Args:
62+
change: The type of change to be notified.
63+
path: The path of the file that changed.
64+
65+
Returns:
66+
Whether the event should be notified.
67+
"""
68+
return change in [event_type.value for event_type in self.event_types]
69+
5970
def __del__(self) -> None:
6071
"""Cleanup registered watches.
6172
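To make the effect of this hunk concrete, here is a small self-contained sketch comparing the two filter behaviors. The `Change` enum is an illustrative stand-in for `watchfiles.Change` (values assumed), and `old_filter` / `new_filter` are simplified free functions, not the actual methods of `FileWatcher`.

```python
from __future__ import annotations

import enum
import pathlib
import tempfile


class Change(enum.IntEnum):
    """Illustrative stand-in for `watchfiles.Change` (values assumed)."""

    added = 1
    modified = 2
    deleted = 3


def old_filter(change: Change, path: str, wanted: set[Change]) -> bool:
    """Previous behavior: also require the path to be an existing file."""
    return change in wanted and pathlib.Path(path).is_file()


def new_filter(change: Change, path: str, wanted: set[Change]) -> bool:
    """New behavior: filter on the event type only; the path is ignored."""
    return change in wanted


with tempfile.TemporaryDirectory() as tmp_dir:
    gone = pathlib.Path(tmp_dir) / "removed.txt"
    gone.write_text("x")
    gone.unlink()  # The file no longer exists when the event is filtered

    wanted = {Change.deleted}
    old_result = old_filter(Change.deleted, str(gone), wanted)
    new_result = new_filter(Change.deleted, str(gone), wanted)
```

With the old predicate a delete event for a file that stays deleted can never pass, which matches the bug described in the commit message.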

tests/utils/test_file_watcher.py

Lines changed: 87 additions & 60 deletions
@@ -3,72 +3,99 @@
33

44
"""Tests for `channel.FileWatcher`."""
55

6-
import os
6+
from __future__ import annotations
7+
78
import pathlib
8-
from datetime import timedelta
9+
from collections.abc import AsyncGenerator, Iterator, Sequence
10+
from typing import Any
11+
from unittest import mock
912

10-
from frequenz.channels.util import FileWatcher, Select, Timer
13+
import hypothesis
14+
import hypothesis.strategies as st
15+
import pytest
16+
from watchfiles import Change
17+
from watchfiles.main import FileChange
1118

19+
from frequenz.channels.util import FileWatcher
1220

13-
async def test_file_watcher(tmp_path: pathlib.Path) -> None:
14-
"""Ensure file watcher is returning paths on file events.
1521

16-
Args:
17-
tmp_path (pathlib.Path): A tmp directory to run the file watcher on.
18-
Created by pytest.
19-
"""
20-
filename = tmp_path / "test-file"
21-
file_watcher = FileWatcher(paths=[str(tmp_path)])
22+
class _FakeAwatch:
23+
"""Fake awatch class to mock the awatch function."""
2224

23-
number_of_writes = 0
24-
expected_number_of_writes = 3
25+
def __init__(self, changes: Sequence[FileChange] = ()) -> None:
26+
"""Create a `_FakeAwatch` instance.
2527
26-
select = Select(
27-
timer=Timer.timeout(timedelta(seconds=0.1)),
28-
file_watcher=file_watcher,
29-
)
30-
while await select.ready():
31-
if msg := select.timer:
32-
filename.write_text(f"{msg.inner}")
33-
elif msg := select.file_watcher:
34-
assert msg.inner == filename
35-
number_of_writes += 1
36-
# After receiving a write 3 times, unsubscribe from the writes channel
37-
if number_of_writes == expected_number_of_writes:
38-
break
39-
40-
assert number_of_writes == expected_number_of_writes
41-
42-
43-
async def test_file_watcher_change_types(tmp_path: pathlib.Path) -> None:
44-
"""Ensure file watcher is returning paths only on the DELETE change.
45-
46-
Args:
47-
tmp_path (pathlib.Path): A tmp directory to run the file watcher on.
48-
Created by pytest.
49-
"""
50-
filename = tmp_path / "test-file"
51-
file_watcher = FileWatcher(
52-
paths=[str(tmp_path)], event_types={FileWatcher.EventType.DELETE}
53-
)
28+
Args:
29+
changes: A sequence of file changes to be returned by the fake awatch
30+
function.
31+
"""
32+
self.changes: Sequence[FileChange] = changes
33+
34+
async def fake_awatch(
35+
self, *paths: str, **kwargs: Any # pylint: disable=unused-argument
36+
) -> AsyncGenerator[set[FileChange], None]:
37+
"""Fake awatch function.
38+
39+
Args:
40+
paths: Paths to watch.
41+
kwargs: Keyword arguments to pass to the awatch function.
42+
"""
43+
for change in self.changes:
44+
yield {change}
45+
46+
47+
@pytest.fixture
48+
def fake_awatch() -> Iterator[_FakeAwatch]:
49+
"""Fixture to mock the awatch function."""
50+
fake = _FakeAwatch()
51+
with mock.patch(
52+
"frequenz.channels.util._file_watcher.awatch",
53+
autospec=True,
54+
side_effect=fake.fake_awatch,
55+
):
56+
yield fake
5457

55-
select = Select(
56-
write_timer=Timer.timeout(timedelta(seconds=0.1)),
57-
deletion_timer=Timer.timeout(timedelta(seconds=0.25)),
58-
watcher=file_watcher,
58+
59+
async def test_file_watcher_receive_updates(
60+
fake_awatch: _FakeAwatch, # pylint: disable=redefined-outer-name
61+
) -> None:
62+
"""Test the file watcher receives the expected events."""
63+
filename = "test-file"
64+
changes = (
65+
(Change.added, filename),
66+
(Change.deleted, filename),
67+
(Change.modified, filename),
5968
)
60-
number_of_deletes = 0
61-
number_of_write = 0
62-
while await select.ready():
63-
if msg := select.write_timer:
64-
filename.write_text(f"{msg.inner}")
65-
number_of_write += 1
66-
elif _ := select.deletion_timer:
67-
os.remove(filename)
68-
elif _ := select.watcher:
69-
number_of_deletes += 1
70-
break
71-
72-
assert number_of_deletes == 1
73-
# Can be more because the watcher could take some time to trigger
74-
assert number_of_write >= 2
69+
fake_awatch.changes = changes
70+
file_watcher = FileWatcher(paths=[filename])
71+
72+
for change in changes:
73+
recv_changes = await file_watcher.receive()
74+
assert recv_changes == pathlib.Path(change[1])
75+
76+
77+
@hypothesis.given(event_types=st.sets(st.sampled_from(FileWatcher.EventType)))
78+
async def test_file_watcher_filter_events(
79+
event_types: set[FileWatcher.EventType],
80+
) -> None:
81+
"""Test the file watcher events filtering."""
82+
good_path = "good-file"
83+
84+
# We need to reset the mock explicitly because hypothesis runs all the produced
85+
# inputs in the same context.
86+
with mock.patch(
87+
"frequenz.channels.util._file_watcher.awatch", autospec=True
88+
) as awatch_mock:
89+
file_watcher = FileWatcher(paths=[good_path], event_types=event_types)
90+
91+
filter_events = file_watcher._filter_events # pylint: disable=protected-access
92+
93+
assert awatch_mock.mock_calls == [
94+
mock.call(
95+
pathlib.Path(good_path), stop_event=mock.ANY, watch_filter=filter_events
96+
)
97+
]
98+
for event_type in FileWatcher.EventType:
99+
assert filter_events(event_type.value, good_path) == (
100+
event_type in event_types
101+
)
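The mocking approach used in these unit tests can be sketched in isolation as follows. This is a simplified illustration, not the project's code: `backend`, `_real_watch` and `_fake_watch` are hypothetical names, and a `types.SimpleNamespace` stands in for the module that exposes `awatch`. The idea is that a mock whose `side_effect` is an async generator function returns that generator when called, so the consumer can iterate over canned events without touching the OS.

```python
import asyncio
import types
from unittest import mock


async def _real_watch(*paths):
    """Hypothetical real watcher; it must never run in a unit test."""
    raise RuntimeError("would touch the real filesystem")
    yield  # Unreachable, but makes this function an async generator


async def _fake_watch(*paths):
    """Yield a fixed sequence of change sets instead of watching the OS."""
    for change in (("added", "a.txt"), ("deleted", "a.txt")):
        yield {change}


# Stand-in for the module whose watch function gets patched.
backend = types.SimpleNamespace(watch=_real_watch)


async def collect():
    """Consume the patched watcher and report what was received."""
    with mock.patch.object(backend, "watch", side_effect=_fake_watch) as patched:
        batches = [batch async for batch in backend.watch("a.txt")]
    return batches, patched.call_count


events, calls = asyncio.run(collect())
```

Keeping the fake as a plain async generator means the test controls exactly which events are delivered and in which order.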

tests/utils/test_integration.py

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Integration tests for the `util` module."""
5+
6+
from __future__ import annotations
7+
8+
import os
9+
import pathlib
10+
from datetime import timedelta
11+
12+
import pytest
13+
14+
from frequenz.channels.util import FileWatcher, Select, Timer
15+
16+
17+
@pytest.mark.integration
18+
async def test_file_watcher(tmp_path: pathlib.Path) -> None:
19+
"""Ensure file watcher is returning paths on file events.
20+
21+
Args:
22+
tmp_path (pathlib.Path): A tmp directory to run the file watcher on.
23+
Created by pytest.
24+
"""
25+
filename = tmp_path / "test-file"
26+
file_watcher = FileWatcher(paths=[str(tmp_path)])
27+
28+
number_of_writes = 0
29+
expected_number_of_writes = 3
30+
31+
select = Select(
32+
timer=Timer.timeout(timedelta(seconds=0.1)),
33+
file_watcher=file_watcher,
34+
)
35+
while await select.ready():
36+
if msg := select.timer:
37+
filename.write_text(f"{msg.inner}")
38+
elif msg := select.file_watcher:
39+
assert msg.inner == filename
40+
number_of_writes += 1
41+
# After receiving a write 3 times, unsubscribe from the writes channel
42+
if number_of_writes == expected_number_of_writes:
43+
break
44+
45+
assert number_of_writes == expected_number_of_writes
46+
47+
48+
@pytest.mark.integration
49+
async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None:
50+
"""Ensure file watcher is returning paths only on the DELETE change.
51+
52+
Also ensures that DELETE events are sent even if the file was recreated and even if
53+
the file doesn't exist.
54+
55+
Args:
56+
tmp_path (pathlib.Path): A tmp directory to run the file watcher on.
57+
Created by pytest.
58+
"""
59+
filename = tmp_path / "test-file"
60+
file_watcher = FileWatcher(
61+
paths=[str(tmp_path)], event_types={FileWatcher.EventType.DELETE}
62+
)
63+
64+
select = Select(
65+
write_timer=Timer.timeout(timedelta(seconds=0.1)),
66+
deletion_timer=Timer.timeout(timedelta(seconds=0.25)),
67+
watcher=file_watcher,
68+
)
69+
number_of_write = 0
70+
number_of_deletes = 0
71+
number_of_events = 0
72+
# We want to write to a file and then remove it, and then write again (create it
73+
# again) and remove it again and then stop.
74+
# Because awatch takes some time to get notified by the OS, we need to stop writing
75+
# while a delete was done, to make sure the file is not recreated before the
76+
# deletion event arrives.
77+
# For the second round of writes and then delete, we allow writing after the delete
78+
# was done as an extra test.
79+
#
80+
# This is an example timeline for this test:
81+
#
82+
# |-----|--.--|-----|---o-|-----|--.--|-----|--o--|-----|-----|-----|-----|-----|
83+
# W W D E W W D W W E
84+
#
85+
# Where:
86+
# W: Write
87+
# D: Delete
88+
# E: FileWatcher Event
89+
while await select.ready():
90+
if msg := select.write_timer:
91+
if number_of_write >= 2 and number_of_events == 0:
92+
continue
93+
filename.write_text(f"{msg.inner}")
94+
number_of_write += 1
95+
elif _ := select.deletion_timer:
96+
# Avoid removing the file twice
97+
if not pathlib.Path(filename).is_file():
98+
continue
99+
os.remove(filename)
100+
number_of_deletes += 1
101+
elif _ := select.watcher:
102+
number_of_events += 1
103+
if number_of_events >= 2:
104+
break
105+
106+
assert number_of_deletes == 2
107+
# Can be more because the watcher could take some time to trigger
108+
assert number_of_write >= 3
109+
assert number_of_events == 2
