|
| 1 | +import os |
| 2 | +import time |
| 3 | +import sys |
| 4 | +import struct |
| 5 | +import subprocess |
| 6 | +import pathlib |
| 7 | +import tempfile |
| 8 | +import pytest |
| 9 | +import fcntl |
| 10 | + |
| 11 | +topdir = pathlib.Path(__file__).parent.parent |
| 12 | + |
| 13 | +@pytest.fixture |
| 14 | +def filesystem(request): |
| 15 | + fstype = request.node.get_closest_marker("fstype").args[0] |
| 16 | + |
| 17 | + with tempfile.TemporaryDirectory() as tmpdir: |
| 18 | + st_dev = os.stat(tmpdir).st_dev |
| 19 | + proc = subprocess.Popen([sys.executable, "-d", topdir / "example" / f"{fstype}.py", tmpdir], stdin=subprocess.DEVNULL) |
| 20 | + |
| 21 | + deadline = time.time() + 1 |
| 22 | + while time.time() < deadline: |
| 23 | + new_st_dev = os.stat(tmpdir).st_dev |
| 24 | + if new_st_dev != st_dev: |
| 25 | + break |
| 26 | + time.sleep(.01) |
| 27 | + if new_st_dev == st_dev: |
| 28 | + proc.terminate() |
| 29 | + raise RuntimeError("Filesystem did not mount within 1s") |
| 30 | + |
| 31 | + |
| 32 | + yield pathlib.Path(tmpdir) |
| 33 | + |
| 34 | + subprocess.call(["fusermount", "-u", "-q", "-z", tmpdir]) |
| 35 | + |
| 36 | + deadline = time.time() + 1 |
| 37 | + while time.time() < deadline: |
| 38 | + result = proc.poll() |
| 39 | + if result is not None: |
| 40 | + if result != 0: |
| 41 | + raise RuntimeError("Filesystem exited with an error: {result}") |
| 42 | + return |
| 43 | + time.sleep(.01) |
| 44 | + |
| 45 | + proc.terminate() |
| 46 | + raise RuntimeError("Filesystem failed to exit within 1s after unmount") |
| 47 | + |
| 48 | +@pytest.mark.fstype("hello") |
| 49 | +def test_hello(filesystem): |
| 50 | + content = (filesystem / "hello").read_text(encoding="utf-8") |
| 51 | + assert content == "Hello World!\n" |
| 52 | + |
| 53 | +@pytest.mark.fstype("fioc") |
| 54 | +def test_fioc(filesystem): |
| 55 | + FIOC_GET_SIZE, FIOC_SET_SIZE = 0x80084500, 0x40084501 |
| 56 | + with (filesystem / "fioc").open("rb") as f: |
| 57 | + b = struct.pack("L", 42) |
| 58 | + fcntl.ioctl(f.fileno(), FIOC_SET_SIZE, b) |
| 59 | + |
| 60 | + b = bytearray(struct.calcsize('l')) |
| 61 | + fcntl.ioctl(f.fileno(), FIOC_GET_SIZE, b) |
| 62 | + assert struct.unpack("L", b)[0] == 42 |
| 63 | + |
0 commit comments