Skip to content

Commit 056f2f7

Browse files
committed
Introduced keyword arguments source_ds and source_append_offset
1 parent 732f355 commit 056f2f7

File tree

4 files changed

+215
-38
lines changed

4 files changed

+215
-38
lines changed

tests/contrib/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Copyright © 2024 Norman Fomferra and contributors
2+
# Permissions are hereby granted under the terms of the MIT License:
3+
# https://opensource.org/licenses/MIT.
Lines changed: 142 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
import xarray as xr
1010

1111
from zappend.fsutil import FileObj
12-
from zappend.levels import write_levels
13-
from zappend.levels import get_variables_config
14-
from .helpers import clear_memory_fs
15-
from .helpers import make_test_dataset
12+
from zappend.contrib import write_levels
13+
from zappend.contrib.levels import get_variables_config
14+
from tests.helpers import clear_memory_fs
15+
from tests.helpers import make_test_dataset
1616

1717
try:
1818
# noinspection PyUnresolvedReferences
@@ -71,29 +71,30 @@ def test_variables_given(self):
7171
)
7272

7373

74+
source_path = "memory://source.zarr"
75+
target_path = "memory://target.levels"
76+
77+
78+
# noinspection PyMethodMayBeStatic
7479
@unittest.skipIf(xcube is None, reason="xcube is not installed")
75-
class WriteLevelsTest(unittest.TestCase):
80+
class WriteLevelsArgsTest(unittest.TestCase):
7681
def setUp(self):
7782
clear_memory_fs()
78-
79-
# noinspection PyMethodMayBeStatic
80-
def test_args_validation(self):
81-
source_path = "memory://source.zarr"
82-
target_path = "memory://target.levels"
83-
8483
make_test_dataset(
8584
uri=source_path,
8685
dims=("time", "lat", "lon"),
8786
shape=(3, 1024, 2048),
8887
chunks=(1, 128, 256),
8988
)
9089

90+
def test_target_path_not_given(self):
9191
with pytest.raises(
9292
ValueError,
9393
match="missing 'target_path' argument",
9494
):
9595
write_levels(source_path=source_path)
9696

97+
def test_target_dir_and_target_path_given(self):
9798
with pytest.raises(
9899
ValueError,
99100
match="either 'target_dir' or 'target_path' can be given, not both",
@@ -104,6 +105,7 @@ def test_args_validation(self):
104105
target_dir=target_path,
105106
)
106107

108+
def test_config_given(self):
107109
with pytest.raises(
108110
TypeError,
109111
match="write_levels\\(\\) got an unexpected keyword argument 'config'",
@@ -114,6 +116,7 @@ def test_args_validation(self):
114116
config={"dry_run": True},
115117
)
116118

119+
def test_dry_run_and_use_saved_levels_given(self):
117120
with pytest.raises(
118121
FileNotFoundError,
119122
match="Target parent directory does not exist: /target.levels",
@@ -129,8 +132,60 @@ def test_args_validation(self):
129132
use_saved_levels=True,
130133
)
131134

135+
def test_source_path_and_source_ds_not_given(self):
136+
with pytest.raises(
137+
TypeError,
138+
match="'source_ds' argument must be of type 'xarray.Dataset',"
139+
" but was 'NoneType'",
140+
):
141+
write_levels(
142+
target_path=target_path,
143+
)
144+
145+
def test_source_path_not_given_and_link_level_zero_is_true(self):
146+
with pytest.raises(
147+
ValueError,
148+
match="'source_path' argument must be provided"
149+
" if 'link_level_zero' is used",
150+
):
151+
write_levels(
152+
source_ds=xr.Dataset(), target_path=target_path, link_level_zero=True
153+
)
154+
155+
def test_source_append_offset_not_int(self):
156+
with pytest.raises(
157+
TypeError,
158+
match="'source_append_offset' argument must be of type 'int',"
159+
" but was 'str'",
160+
):
161+
# noinspection PyTypeChecker
162+
write_levels(
163+
source_ds=xr.open_zarr("memory://source.zarr"),
164+
source_append_offset="2",
165+
target_path=target_path,
166+
)
167+
168+
def test_source_append_offset_out_of_range(self):
169+
with pytest.raises(
170+
ValueError,
171+
match="'source_append_offset' argument must be >=0 and <3, but was 13",
172+
):
173+
# noinspection PyTypeChecker
174+
write_levels(
175+
source_ds=xr.open_zarr("memory://source.zarr"),
176+
source_append_offset=13,
177+
target_path=target_path,
178+
)
179+
180+
181+
@unittest.skipIf(xcube is None, reason="xcube is not installed")
182+
class WriteLevelsTest(unittest.TestCase):
183+
def setUp(self):
184+
clear_memory_fs()
185+
186+
# noinspection PyMethodMayBeStatic
187+
132188
def test_force_new(self):
133-
source_path = "memory://source.zarr"
134189
make_test_dataset(
135190
uri=source_path,
136191
dims=("time", "lat", "lon"),
@@ -145,12 +200,13 @@ def test_force_new(self):
145200
(target_dir / "0.zarr" / ".zgroup").write("{}")
146201
self.assertTrue(target_dir.exists())
147202

148-
write_levels(source_path=source_path, target_path=target_dir.uri, force_new=True)
203+
write_levels(
204+
source_path=source_path, target_path=target_dir.uri, force_new=True
205+
)
149206

150207
self.assertTrue(target_dir.exists())
151208

152209
def test_default_x_y_with_crs(self):
153-
source_path = "memory://source.zarr"
154210
make_test_dataset(
155211
uri=source_path,
156212
dims=("time", "y", "x"),
@@ -185,7 +241,6 @@ def test_default_x_y_with_crs(self):
185241
self.assert_level(target_dir.uri + "/3.zarr", 3, has_crs=True)
186242

187243
def test_default_lon_lat_no_crs(self):
188-
source_path = "memory://source.zarr"
189244
make_test_dataset(
190245
uri=source_path,
191246
dims=("time", "lat", "lon"),
@@ -219,6 +274,78 @@ def test_default_lon_lat_no_crs(self):
219274
self.assert_level(target_dir.uri + "/2.zarr", 2, xy_dims=xy_dims)
220275
self.assert_level(target_dir.uri + "/3.zarr", 3, xy_dims=xy_dims)
221276

277+
def test_default_lon_lat_no_crs_from_source_ds(self):
278+
source_ds = make_test_dataset(
279+
uri=source_path,
280+
dims=("time", "lat", "lon"),
281+
shape=(3, 1024, 2048),
282+
chunks=(1, 128, 256),
283+
)
284+
285+
target_dir = FileObj("memory://target.levels")
286+
self.assertFalse(target_dir.exists())
287+
288+
write_levels(source_ds=source_ds, target_path=target_dir.uri)
289+
290+
self.assertTrue(target_dir.exists())
291+
292+
levels_file = target_dir.for_path(".zlevels")
293+
self.assertTrue(levels_file.exists())
294+
levels_info = json.loads(levels_file.read())
295+
self.assertEqual(
296+
{
297+
"version": "1.0",
298+
"num_levels": 4,
299+
"agg_methods": {"chl": "mean", "tsm": "mean"},
300+
"use_saved_levels": False,
301+
},
302+
levels_info,
303+
)
304+
305+
xy_dims = "lon", "lat"
306+
self.assert_level(target_dir.uri + "/0.zarr", 0, xy_dims=xy_dims)
307+
self.assert_level(target_dir.uri + "/1.zarr", 1, xy_dims=xy_dims)
308+
self.assert_level(target_dir.uri + "/2.zarr", 2, xy_dims=xy_dims)
309+
self.assert_level(target_dir.uri + "/3.zarr", 3, xy_dims=xy_dims)
310+
311+
def test_default_lon_lat_no_crs_from_source_ds_with_offset(self):
312+
source_ds = make_test_dataset(
313+
uri=source_path,
314+
dims=("time", "lat", "lon"),
315+
shape=(10, 1024, 2048),
316+
chunks=(1, 128, 256),
317+
)
318+
319+
target_dir = FileObj("memory://target.levels")
320+
self.assertFalse(target_dir.exists())
321+
322+
write_levels(
323+
source_ds=source_ds,
324+
source_append_offset=7,
325+
target_path=target_dir.uri,
326+
)
327+
328+
self.assertTrue(target_dir.exists())
329+
330+
levels_file = target_dir.for_path(".zlevels")
331+
self.assertTrue(levels_file.exists())
332+
levels_info = json.loads(levels_file.read())
333+
self.assertEqual(
334+
{
335+
"version": "1.0",
336+
"num_levels": 4,
337+
"agg_methods": {"chl": "mean", "tsm": "mean"},
338+
"use_saved_levels": False,
339+
},
340+
levels_info,
341+
)
342+
343+
xy_dims = "lon", "lat"
344+
self.assert_level(target_dir.uri + "/0.zarr", 0, xy_dims=xy_dims)
345+
self.assert_level(target_dir.uri + "/1.zarr", 1, xy_dims=xy_dims)
346+
self.assert_level(target_dir.uri + "/2.zarr", 2, xy_dims=xy_dims)
347+
self.assert_level(target_dir.uri + "/3.zarr", 3, xy_dims=xy_dims)
348+
222349
def test_link_level_zero(self):
223350
source_dir = FileObj("memory://source.zarr")
224351
make_test_dataset(

zappend/contrib/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Copyright © 2024 Norman Fomferra and contributors
2+
# Permissions are hereby granted under the terms of the MIT License:
3+
# https://opensource.org/licenses/MIT.
4+
5+
from .levels import write_levels
Lines changed: 65 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818

1919
def write_levels(
20-
source_path: str,
20+
*,
21+
source_ds: xr.Dataset | None = None,
22+
source_path: str | None = None,
2123
source_storage_options: dict[str, Any] | None = None,
24+
source_append_offset: int | None = None,
2225
target_path: str | None = None,
2326
num_levels: int | None = None,
2427
tile_size: tuple[int, int] | None = None,
@@ -28,12 +31,13 @@ def write_levels(
2831
xy_dim_names: tuple[str, str] | None = None,
2932
**zappend_config,
3033
):
31-
"""Writes a dataset at `source_path` to `target_path` using the
34+
"""Writes a dataset given by `source_ds` or `source_path` to `target_path`
35+
using the
3236
[multi-level dataset format](https://xcube.readthedocs.io/en/latest/mldatasets.html)
3337
as specified by
3438
[xcube](https://github.com/xcube-dev/xcube).
3539
36-
It resembles the `store.write_data(cube, "<name>.levels", ...)` method
40+
It resembles the `store.write_data(dataset, "<name>.levels", ...)` method
3741
provided by the xcube filesystem data stores ("file", "s3", "memory", etc.).
3842
The zappend version may be used for potentially very large datasets in terms
3943
of dimension sizes or for datasets with a very large number of chunks.
@@ -58,9 +62,17 @@ def write_levels(
5862
Important: This function requires the `xcube` package to be installed.
5963
6064
Args:
65+
source_ds: The source dataset.
66+
Must be given if `source_path` is not given.
6167
source_path: The source dataset path.
68+
If `source_ds` is provided and `link_level_zero` is true,
69+
then `source_path` must also be provided in order
70+
to determine the path of the level zero source.
6271
source_storage_options: Storage options for the source
6372
dataset's filesystem.
73+
source_append_offset: Optional offset in the append dimension.
74+
Only slices with indexes greater or equal the offset are
75+
appended.
6476
target_path: The target multi-level dataset path.
6577
Filename extension should be `.levels`, by convention.
6678
If not given, `target_dir` should be passed as part of the
@@ -102,14 +114,15 @@ def write_levels(
102114
from xcube.core.tilingscheme import get_num_levels
103115
from xcube.util.fspath import get_fs_path_class
104116

117+
config = zappend_config.pop("config", None)
118+
if config is not None:
119+
raise TypeError("write_levels() got an unexpected keyword argument 'config'")
120+
105121
dry_run = zappend_config.pop("dry_run", False)
106122

107123
if dry_run and use_saved_levels:
108124
warnings.warn(f"'use_saved_levels' argument is not applicable if dry_run=True")
109125
use_saved_levels = False
110-
config = zappend_config.pop("config", None)
111-
if config is not None:
112-
raise TypeError("write_levels() got an unexpected keyword argument 'config'")
113126

114127
target_dir = zappend_config.pop("target_dir", None)
115128
if not target_path and not target_dir:
@@ -124,16 +137,49 @@ def write_levels(
124137
target_path, **target_storage_options
125138
)
126139

127-
source_fs, source_root = fsspec.core.url_to_fs(
128-
source_path,
129-
**(
130-
source_storage_options
131-
if source_storage_options is not None
132-
else target_storage_options
133-
),
134-
)
135-
source_store = source_fs.get_mapper(root=source_root)
136-
source_ds = xr.open_zarr(source_store)
140+
force_new = zappend_config.pop("force_new", None)
141+
142+
if source_path is not None:
143+
source_fs, source_root = fsspec.core.url_to_fs(
144+
source_path,
145+
**(
146+
source_storage_options
147+
if source_storage_options is not None
148+
else target_storage_options
149+
),
150+
)
151+
if source_ds is None:
152+
source_store = source_fs.get_mapper(root=source_root)
153+
source_ds = xr.open_zarr(source_store)
154+
else:
155+
source_root = None
156+
if not isinstance(source_ds, xr.Dataset):
157+
raise TypeError(
158+
f"'source_ds' argument must be of type 'xarray.Dataset',"
159+
f" but was {type(source_ds).__name__!r}"
160+
)
161+
if link_level_zero:
162+
raise ValueError(
163+
f"'source_path' argument must be provided"
164+
f" if 'link_level_zero' is used"
165+
)
166+
167+
append_dim = zappend_config.pop("append_dim", "time")
168+
append_coord = source_ds.coords[append_dim]
169+
170+
if source_append_offset is None:
171+
source_append_offset = 0
172+
elif not isinstance(source_append_offset, int):
173+
raise TypeError(
174+
f"'source_append_offset' argument must be of type 'int',"
175+
f" but was {type(source_append_offset).__name__!r}"
176+
)
177+
if not (0 <= source_append_offset < append_coord.size):
178+
raise ValueError(
179+
f"'source_append_offset' argument"
180+
f" must be >=0 and <{append_coord.size},"
181+
f" but was {source_append_offset}"
182+
)
137183

138184
logger = logging.getLogger("zappend")
139185

@@ -157,11 +203,6 @@ def write_levels(
157203
agg_methods=agg_methods,
158204
)
159205

160-
force_new = zappend_config.pop("force_new", None)
161-
162-
append_dim = zappend_config.pop("append_dim", "time")
163-
append_coord = source_ds.coords[append_dim]
164-
165206
variables = get_variables_config(
166207
source_ds,
167208
{
@@ -207,9 +248,10 @@ def write_levels(
207248

208249
subsample_dataset_kwargs = dict(xy_dim_names=xy_dim_names, agg_methods=agg_methods)
209250

210-
num_slices = append_coord.size
251+
num_slices = append_coord.size - source_append_offset
211252
for slice_index in range(num_slices):
212-
slice_ds_indexer = {append_dim: slice(slice_index, slice_index + 1)}
253+
append_index = source_append_offset + slice_index
254+
slice_ds_indexer = {append_dim: slice(append_index, append_index + 1)}
213255
slice_ds = source_ds.isel(slice_ds_indexer)
214256

215257
for level_index in range(num_levels):

0 commit comments

Comments
 (0)