Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 65 additions & 8 deletions fsspec/implementations/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,66 @@ class MemoryFileSystem(AbstractFileSystem):
in memory filesystem.
"""

store: ClassVar[dict[str, Any]] = {} # global, do not overwrite!
pseudo_dirs = [""] # global, do not overwrite!
global_store: ClassVar[dict[str, Any]] = {} # global, do not overwrite!
global_pseudo_dirs = [""] # global, do not overwrite!
protocol = "memory"
root_marker = "/"
_intrans = False

# never called
# def __call__(cls, *args, **kwargs):
# print("MemoryFileSystem call kwargs", kwargs)
# skip = kwargs.pop("skip_instance_cache", False)
# instance = super().__call__(*args, **kwargs)
# if skip:
# instance.store = {}
# instance.pseudo_dirs = [""]
# else:
# instance.store = instance.global_store
# instance.pseudo_dirs = instance.global_pseudo_dirs
# return instance

# FIXME AttributeError: 'MemoryFileSystem' object has no attribute '_skip_instance_cache'
# def __new__(cls, *args, **kwargs):
# # print("new kwargs", kwargs)
# # skip = kwargs.pop("skip_instance_cache", False)
# instance = super().__new__(cls, *args, **kwargs)
# # FIXME AttributeError: 'MemoryFileSystem' object has no attribute '_skip_instance_cache'
# skip = instance._skip_instance_cache
# print("new skip", skip)
# if skip:
# instance.store = {}
# instance.pseudo_dirs = [""]
# else:
# instance.store = instance.global_store
# instance.pseudo_dirs = instance.global_pseudo_dirs
# return instance

def __init__(self, *args, **kwargs):
# print("MemoryFileSystem init kwargs", kwargs)
super().__init__(*args, **kwargs)
self.logger = logger
# print("MemoryFileSystem init kwargs", kwargs)
# skip = kwargs.pop("skip_instance_cache", False)
# if skip:
# self.store = {}
# self.pseudo_dirs = [""]
# local_memory = kwargs.pop("local_memory", False)
# if local_memory:
# skip = self._skip_instance_cache
skip = kwargs.get("skip_instance_cache", False)
# print("MemoryFileSystem init skip", skip)
# FIXME skip is None
# assert skip in (True, False)
if skip:
# local
self.store = {}
self.pseudo_dirs = [""]
else:
# global
self.store = self.global_store
self.pseudo_dirs = self.global_pseudo_dirs
# super().__init__(*args, **kwargs)

@classmethod
def _strip_protocol(cls, path):
Expand Down Expand Up @@ -147,7 +203,7 @@ def rmdir(self, path):
raise FileNotFoundError(path)

def info(self, path, **kwargs):
logger.debug("info: %s", path)
self.logger.debug("info: %s", path)
path = self._strip_protocol(path)
if path in self.pseudo_dirs or any(
p.startswith(path + "/") for p in list(self.store) + self.pseudo_dirs
Expand Down Expand Up @@ -202,7 +258,7 @@ def _open(
elif mode in {"wb", "xb"}:
if mode == "xb" and self.exists(path):
raise FileExistsError
m = MemoryFile(self, path, kwargs.get("data"))
m = MemoryFile(self, path, kwargs.get("data"), self.logger)
if not self._intrans:
m.commit()
return m
Expand All @@ -215,7 +271,7 @@ def cp_file(self, path1, path2, **kwargs):
path2 = self._strip_protocol(path2)
if self.isfile(path1):
self.store[path2] = MemoryFile(
self, path2, self.store[path1].getvalue()
self, path2, self.store[path1].getvalue(), self.logger
) # implicit copy
elif self.isdir(path1):
if path2 not in self.pseudo_dirs:
Expand All @@ -224,7 +280,7 @@ def cp_file(self, path1, path2, **kwargs):
raise FileNotFoundError(path1)

def cat_file(self, path, start=None, end=None, **kwargs):
logger.debug("cat: %s", path)
self.logger.debug("cat: %s", path)
path = self._strip_protocol(path)
try:
return bytes(self.store[path].getbuffer()[start:end])
Expand Down Expand Up @@ -283,8 +339,9 @@ class MemoryFile(BytesIO):
No need to provide fs, path if auto-committing (default)
"""

def __init__(self, fs=None, path=None, data=None):
logger.debug("open file %s", path)
def __init__(self, fs=None, path=None, data=None, logger=logger):
self.logger = logger
self.logger.debug("open file %s", path)
self.fs = fs
self.path = path
self.created = datetime.now(tz=timezone.utc)
Expand Down
60 changes: 60 additions & 0 deletions fsspec/implementations/tests/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,66 @@
import pytest

from fsspec.implementations.local import LocalFileSystem, make_path_posix
from fsspec.implementations.memory import MemoryFileSystem


def test_identical_instances():
fs1 = MemoryFileSystem()
fs2 = MemoryFileSystem()

assert id(fs1) == id(fs2)
assert id(fs1.store) == id(fs2.store)

fs1.touch("/fs1.txt")
fs2.touch("/fs2.txt")
assert fs1.ls("/", detail=False) == ["/fs1.txt", "/fs2.txt"]
assert fs2.ls("/", detail=False) == ["/fs1.txt", "/fs2.txt"]


def _clear(m):
m.store.clear()
m.pseudo_dirs.clear()
m.pseudo_dirs.append("")


def test_separate_instances_1_1():
# fs1 = MemoryFileSystem(local_memory=True)
# fs2 = MemoryFileSystem(local_memory=True)
# FIXME only one param
# fs1 = MemoryFileSystem(skip_instance_cache=True, local_memory=True)
# fs2 = MemoryFileSystem(skip_instance_cache=True, local_memory=True)
fs1 = MemoryFileSystem(skip_instance_cache=True)
fs2 = MemoryFileSystem(skip_instance_cache=True)
assert id(fs1) != id(fs2)
assert id(fs1.store) != id(fs2.store)
fs1.touch("/fs1.txt")
fs2.touch("/fs2.txt")
assert fs1.ls("/", detail=False) == ["/fs1.txt"]
assert fs2.ls("/", detail=False) == ["/fs2.txt"]


def test_separate_instances_1_0():
fs1 = MemoryFileSystem(skip_instance_cache=True) # local
fs2 = MemoryFileSystem() # global
_clear(fs2)
assert id(fs1) != id(fs2)
assert id(fs1.store) != id(fs2.store)
fs1.touch("/fs1.txt")
fs2.touch("/fs2.txt")
assert fs1.ls("/", detail=False) == ["/fs1.txt"] # local
assert fs2.ls("/", detail=False) == ["/fs2.txt"] # global


def test_separate_instances_0_1():
fs1 = MemoryFileSystem() # global
fs2 = MemoryFileSystem(skip_instance_cache=True) # local
_clear(fs1)
assert id(fs1) != id(fs2)
assert id(fs1.store) != id(fs2.store)
fs1.touch("/fs1.txt")
fs2.touch("/fs2.txt")
assert fs2.ls("/", detail=False) == ["/fs2.txt"] # local
assert fs1.ls("/", detail=False) == ["/fs1.txt"] # global


def test_1(m):
Expand Down
14 changes: 13 additions & 1 deletion fsspec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,17 @@ def __init__(cls, *args, **kwargs):
cls._pid = os.getpid()

def __call__(cls, *args, **kwargs):
# print("_Cached call kwargs", kwargs)
kwargs = apply_config(cls, kwargs)
extra_tokens = tuple(
getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
)
token = tokenize(
cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
)
skip = kwargs.pop("skip_instance_cache", False)
# skip = kwargs.pop("skip_instance_cache", False)
skip = kwargs.get("skip_instance_cache", False)
assert skip in (True, False)
if os.getpid() != cls._pid:
cls._cache.clear()
cls._pid = os.getpid()
Expand All @@ -81,8 +84,12 @@ def __call__(cls, *args, **kwargs):
obj = super().__call__(*args, **kwargs)
# Setting _fs_token here causes some static linters to complain.
obj._fs_token_ = token
# no. too late. must be passed to init
# obj._skip_instance_cache_ = skip
# print("_Cached call skip", skip)
obj.storage_args = args
obj.storage_options = kwargs
# setattr(obj, "_skip_instance_cache", skip)
if obj.async_impl and obj.mirror_sync_methods:
from .asyn import mirror_sync_methods

Expand Down Expand Up @@ -160,6 +167,7 @@ def __init__(self, *args, **storage_options):
warnings.warn("add_aliases has been removed.", FutureWarning)
# This is set in _Cached
self._fs_token_ = None
# self._skip_instance_cache_ = None

@property
def fsid(self):
Expand All @@ -172,6 +180,10 @@ def fsid(self):
def _fs_token(self):
return self._fs_token_

# @property
# def _skip_instance_cache(self):
# return self._skip_instance_cache_

def __dask_tokenize__(self):
return self._fs_token

Expand Down
Loading