Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion hf3fs_fuse/io.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,95 @@
import hf3fs_py_usrbio as h3fio
from hf3fs_py_usrbio import register_fd, deregister_fd, force_fsync, extract_mount_point, hardlink, punch_hole

import atexit
import multiprocessing.shared_memory
import os
import os.path
import threading
import weakref
from uuid import uuid4

# Global registry of active iovec symlinks for cleanup on exit
# Uses weak references to avoid preventing garbage collection
_active_symlinks_lock = threading.Lock()
_active_symlinks = set()


def _cleanup_symlinks():
"""Cleanup handler registered with atexit to remove orphaned symlinks."""
with _active_symlinks_lock:
for link in list(_active_symlinks):
try:
if os.path.islink(link):
os.unlink(link)
except OSError:
pass # Best effort cleanup
_active_symlinks.clear()


# Register cleanup handler once at module load
atexit.register(_cleanup_symlinks)


class iovec:
"""
I/O vector wrapper for shared memory operations in 3FS.

Manages a symlink in the 3FS virtual filesystem that points to shared memory.
Supports context manager protocol for safe resource management.

Usage:
# Preferred: using context manager
with make_iovec(shm, mount_point) as iov:
# use iov
pass
# symlink is automatically cleaned up

# Alternative: manual management
iov = make_iovec(shm, mount_point)
try:
# use iov
finally:
iov.close()
"""

def __init__(self, iov, link):
self.iov = iov
self.link = link
self._closed = False

# Register symlink for cleanup on exit
with _active_symlinks_lock:
_active_symlinks.add(link)

def close(self):
"""Explicitly close and cleanup the iovec, removing the symlink."""
if self._closed:
return
self._closed = True

# Remove from global registry
with _active_symlinks_lock:
_active_symlinks.discard(self.link)

# Remove the symlink
try:
if os.path.islink(self.link):
os.unlink(self.link)
except OSError:
pass # Best effort

def __del__(self):
os.unlink(self.link)
self.close()

def __enter__(self):
"""Context manager entry."""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - ensures cleanup."""
self.close()
return False

def __getitem__(self, slice):
return self.iov[slice]
Expand All @@ -21,8 +98,36 @@ def __setitem__(self, slice, val):
self.iov[slice] = val

class ioring:
"""
I/O ring wrapper for batched I/O operations in 3FS.

Supports context manager protocol for safe resource management.

Usage:
with make_ioring(mount_point, entries) as ior:
ior.prepare(iov, ...)
ior.submit().wait()
"""

def __init__(self, ior):
self.ior = ior
self._closed = False

def close(self):
"""Explicitly close the ioring."""
if self._closed:
return
self._closed = True
# The underlying ior object will be garbage collected

def __enter__(self):
"""Context manager entry."""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.close()
return False

@staticmethod
def size_for_entries(entries):
Expand Down