Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 44 additions & 17 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,16 @@ def create(root, storage_options=None, fs=None, record_size=10000, **kwargs):
fs.pipe("/".join([root, ".zmetadata"]), json.dumps(met).encode())
return LazyReferenceMapper(root, fs, **kwargs)

@lru_cache
@lru_cache()
@staticmethod
def _listdir(zmetadata):
"""Cached static helper that lists top-level directories"""
dirs = (p.rsplit("/", 1)[0] for p in zmetadata if not p.startswith(".z"))
return set(dirs)

def listdir(self):
"""List top-level directories"""
dirs = (p.rsplit("/", 1)[0] for p in self.zmetadata if not p.startswith(".z"))
return set(dirs)
return LazyReferenceMapper._listdir(self.zmetadata)

def ls(self, path="", detail=True):
"""Shortcut file listings"""
Expand Down Expand Up @@ -332,27 +337,49 @@ def _load_one_key(self, key):
return selection[:3]

@lru_cache(4096)
def _key_to_record(self, key):
"""Details needed to construct a reference for one key"""
@staticmethod
def _key_to_record_static(chunk_sizes, zmetadata, record_size, key):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, silly me, dictionaries are unhashable, so they can't be passed as arguments to an lru_cache-wrapped function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more I look at this, the more I think the original simple solution might be the best.

Apparently, you can use frozendict, which is hashable, but that might be a problem because _get_chunk_sizes actually mutates self.chunk_sizes (more specifically, it adds a key-value pair to it; it should never mutate an existing key).

I think that assuming the object attributes won't change once the cache entry has been saved might be the easiest way forward. Of course, we should note that in a comment, and go back to:

        self.listdir = lru_cache()(self.listdir)
        self._key_to_record = lru_cache(maxsize=4096)(self._key_to_record)

"""Details needed to construct a reference for one key.

This method is static and cached.
"""
field, chunk = key.rsplit("/", 1)
chunk_sizes = self._get_chunk_sizes(field)
if len(chunk_sizes) == 0:
field_chunk_sizes = LazyReferenceMapper._get_chunk_sizes_static(
chunk_sizes, zmetadata, field
)
if len(field_chunk_sizes) == 0:
return 0, 0, 0
chunk_idx = [int(c) for c in chunk.split(".")]
chunk_number = ravel_multi_index(chunk_idx, chunk_sizes)
record = chunk_number // self.record_size
ri = chunk_number % self.record_size
return record, ri, len(chunk_sizes)
chunk_number = ravel_multi_index(chunk_idx, field_chunk_sizes)
record = chunk_number // record_size
ri = chunk_number % record_size
return record, ri, len(field_chunk_sizes)

def _get_chunk_sizes(self, field):
"""The number of chunks along each axis for a given field"""
if field not in self.chunk_sizes:
zarray = self.zmetadata[f"{field}/.zarray"]
def _key_to_record(self, key):
"""Details needed to construct a reference for one key"""
return LazyReferenceMapper._key_to_record_static(
self.chunk_sizes, self.zmetadata, self.record_size, key
)

@staticmethod
def _get_chunk_sizes_static(chunk_sizes, zmetadata, field):
"""The number of chunks along each axis for a given field.

This method is static.
"""
if field not in chunk_sizes:
zarray = zmetadata[f"{field}/.zarray"]
size_ratio = [
math.ceil(s / c) for s, c in zip(zarray["shape"], zarray["chunks"])
]
self.chunk_sizes[field] = size_ratio or [1]
return self.chunk_sizes[field]
chunk_sizes[field] = size_ratio or [1]
return chunk_sizes[field]

def _get_chunk_sizes(self, field):
"""The number of chunks along each axis for a given field"""
return LazyReferenceMapper._get_chunk_sizes_static(
self.chunk_sizes, self.zmetadata, field
)

def _generate_record(self, field, record):
"""The references for a given parquet file of a given field"""
Expand Down
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ select = [
ignore = [
# Loop control variable `loop` not used within loop body
"B007",
# Use of `functools.lru_cache` or `functools.cache` on methods can lead to memory leaks
"B019",
# Star-arg unpacking after a keyword argument is strongly discouraged
"B026",
# No explicit `stacklevel` keyword argument found
Expand Down
Loading