This repository was archived by the owner on May 2, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathtest_file.py
More file actions
76 lines (50 loc) · 2.14 KB
/
test_file.py
File metadata and controls
76 lines (50 loc) · 2.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import os
import time
from pathlib import Path
import pytest
from viu_media.core.utils.file import AtomicWriter, FileLock, check_file_modified, sanitize_filename
def test_atomic_writer_writes_atomically(tmp_path: Path):
target = tmp_path / "data.txt"
with AtomicWriter(target, mode="w", encoding="utf-8") as handle:
handle.write("hello world")
assert target.read_text(encoding="utf-8") == "hello world"
def test_atomic_writer_cleans_temp_file_on_exception(tmp_path: Path):
target = tmp_path / "data.txt"
target.write_text("original", encoding="utf-8")
with pytest.raises(RuntimeError, match="boom"):
with AtomicWriter(target, mode="w", encoding="utf-8") as handle:
handle.write("new")
raise RuntimeError("boom")
assert target.read_text(encoding="utf-8") == "original"
assert list(tmp_path.glob("*.tmp")) == []
def test_file_lock_acquire_release_non_blocking(tmp_path: Path):
lock_path = tmp_path / "my.lock"
lock = FileLock(lock_path, timeout=0, stale_timeout=10)
lock.acquire()
assert lock_path.exists()
lock.release()
assert not lock_path.exists()
def test_file_lock_breaks_stale_lock(tmp_path: Path):
lock_path = tmp_path / "stale.lock"
lock_path.write_text("999999\n0", encoding="utf-8")
stale_timestamp = time.time() - 100
os.utime(lock_path, (stale_timestamp, stale_timestamp))
lock = FileLock(lock_path, timeout=0.5, stale_timeout=0.01)
lock.acquire()
assert lock_path.exists()
lock.release()
assert not lock_path.exists()
def test_check_file_modified_detects_changes(tmp_path: Path):
target = tmp_path / "track.txt"
target.write_text("a", encoding="utf-8")
previous_mtime = target.stat().st_mtime
time.sleep(0.01)
target.write_text("b", encoding="utf-8")
current_mtime, modified = check_file_modified(target, previous_mtime)
assert modified is True
assert current_mtime > previous_mtime
def test_sanitize_filename_removes_invalid_characters():
result = sanitize_filename('My:/Invalid*"Name?', restricted=False)
assert "?" not in result
assert "*" not in result
assert result