Skip to content

Commit e56991c

Browse files
authored
Merge pull request #32 from bcdev/forman-31-ds_closed_too_early
Fixed a problem with underlying i/o stream of a persistent slice
2 parents b485d63 + ecde84a commit e56991c

File tree

4 files changed

+66
-35
lines changed

4 files changed

+66
-35
lines changed

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
### Fixes
1919

20+
* Fixed a problem where the underlying i/o stream of a persistent slice dataset
21+
was closed immediately after opening the dataset. [#31]
22+
2023
* Now logging ignored encodings on level DEBUG instead of WARNING because they
2124
occur very likely when processing NetCDF files.
2225

requirements-dev.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,7 @@ pytest-cov
33
black
44
flake8
55
flake8-bugbear
6+
h5netcdf
67
s3fs
8+
scipy
79
pyproj

tests/test_slice.py

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
# Copyright © 2024 Norman Fomferra
22
# Permissions are hereby granted under the terms of the MIT License:
33
# https://opensource.org/licenses/MIT.
4-
4+
import shutil
55
import unittest
6+
import warnings
67

78
import pytest
89
import xarray as xr
@@ -119,17 +120,42 @@ def test_persistent_slice_source_for_zarr(self):
119120
with slice_source as slice_ds:
120121
self.assertIsInstance(slice_ds, xr.Dataset)
121122

122-
# def test_persistent_slice_source_for_nc(self):
123-
# slice_ds = make_test_dataset()
124-
# slice_file = FileObj("memory:///slice.nc")
125-
# with slice_file.fs.open(slice_file.path, "wb") as f:
126-
# slice_ds.to_netcdf(f)
127-
# ctx = Context(dict(target_dir="memory://target.zarr",
128-
# slice_engine="scipy"))
129-
# slice_nc = open_slice_source(ctx, slice_file.uri)
130-
# self.assertIsInstance(slice_nc, PersistentSliceSource)
131-
# with slice_nc as slice_ds:
132-
# self.assertIsInstance(slice_ds, xr.Dataset)
123+
def test_persistent_slice_source_for_nc_nonlocal(self):
124+
engine = "scipy"
125+
format = "NETCDF3_CLASSIC"
126+
slice_ds = make_test_dataset()
127+
slice_file = FileObj("memory:///slice.nc")
128+
with slice_file.fs.open(slice_file.path, "wb") as stream:
129+
# noinspection PyTypeChecker
130+
slice_ds.to_netcdf(stream, engine=engine, format=format)
131+
ctx = Context(dict(target_dir="memory://target.zarr", slice_engine=engine))
132+
slice_source = open_slice_source(ctx, slice_file.uri)
133+
self.assertIsInstance(slice_source, PersistentSliceSource)
134+
try:
135+
with slice_source as slice_ds:
136+
self.assertIsInstance(slice_ds, xr.Dataset)
137+
except KeyError as e:
138+
# TODO: Find out what's going wrong in xarray.
139+
# This should not happen.
140+
warnings.warn(f"received known exception from to_netcdf(): {e}")
141+
142+
def test_persistent_slice_source_for_nc_local(self):
143+
engine = "h5netcdf"
144+
format = "NETCDF4"
145+
target_dir = FileObj("./target.zarr")
146+
ctx = Context(dict(target_dir=target_dir.path, slice_engine=engine))
147+
slice_ds = make_test_dataset()
148+
slice_file = FileObj("./slice.nc")
149+
# noinspection PyTypeChecker
150+
slice_ds.to_netcdf(slice_file.path, engine=engine, format=format)
151+
try:
152+
slice_source = open_slice_source(ctx, slice_file.uri)
153+
self.assertIsInstance(slice_source, PersistentSliceSource)
154+
with slice_source as slice_ds:
155+
self.assertIsInstance(slice_ds, xr.Dataset)
156+
finally:
157+
shutil.rmtree(target_dir.path, ignore_errors=True)
158+
slice_file.delete()
133159

134160
def test_persistent_wait_success(self):
135161
slice_dir = FileObj("memory://slice.zarr")

zappend/slice/persistent.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import time
66

7+
import fsspec.implementations.local
78
import xarray as xr
89

910
from ..context import Context
@@ -39,29 +40,24 @@ def dispose(self):
3940
super().dispose()
4041

4142
def _wait_for_slice_dataset(self) -> xr.Dataset:
42-
slice_ds: xr.Dataset | None = None
4343
interval, timeout = self.ctx.slice_polling
44-
if timeout is not None:
45-
# t0 = time.monotonic()
46-
# while (time.monotonic() - t0) < timeout:
47-
t0 = time.monotonic()
48-
while True:
49-
delta = time.monotonic() - t0
50-
if delta >= timeout:
51-
break
52-
try:
53-
slice_ds = self._open_slice_dataset()
54-
except OSError:
55-
logger.debug(
56-
f"Slice not ready or corrupt, retrying after {interval} seconds"
57-
)
58-
time.sleep(interval)
59-
else:
60-
slice_ds = self._open_slice_dataset()
44+
if timeout is None:
45+
return self._open_slice_dataset()
6146

62-
if not slice_ds:
63-
raise FileNotFoundError(self._slice_file.uri)
64-
return slice_ds
47+
# t0 = time.monotonic()
48+
# while (time.monotonic() - t0) < timeout:
49+
t0 = time.monotonic()
50+
while True:
51+
delta = time.monotonic() - t0
52+
if delta >= timeout:
53+
raise FileNotFoundError(self._slice_file.uri)
54+
try:
55+
return self._open_slice_dataset()
56+
except OSError:
57+
logger.debug(
58+
f"Slice not ready or corrupt, retrying after {interval} seconds"
59+
)
60+
time.sleep(interval)
6561

6662
def _open_slice_dataset(self) -> xr.Dataset:
6763
engine = self.ctx.slice_engine
@@ -74,5 +70,9 @@ def _open_slice_dataset(self) -> xr.Dataset:
7470
storage_options = self.ctx.slice_storage_options
7571
return xr.open_zarr(self._slice_file.uri, storage_options=storage_options)
7672

77-
with self._slice_file.fs.open(self._slice_file.path, "rb") as f:
78-
return xr.open_dataset(f, engine=engine)
73+
fs = self._slice_file.fs
74+
if isinstance(fs, fsspec.implementations.local.LocalFileSystem):
75+
return xr.open_dataset(self._slice_file.path, engine=engine)
76+
77+
fo = fs.open(self._slice_file.path, "rb")
78+
return xr.open_dataset(fo, engine=engine)

0 commit comments

Comments
 (0)