diff --git a/Include/fuse_common.pxd b/Include/fuse_common.pxd index cddbc33..f315fa8 100644 --- a/Include/fuse_common.pxd +++ b/Include/fuse_common.pxd @@ -42,6 +42,11 @@ cdef extern from * nogil: # fuse_common.h should not be included struct fuse_chan: pass + struct fuse_pollhandle: + pass + + void fuse_pollhandle_destroy(fuse_pollhandle *ph) + struct fuse_loop_config: int clone_fd unsigned max_idle_threads diff --git a/Include/fuse_lowlevel.pxd b/Include/fuse_lowlevel.pxd index fe21193..099d6f3 100644 --- a/Include/fuse_lowlevel.pxd +++ b/Include/fuse_lowlevel.pxd @@ -119,6 +119,8 @@ cdef extern from "" nogil: off_t offset, off_t length, fuse_file_info *fi) except * void (*readdirplus) (fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, fuse_file_info *fi) except * + void (*poll) (fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi, + fuse_pollhandle *ph) except * # Reply functions @@ -137,6 +139,7 @@ cdef extern from "" nogil: fuse_buf_copy_flags flags) int fuse_reply_statfs(fuse_req_t req, statvfs *stbuf) int fuse_reply_xattr(fuse_req_t req, size_t count) + int fuse_reply_poll(fuse_req_t req, unsigned revents) size_t fuse_add_direntry(fuse_req_t req, const_char *buf, size_t bufsize, const_char *name, struct_stat *stbuf, @@ -157,6 +160,7 @@ cdef extern from "" nogil: fuse_buf_copy_flags flags) int fuse_lowlevel_notify_retrieve(fuse_session *se, fuse_ino_t ino, size_t size, off_t offset, void *cookie) + int fuse_lowlevel_notify_poll(fuse_pollhandle *ph) # Utility functions void *fuse_req_userdata(fuse_req_t req) diff --git a/src/pyfuse3/__init__.pyi b/src/pyfuse3/__init__.pyi index dfefd2b..5c4b7ae 100644 --- a/src/pyfuse3/__init__.pyi +++ b/src/pyfuse3/__init__.pyi @@ -129,6 +129,10 @@ class FUSEError(Exception): def __init__(self, errno: int) -> None: ... def __str__(self) -> str: ... +class PollHandle: + def __getstate__(self) -> None: ... + def notify(self) -> None: ... + def listdir(path: str) -> List[str]: ... def syncfs(path: str) -> str: ... def setxattr(path: str, name: str, value: bytes, namespace: NamespaceT = ...) -> None: ... diff --git a/src/pyfuse3/__init__.pyx b/src/pyfuse3/__init__.pyx index 2a1e193..099bf92 100644 --- a/src/pyfuse3/__init__.pyx +++ b/src/pyfuse3/__init__.pyx @@ -504,6 +504,78 @@ cdef class FUSEError(Exception): return strerror(self.errno_) +@cython.freelist(10) +cdef class PollHandle: + ''' + Opaque handle for delivering poll(2) readiness notifications. + + Instances of this class are created by pyfuse3 and passed to + `Operations.poll`. The filesystem may keep a reference and later + call `PollHandle.notify` on the handle to wake up any process currently + blocked in :manpage:`poll(2)`, :manpage:`select(2)` or + :manpage:`epoll_wait(2)` for the corresponding file descriptor. + + A single notification is sufficient to clear all pending waiters; + filesystems should normally discard the handle after notifying. + + The underlying ``fuse_pollhandle`` is automatically destroyed when + the Python object is garbage collected, so filesystems should simply + drop the reference when the notification is no longer needed. + ''' + + cdef fuse_pollhandle *_ph + + def __cinit__(self): + self._ph = NULL + + def __init__(self): + raise TypeError('PollHandle cannot be instantiated directly') + + @staticmethod + cdef PollHandle from_ptr(fuse_pollhandle *ph): + cdef PollHandle self + + if ph == NULL: + raise ValueError('NULL fuse_pollhandle') + + self = PollHandle.__new__(PollHandle) + self._ph = ph + return self + + def __dealloc__(self): + if self._ph is not NULL: + fuse_pollhandle_destroy(self._ph) + self._ph = NULL + + def __getstate__(self): + raise PicklingError("PollHandle instances can't be pickled") + + def notify(self): + ''' + Notify IO readiness for this poll handle. + + After this returns, any process waiting in :manpage:`poll(2)`, + :manpage:`select(2)` or :manpage:`epoll_wait(2)` on the + corresponding file descriptor will be woken so it can re-poll + the filesystem for the current readiness mask. + + Each `PollHandle` is intended for a single notification. After a + successful call, the filesystem should not call `notify_poll` again on + the same handle and should discard it. + ''' + + cdef int ret + + if self._ph == NULL: + raise RuntimeError('PollHandle is no longer valid') + + with nogil: + ret = fuse_lowlevel_notify_poll(self._ph) + + if ret != 0: + raise OSError(-ret, 'fuse_lowlevel_notify_poll returned: ' + strerror(-ret)) + + def listdir(path): '''Like `os.listdir`, but releases the GIL. diff --git a/src/pyfuse3/_pyfuse3.py b/src/pyfuse3/_pyfuse3.py index 48cb1e5..836be2f 100644 --- a/src/pyfuse3/_pyfuse3.py +++ b/src/pyfuse3/_pyfuse3.py @@ -35,6 +35,7 @@ EntryAttributes, FileInfo, FUSEError, + PollHandle, ReaddirToken, RequestContext, SetattrFields, @@ -451,6 +452,44 @@ async def fsync(self, fh: FileHandleT, datasync: bool) -> None: raise FUSEError(errno.ENOSYS) + async def poll( + self, + inode: InodeT, + fh: FileHandleT, + poll_handle: Optional["PollHandle"], + ctx: "RequestContext", + ) -> int: + '''Check IO readiness on an open file. + + This method is called when a process performs :manpage:`poll(2)`, + :manpage:`select(2)` or :manpage:`epoll_wait(2)` on a file descriptor + backed by *fh* (returned by a prior `open` or `create` call). *inode* + identifies the inode that *fh* refers to. + + The method will return the bitwise-or of the currently active poll + events, for example `select.POLLIN`, `select.POLLOUT` or + `select.POLLPRI`. If no events are currently ready, it will return `0`. + + If *poll_handle* is `None`, the kernel has not provided a notification + handle for this request. The filesystem should only return the current + readiness mask and must not attempt to store a handle or arrange a later + `PollHandle.notify` call for this poll request. + + If *poll_handle* is not `None`, the kernel has provided a notification + handle that may be used to wake waiters if readiness changes after this + method returns. The filesystem may store the handle and later call + `PollHandle.notify` when a relevant event becomes available. Each + `~Operations.poll` call produces a fresh handle; storing a new handle + should replace any previously held one, allowing the old handle to be + destroyed. + + If this method raises `FUSEError(errno.ENOSYS)` (the default), the + kernel will fall back to a default poll implementation and will not call + this handler again for the lifetime of the mount. + ''' + + raise FUSEError(errno.ENOSYS) + async def opendir(self, inode: InodeT, ctx: "RequestContext") -> FileHandleT: '''Open the directory with inode *inode*. diff --git a/src/pyfuse3/handlers.pxi b/src/pyfuse3/handlers.pxi index a424be2..50114e0 100644 --- a/src/pyfuse3/handlers.pxi +++ b/src/pyfuse3/handlers.pxi @@ -836,6 +836,39 @@ async def fuse_access_async (_Container c): +cdef void fuse_poll (fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi, + fuse_pollhandle *ph): + cdef _Container c = _Container() + cdef object py_ph + c.req = req + c.ino = ino + if fi is NULL: + c.fh = 0 + else: + c.fh = fi.fh + if ph == NULL: + py_ph = None + else: + py_ph = PollHandle.from_ptr(ph) + save_retval(fuse_poll_async(c, py_ph)) + +async def fuse_poll_async (_Container c, object py_ph): + cdef int ret + cdef unsigned revents + + ctx = get_request_context(c.req) + try: + result = await operations.poll(c.ino, c.fh, py_ph, ctx) + except FUSEError as e: + ret = fuse_reply_err(c.req, e.errno) + else: + revents = (result if result is not None else 0) + ret = fuse_reply_poll(c.req, revents) + + if ret != 0: + log.error('fuse_poll(): fuse_reply_* failed with %s', strerror(-ret)) + + cdef void fuse_create (fuse_req_t req, fuse_ino_t parent, const_char *name, mode_t mode, fuse_file_info *fi): cdef _Container c = _Container() diff --git a/src/pyfuse3/internal.pxi b/src/pyfuse3/internal.pxi index 336c0b1..08a7dd5 100644 --- a/src/pyfuse3/internal.pxi +++ b/src/pyfuse3/internal.pxi @@ -69,6 +69,7 @@ cdef void init_fuse_ops(): fuse_ops.create = fuse_create fuse_ops.forget_multi = fuse_forget_multi fuse_ops.write_buf = fuse_write_buf + fuse_ops.poll = fuse_poll cdef make_fuse_args(args, fuse_args* f_args): cdef char* arg diff --git a/test/test_fs.py b/test/test_fs.py index ed1bbe0..8f4d99c 100755 --- a/test/test_fs.py +++ b/test/test_fs.py @@ -21,6 +21,7 @@ import logging import multiprocessing import os +import select import stat import threading import time @@ -34,6 +35,7 @@ FileInfo, FUSEError, InodeT, + PollHandle, ReaddirToken, RequestContext, ) @@ -59,11 +61,20 @@ def get_mp(): @pytest.fixture() def testfs(tmpdir): + yield from _mount_fs(tmpdir, Fs) + + +@pytest.fixture() +def pollfs(tmpdir): + yield from _mount_fs(tmpdir, PollTestFs) + + +def _mount_fs(tmpdir, fs_class): mnt_dir = str(tmpdir) mp = get_mp() with mp.Manager() as mgr: cross_process = mgr.Namespace() - mount_process = mp.Process(target=run_fs, args=(mnt_dir, cross_process)) + mount_process = mp.Process(target=run_fs, args=(mnt_dir, cross_process, fs_class)) mount_process.start() try: @@ -118,6 +129,38 @@ def test_notify_store(testfs): assert not fs_state.read_called +def test_notify_poll(pollfs): + (mnt_dir, fs_state) = pollfs + path = os.path.join(mnt_dir, 'message') + + with open(path, 'rb', buffering=0) as fh: + poller = select.poll() + poller.register(fh.fileno(), select.POLLPRI) + + events = [] + + def poll_wait(): + events.extend(poller.poll(5000)) + + thread = threading.Thread(target=poll_wait) + thread.start() + + deadline = time.monotonic() + 5 + while time.monotonic() < deadline and not fs_state.poll_handle_received: + time.sleep(0.01) + + assert fs_state.poll_called + assert fs_state.poll_handle_received + assert not events + + pyfuse3.setxattr(path, 'command', b'poll_ready') + thread.join(5) + assert not thread.is_alive() + assert events + assert events[0][0] == fh.fileno() + assert events[0][1] & select.POLLPRI + + def test_entry_timeout(testfs): (mnt_dir, fs_state) = testfs fs_state.entry_timeout = 1 @@ -267,11 +310,57 @@ async def setxattr(self, inode, name, value, ctx): elif value == b'terminate': pyfuse3.terminate() + else: raise FUSEError(errno.EINVAL) -def run_fs(mountpoint, cross_process): +class PollTestFs(Fs): + def __init__(self, cross_process): + super().__init__(cross_process) + self.poll_handle: PollHandle | None = None + self.status.poll_called = False + self.status.poll_handle_received = False + self.status.poll_ready = False + + async def poll( + self, + inode: InodeT, + fh: FileHandleT, + poll_handle: PollHandle | None, + ctx: RequestContext, + ) -> int: + assert inode == self.hello_inode + assert fh == self.hello_inode + + self.status.poll_called = True + + if poll_handle is not None: + self.poll_handle = poll_handle + self.status.poll_handle_received = True + + if self.status.poll_ready: + return select.POLLPRI + + return 0 + + async def setxattr(self, inode, name, value, ctx): + if value != b"poll_ready": + return await super().setxattr(inode, name, value, ctx) + + if inode != self.hello_inode or name != b"command": + raise FUSEError(errno.ENOTSUP) + + self.status.poll_ready = True + + if self.poll_handle is None: + raise FUSEError(errno.EINVAL) + + self.poll_handle.notify() + self.poll_handle = None + + +def run_fs(mountpoint, cross_process, fs_class=Fs): # Logging (note that we run in a new process, so we can't # rely on direct log capture and instead print to stdout) root_logger = logging.getLogger() @@ -285,7 +374,7 @@ def run_fs(mountpoint, cross_process): root_logger.addHandler(handler) root_logger.setLevel(logging.DEBUG) - testfs = Fs(cross_process) + testfs = fs_class(cross_process) fuse_options = set(pyfuse3.default_options) fuse_options.add('fsname=pyfuse3_testfs') pyfuse3.init(testfs, mountpoint, fuse_options) diff --git a/uv.lock b/uv.lock index f928497..37d5d18 100644 --- a/uv.lock +++ b/uv.lock @@ -1194,11 +1194,11 @@ wheels = [ [[package]] name = "urllib3" -version = "2.6.3" +version = "2.7.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" } +sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" }, + { url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" }, ] [[package]]