|
2 | 2 | import os |
3 | 3 | import re |
4 | 4 | import json |
5 | | -import numpy as np |
6 | | -from typing import TypeVar, Optional, List, Dict, Callable |
| 5 | +from typing import TypeVar, Union, Optional, List, Dict, Callable, Tuple |
7 | 6 | from pathlib import Path |
8 | 7 | from importlib import import_module |
9 | 8 | from packaging import version |
10 | 9 | from time import sleep |
| 10 | +from functools import lru_cache |
| 11 | + |
| 12 | +import h5py |
| 13 | +import numpy as np |
| 14 | +from numpy.typing import ArrayLike |
| 15 | + |
11 | 16 |
|
12 | 17 | PathType = TypeVar("PathType", str, Path) # For types that can be either files or folders |
13 | 18 | FilePathType = TypeVar("FilePathType", str, Path) |
14 | 19 | OptionalListOfStrings = Optional[List[str]] |
15 | 20 |
|
16 | 21 | dict_regex = r"({.+:.+})" |
MAX_CACHE_ITEMS = 1000  # lru_cache default is only 128 entries; repeated streaming selections may need more to benefit


@lru_cache(maxsize=MAX_CACHE_ITEMS)
def _cache_data_retrieval_command(
    data: h5py.Dataset, reduced_selection: Tuple[Tuple[Optional[int], Optional[int], Optional[int]], ...]
) -> np.ndarray:
    """
    Retrieve (and LRU-cache) the requested selection from an h5py.Dataset.

    LRU caching for _cache_data_selection cannot be applied to unhashable inputs
    such as lists or slice objects; this helper therefore expects the hashable
    Dataset plus a tuple of reduced (start, stop, step) slice representations.

    Parameters
    ----------
    data : h5py.Dataset
        Dataset to read from; serves as part of the cache key.
    reduced_selection : tuple of (start, stop, step) tuples
        One entry per axis, each as produced by ``slice.__reduce__()[1]``.
        NOTE: the annotation uses the variadic ``Tuple[..., ...]`` form — the
        previous non-variadic form described a fixed 1-tuple.

    Returns
    -------
    np.ndarray
        The data obtained by applying the reconstituted slices to ``data``.
    """
    # Reconstitute real slice objects from their hashable (start, stop, step) form
    selection = tuple(slice(*reduced_slice) for reduced_slice in reduced_selection)
    return data[selection]
| 32 | + |
| 33 | + |
def _cache_data_selection(data: Union[h5py.Dataset, ArrayLike], selection: Union[slice, Tuple[slice]]) -> np.ndarray:
    """
    Extract the selection lazily from the data object for efficient caching (most beneficial during streaming).

    Only h5py.Dataset inputs are routed through the LRU cache; in-memory objects
    (including np.memmap, which is not natively hashable) are sliced directly.
    """
    # Technically np.memmap could support this behavior as well, but it is not natively hashable either
    if isinstance(data, np.memmap):
        return data[selection]
    # Already an in-memory object — no need to attempt caching
    if not isinstance(data, h5py.Dataset):
        return np.array(data)[selection]

    # slices are not hashable, but their reduced (start, stop, step) representation is
    slices = (selection,) if isinstance(selection, slice) else tuple(selection)
    hashable_selection = tuple(each_slice.__reduce__()[1] for each_slice in slices)
    return _cache_data_retrieval_command(data=data, reduced_selection=hashable_selection)
17 | 47 |
|
18 | 48 |
|
19 | 49 | def format_byte_size(byte_size: int, units: str = "SI"): |
@@ -52,9 +82,12 @@ def check_regular_series(series: np.ndarray, tolerance_decimals: int = 9): |
52 | 82 | return len(uniq_diff_ts) == 1 |
53 | 83 |
|
54 | 84 |
|
55 | | -def is_ascending_series(series: np.ndarray, nelems=None): |
def is_ascending_series(series: Union[h5py.Dataset, ArrayLike], nelems=None):
    """General purpose function for determining if a series is monotonic increasing."""
    if not isinstance(series, h5py.Dataset):
        # Already in memory — slice directly, no need to cache
        return np.all(np.diff(series[:nelems]) > 0)
    # Streamed h5py data goes through the selection cache to avoid repeated reads
    leading_values = _cache_data_selection(data=series, selection=slice(nelems))
    return np.all(np.diff(leading_values) > 0)
58 | 91 |
|
59 | 92 |
|
60 | 93 | def is_dict_in_string(string: str): |
|
0 commit comments