Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 32 additions & 52 deletions fsspec_python/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@

import inspect

from fsspec import AbstractFileSystem, filesystem

from .importer import install_importer, uninstall_importer
from fsspec import filesystem, register_implementation
from fsspec.implementations.chained import ChainedFileSystem

__all__ = ("PythonFileSystem",)


class PythonFileSystem(AbstractFileSystem):
class PythonFileSystem(ChainedFileSystem):
"""Python import filesystem"""

def __init__(self, target_protocol=None, target_options=None, fs=None, **kwargs):
protocol: str = "python"
root: str = "/"

def __init__(self, target_protocol=None, target_options=None, fs=None, install: bool = True, **kwargs):
"""
Args:
target_protocol: str (optional) Target filesystem protocol. Provide either this or ``fs``.
Expand All @@ -29,62 +31,37 @@ def __init__(self, target_protocol=None, target_options=None, fs=None, **kwargs)
self.target_protocol = (
target_protocol if isinstance(target_protocol, str) else (fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0])
)

self.fs = fs if fs is not None else filesystem(target_protocol, **target_options)
self.root = kwargs.get("fo", "") or "/"

if install:
from .importer import install_importer

if target_protocol and kwargs.get("fo"):
install_importer(f"{self.target_protocol}://{kwargs['fo']}", **target_options)
else:
install_importer(self.fs, **target_options, **kwargs)
install_importer(fs=self, **kwargs)

def close(self):
uninstall_importer(self.target_protocol)
self.fs.close()
super().close()
def exit(self):
from .importer import uninstall_importer

uninstall_importer(self)
if hasattr(self, "fs") and self.fs is not None and hasattr(self.fs, "exit"):
self.fs.exit()

def __getattribute__(self, item):
if item in {
"__doc__",
"__init__",
"__getattribute__",
"__reduce__",
"_make_local_details",
"open",
"cat",
"cat_file",
"_cat_file",
"cat_ranges",
"_cat_ranges",
"get",
"read_block",
"tail",
"head",
"info",
"ls",
"exists",
"isfile",
"isdir",
"_check_file",
"_check_cache",
"_mkcache",
"clear_cache",
"clear_expired_cache",
"pop_from_cache",
"local_file",
"_paths_from_path",
"get_mapper",
"open_many",
"commit_many",
"hash_name",
"__hash__",
"__eq__",
"to_json",
"to_dict",
"cache_size",
"pipe_file",
"pipe",
"start_transaction",
"end_transaction",
"__module__",
"__new__",
"exit",
"fs",
"protocol",
"registered_name",
"target_protocol",
}:
return object.__getattribute__(self, item)

# Otherwise pull it out of dict
d = object.__getattribute__(self, "__dict__")
fs = d.get("fs", None) # fs is not immediately defined
if item in d:
Expand All @@ -102,3 +79,6 @@ def __getattribute__(self, item):
return m # class method or attribute
# attributes of the superclass, while target is being set up
return super().__getattribute__(item)


register_implementation(PythonFileSystem.protocol, PythonFileSystem)
93 changes: 50 additions & 43 deletions fsspec_python/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
from importlib.machinery import SOURCE_SUFFIXES, ModuleSpec
from os.path import join
from types import ModuleType
from typing import TYPE_CHECKING, Dict
from typing import TYPE_CHECKING, Dict, Union

from fsspec import url_to_fs
from fsspec.implementations.local import AbstractFileSystem

from .fs import PythonFileSystem
from .utils import normalize_fsspec

if TYPE_CHECKING:
from collections.abc import Sequence

Expand All @@ -23,88 +26,92 @@


class FSSpecImportFinder(MetaPathFinder):
def __init__(self, fsspec: str, **fsspec_args: str) -> None:
self.fsspec_fs: AbstractFileSystem
self.root: str
if isinstance(fsspec, AbstractFileSystem):
self.fsspec_fs = fsspec
self.root = fsspec_args.get("fo", fsspec.root_marker)
else:
self.fsspec_fs, self.root = url_to_fs(fsspec, **fsspec_args)
def __init__(self, fs: PythonFileSystem) -> None:
self.fs: PythonFileSystem = fs
self.remote_modules: dict[str, str] = {}

def find_spec(self, fullname: str, path: Sequence[str | bytes] | None, target: ModuleType | None = None) -> ModuleSpec | None:
for suffix in SOURCE_SUFFIXES:
filename = join(self.root, fullname.split(".")[-1] + suffix)
if not self.fsspec_fs.exists(filename):
filename = join(self.fs.root, fullname.split(".")[-1] + suffix)
if not self.fs.exists(filename):
continue
self.remote_modules[fullname] = ModuleSpec(
name=fullname, loader=FSSpecImportLoader(fullname, filename, self.fsspec_fs), origin=filename, is_package=False
name=fullname, loader=FSSpecImportLoader(fullname, filename, self.fs), origin=filename, is_package=False
)
return self.remote_modules[fullname]
return None

def unload(self) -> None:
# unimport all remote modules from sys.modules
for mod in self.remote_modules:
if mod in sys.modules:
del sys.modules[mod]
# TODO: what if imported by another?
sys.modules.pop(mod, None)
self.remote_modules = {}

def __eq__(self, other: object) -> bool:
if not isinstance(other, FSSpecImportFinder):
return False
return self.fs == other.fs


# Singleton for use elsewhere
_finders: Dict[str, FSSpecImportFinder] = {}


class FSSpecImportLoader(SourceLoader):
def __init__(self, fullname: str, path: str, fsspec_fs: AbstractFileSystem):
def __init__(self, fullname: str, path: str, fs: PythonFileSystem):
self.fullname = fullname
self.path = path
self.fsspec_fs = fsspec_fs
self.fs = fs

def get_filename(self, fullname: str) -> str: # noqa: ARG002
return self.path

def get_data(self, path: str | bytes) -> bytes:
with self.fsspec_fs.open(path, "rb") as f:
with self.fs.open(path, "rb") as f:
return f.read()

# def exec_module(self, module: ModuleType) -> None:
# source_bytes = self.get_data(self.get_filename(self.fullname))
# source = source_bytes.decode("utf-8")


def install_importer(fsspec: str, **fsspec_args: str) -> FSSpecImportFinder:
"""Install the fsspec importer.

Args:
fsspec: fsspec filesystem string
Returns: The finder instance that was installed.
"""
if isinstance(fsspec, AbstractFileSystem):
# Reassemble fsspec and args
fsspec = f"{fsspec.protocol if isinstance(fsspec.protocol, str) else fsspec.protocol[0]}://{fsspec.root_marker}"
fsspec_args = fsspec_args or {}
def install_importer(fs: Union[str, AbstractFileSystem], **kwargs: str) -> FSSpecImportFinder:
"""Install the fsspec importer."""
if isinstance(fs, AbstractFileSystem):
fsspec_str = normalize_fsspec(fs=fs, **kwargs)
elif not isinstance(fs, str):
raise ValueError("fs must be a string or AbstractFileSystem instance")
else:
fsspec_str = fs
assert "fo" not in kwargs, "fo cannot be used with string fs"
fs, kwargs["fo"] = url_to_fs(fsspec_str)

global _finders
if fsspec in _finders:
return _finders[fsspec]
_finders[fsspec] = FSSpecImportFinder(fsspec, **fsspec_args)
sys.meta_path.insert(0, _finders[fsspec])
return _finders[fsspec]
if fsspec_str not in _finders:
python_fs = fs if isinstance(fs, PythonFileSystem) else PythonFileSystem(fs=fs, install=False, **kwargs)

finder = FSSpecImportFinder(python_fs)
_finders[fsspec_str] = finder
sys.meta_path.insert(0, finder)
return _finders[fsspec_str].fs


def uninstall_importer(fsspec: str = "") -> None:
def uninstall_importer(fs: Union[str, AbstractFileSystem] = "") -> None:
"""Uninstall the fsspec importer."""
global _finders
if not fsspec:
# clear last
if not _finders:
return
fsspec = list(_finders.keys())[-1]
if fsspec in _finders:
finder = _finders[fsspec]
del _finders[fsspec]
if finder in sys.meta_path:
if not _finders:
return

# clear last
fs = list(_finders.keys())[-1] if not fs else fs
fsspec_str = normalize_fsspec(fs=fs) if isinstance(fs, AbstractFileSystem) else fs

if fsspec_str in _finders:
finder = _finders.pop(fsspec_str, None)
if finder:
finder.unload()
if finder in sys.meta_path:
sys.meta_path.remove(finder)
return
raise ValueError(f"No importer found for {fsspec_str}")
43 changes: 35 additions & 8 deletions fsspec_python/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,66 @@
import os
import sys
from pathlib import Path

import pytest
from fsspec import open
from fsspec import url_to_fs

from fsspec_python import install_importer, uninstall_importer
from fsspec_python import PythonFileSystem, install_importer, install_open_hook, uninstall_importer, uninstall_open_hook


@pytest.fixture()
def s3_importer():
# For coverage
uninstall_importer()

sys_meta_path_length = len(sys.meta_path)
if not os.environ.get("FSSPEC_S3_ENDPOINT_URL"):
pytest.skip("S3 not configured")
install_importer("s3://timkpaine-public/projects/fsspec-python")
yield
fs = install_importer("s3://timkpaine-public/projects/fsspec-python")
if len(sys.meta_path) != sys_meta_path_length + 1:
# reset, some others get registered
sys_meta_path_length = len(sys.meta_path) - 1
assert len(sys.meta_path) == sys_meta_path_length + 1
yield fs.fs
uninstall_importer()
assert len(sys.meta_path) == sys_meta_path_length


@pytest.fixture()
def local_importer():
sys_meta_path_length = len(sys.meta_path)
install_importer(f"file://{Path(__file__).parent}/local")
yield
uninstall_importer()
assert len(sys.meta_path) == sys_meta_path_length


@pytest.fixture()
def open_hook():
from fsspec_python import install_open_hook, uninstall_open_hook
def local_importer_multi():
sys_meta_path_length = len(sys.meta_path)
install_importer(f"file://{Path(__file__).parent}/local")
# install_importer(f"file://{Path(__file__).parent}/local2")
pfs = PythonFileSystem(target_protocol="file", fo=f"{Path(__file__).parent}/local2")
install_importer(pfs)
yield
uninstall_importer()
uninstall_importer()
assert len(sys.meta_path) == sys_meta_path_length


@pytest.fixture()
def open_hook():
sys_meta_path_length = len(sys.meta_path)
install_open_hook(f"file://{Path(__file__).parent}/dump/")
yield
uninstall_open_hook()
assert len(sys.meta_path) == sys_meta_path_length


@pytest.fixture()
def fs_importer():
fs = open(f"python::file://{Path(__file__).parent}/local2")
sys_meta_path_length = len(sys.meta_path)
fs, _ = url_to_fs(f"python::file://{Path(__file__).parent}/local2")
yield fs
fs.close()
fs.exit()
assert len(sys.meta_path) == sys_meta_path_length
Loading