Skip to content

Commit 24b9fb5

Browse files
committed
first version of write_levels() function (uses xcube)
1 parent c5270c5 commit 24b9fb5

File tree

2 files changed

+249
-0
lines changed

2 files changed

+249
-0
lines changed

tests/test_levels.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Copyright © 2024 Norman Fomferra and contributors
2+
# Permissions are hereby granted under the terms of the MIT License:
3+
# https://opensource.org/licenses/MIT.
4+
import json
5+
import unittest
6+
7+
8+
from zappend.fsutil import FileObj
9+
from zappend.levels import write_levels
10+
from zappend.levels import get_variables_config
11+
from .helpers import clear_memory_fs
12+
from .helpers import make_test_dataset
13+
14+
try:
15+
import xcube
16+
except:
17+
xcube = None
18+
19+
20+
class GetVariablesConfigTest(unittest.TestCase):
21+
def test_it(self):
22+
dataset = make_test_dataset()
23+
variables = get_variables_config(dataset, dict(x=512, y=256, time=1))
24+
self.assertEqual(
25+
{
26+
"x": {"encoding": {"chunks": None}},
27+
"y": {"encoding": {"chunks": None}},
28+
"time": {"encoding": {"chunks": None}},
29+
"chl": {"encoding": {"chunks": [1, 256, 512]}},
30+
"tsm": {"encoding": {"chunks": [1, 256, 512]}},
31+
},
32+
variables,
33+
)
34+
35+
36+
@unittest.skipIf(xcube is None, reason="xcube is not installed")
37+
class WriteLevelsTest(unittest.TestCase):
38+
def setUp(self):
39+
clear_memory_fs()
40+
41+
def test_it(self):
42+
source_path = "memory://source.zarr"
43+
make_test_dataset(
44+
uri=source_path,
45+
dims=("time", "y", "x"),
46+
shape=(3, 1024, 2048),
47+
chunks=(1, 128, 256),
48+
crs="EPSG:4326",
49+
)
50+
51+
target_dir = FileObj("memory://target.levels")
52+
self.assertFalse(target_dir.exists())
53+
54+
write_levels(source_path=source_path, target_path=target_dir.uri)
55+
56+
self.assertTrue(target_dir.exists())
57+
58+
levels_file = target_dir.for_path(".zlevels")
59+
self.assertTrue(levels_file.exists())
60+
levels_info = json.loads(levels_file.read())
61+
self.assertEqual(
62+
{
63+
"version": "1.0",
64+
"num_levels": 4,
65+
"agg_methods": {"chl": "mean", "tsm": "mean"},
66+
"use_saved_levels": False,
67+
},
68+
levels_info,
69+
)
70+
71+
# ds = xr.open_zarr(target_dir.uri + "/0.zarr")
72+
# self.assertEqual({"time": 1, "y": 10, "x": 20}, ds.sizes)
73+
# self.assertEqual({"x", "y", "time", "chl", "tsm"}, set(ds.variables))

zappend/levels.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# Copyright © 2024 Norman Fomferra and contributors
2+
# Permissions are hereby granted under the terms of the MIT License:
3+
# https://opensource.org/licenses/MIT.
4+
5+
import json
6+
import logging
7+
from typing import Any, Hashable
8+
9+
import xarray as xr
10+
import fsspec
11+
12+
from zappend.api import zappend
13+
14+
15+
def write_levels(
16+
source_path: str,
17+
source_storage_options: dict[str, Any] | None = None,
18+
target_path: str | None = None,
19+
num_levels: int | None = None,
20+
agg_methods: dict[str, Any] | None = None,
21+
use_saved_levels: bool = False,
22+
link_level_zero: bool = False,
23+
xy_dim_names: tuple[str, str] | None = None,
24+
tile_size: tuple[int, int] | None = None,
25+
**zappend_config,
26+
):
27+
from xcube.core.tilingscheme import get_num_levels
28+
from xcube.core.gridmapping import GridMapping
29+
from xcube.core.subsampling import get_dataset_agg_methods
30+
from xcube.core.subsampling import subsample_dataset
31+
32+
target_dir = zappend_config.pop("target_dir", None)
33+
if not target_dir and not target_path:
34+
raise ValueError("either 'target_dir' or 'target_path' can be given, not both")
35+
if target_dir and target_path and target_dir != target_path:
36+
raise ValueError("either 'target_dir' or 'target_path' can be given, not both")
37+
target_path = target_path or target_dir
38+
target_storage_options = zappend_config.pop(
39+
"target_storage_options", source_storage_options or {}
40+
)
41+
target_fs, target_root = fsspec.core.url_to_fs(
42+
target_path, **target_storage_options
43+
)
44+
45+
source_fs, source_root = fsspec.core.url_to_fs(
46+
source_path,
47+
**(
48+
source_storage_options
49+
if source_storage_options is not None
50+
else target_storage_options
51+
),
52+
)
53+
source_store = source_fs.get_mapper(root=source_root)
54+
source_ds = xr.open_zarr(source_store)
55+
56+
logger = logging.getLogger("zappend")
57+
58+
grid_mapping: GridMapping | None = None
59+
60+
if xy_dim_names is None:
61+
grid_mapping = grid_mapping or GridMapping.from_dataset(source_ds)
62+
xy_dim_names = grid_mapping.xy_dim_names
63+
64+
if tile_size is None:
65+
grid_mapping = grid_mapping or GridMapping.from_dataset(source_ds)
66+
tile_size = grid_mapping.tile_size
67+
68+
if num_levels is None:
69+
grid_mapping = grid_mapping or GridMapping.from_dataset(source_ds)
70+
num_levels = get_num_levels(grid_mapping.size, tile_size)
71+
72+
agg_methods = get_dataset_agg_methods(
73+
source_ds,
74+
xy_dim_names=xy_dim_names,
75+
agg_methods=agg_methods,
76+
)
77+
78+
force_new = zappend_config.pop("force_new", None)
79+
80+
append_dim = zappend_config.pop("append_dim", "time")
81+
append_coord = source_ds.coords[append_dim]
82+
83+
variables = get_variables_config(
84+
source_ds,
85+
{
86+
xy_dim_names[0]: tile_size[0],
87+
xy_dim_names[0]: tile_size[1],
88+
append_dim: 1,
89+
},
90+
variables=zappend_config.pop("variables", None),
91+
)
92+
93+
with target_fs.open(f"{target_root}/.zlevels", "wt") as fp:
94+
levels_data: dict[str, Any] = dict(
95+
version="1.0",
96+
num_levels=num_levels,
97+
agg_methods=dict(agg_methods),
98+
use_saved_levels=use_saved_levels,
99+
)
100+
json.dump(levels_data, fp, indent=2)
101+
102+
if link_level_zero:
103+
with target_fs.open(f"{target_root}/0.link", "wt") as fp:
104+
fp.write(source_root)
105+
106+
subsample_dataset_kwargs = dict(xy_dim_names=xy_dim_names, agg_methods=agg_methods)
107+
108+
for slice_index in range(append_coord.size):
109+
slice_ds_indexer = {append_dim: slice(slice_index, slice_index + 1)}
110+
slice_ds = source_ds.isel(slice_ds_indexer)
111+
112+
for level_index in range(num_levels):
113+
if level_index == 0:
114+
level_slice_ds = slice_ds
115+
elif use_saved_levels:
116+
prev_level_path = f"{target_root}/{level_index - 1}.zarr"
117+
prev_level_store = target_fs.get_mapper(root=prev_level_path)
118+
prev_level_ds = xr.open_zarr(prev_level_store)
119+
level_slice_ds = subsample_dataset(
120+
prev_level_ds.isel(slice_ds_indexer),
121+
step=2,
122+
**subsample_dataset_kwargs,
123+
)
124+
else:
125+
level_slice_ds = subsample_dataset(
126+
slice_ds,
127+
step=2**level_index,
128+
**subsample_dataset_kwargs,
129+
)
130+
131+
if not link_level_zero or level_index > 0:
132+
level_slice_path = f"{target_path}/{level_index}.zarr"
133+
zappend(
134+
[level_slice_ds],
135+
target_dir=level_slice_path,
136+
target_storage_options=target_storage_options,
137+
append_dim=append_dim,
138+
force_new=force_new if slice_index == 0 else False,
139+
variables=variables,
140+
**zappend_config,
141+
)
142+
143+
logger.info(f"done writing {target_path}")
144+
145+
146+
def get_variables_config(
147+
dataset: xr.Dataset,
148+
chunk_sizes: dict[Hashable, int],
149+
variables: dict[str, dict[str, Any]] | None = None,
150+
):
151+
"""Define the chunk sizes for the variables in *dataset*.
152+
153+
Args:
154+
dataset: The dataset
155+
chunk_sizes: The chunk sizes
156+
variables: Value of the zappend ``variables``
157+
configuration parameter
158+
Return:
159+
A zappend compatible with the zappend ``variables``
160+
configuration parameter.
161+
"""
162+
var_configs = dict(variables or {})
163+
for var_name, var in dataset.variables.items():
164+
var_name = str(var_name)
165+
var_config = dict(var_configs.get(var_name, {}))
166+
var_encoding = dict(var_config.get("encoding", {}))
167+
var_chunks = var_encoding.get("chunks")
168+
if not var_chunks and var.dims:
169+
if var_name in dataset.coords:
170+
var_chunks = None
171+
else:
172+
var_chunks = [chunk_sizes.get(dim) for dim in var.dims]
173+
var_encoding["chunks"] = var_chunks
174+
var_config["encoding"] = var_encoding
175+
var_configs[var_name] = var_config
176+
return var_configs

0 commit comments

Comments
 (0)