-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathreader.py
More file actions
65 lines (53 loc) · 1.91 KB
/
reader.py
File metadata and controls
65 lines (53 loc) · 1.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""XarrayReader"""
import pickle
from typing import Any, List, Optional
import attr
from titiler.multidim.redis_pool import get_redis
from titiler.multidim.settings import ApiSettings
from titiler.xarray.io import Reader, get_variable, xarray_open_dataset
api_settings = ApiSettings()
cache_client = get_redis()
@attr.s
class XarrayReader(Reader):
"""Custom XarrayReader with redis cache"""
def __attrs_post_init__(self):
"""Set bounds and CRS."""
ds = None
# Generate cache key and attempt to fetch the dataset from cache
cache_key = f"{self.src_path}_group:{self.group}_time:{self.decode_times}"
if api_settings.enable_cache:
data_bytes = cache_client.get(cache_key)
if data_bytes:
print(f"Found dataset in Cache {cache_key}")
ds = pickle.loads(data_bytes)
self.ds = ds or self.opener(
self.src_path,
group=self.group,
decode_times=self.decode_times,
)
if not ds and api_settings.enable_cache:
# Serialize the dataset to bytes using pickle
cache_key = f"{self.src_path}_group:{self.group}_time:{self.decode_times}"
data_bytes = pickle.dumps(self.ds)
print(f"Adding dataset in Cache: {cache_key}")
cache_client.set(cache_key, data_bytes, ex=300)
self.input = get_variable(
self.ds,
self.variable,
sel=self.sel,
)
super().__attrs_post_init__()
@classmethod
def list_variables(
cls,
src_path: str,
group: Optional[Any] = None,
decode_times: bool = True,
) -> List[str]:
"""List available variable in a dataset."""
with xarray_open_dataset(
src_path,
group=group,
decode_times=decode_times,
) as ds:
return list(ds.data_vars) # type: ignore