Skip to content

Commit 161256e

Browse files
Copilot and bnlawrence
committed
feat: add filesystem keyword argument to cfdm.read (refs cf-python#931)
Co-authored-by: bnlawrence <1792815+bnlawrence@users.noreply.github.com>
1 parent 9698c25 commit 161256e

File tree

4 files changed

+164
-41
lines changed

4 files changed

+164
-41
lines changed

cfdm/docstring/docstring.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,21 @@
368368
*Example:*
369369
``('netCDF4', 'h5netcdf-pyfive', 'netcdf_file',
370370
'h5netcdf-h5py')``""",
371+
# read filesystem
372+
"{{read filesystem: optional}}": """filesystem: optional
373+
A pre-authenticated filesystem object (for example an
374+
``fsspec`` filesystem instance) to use for opening the
375+
dataset. When provided, *datasets* values are treated as
376+
paths understood by *filesystem*, and local string
377+
pre-processing (tilde/variable expansion, globbing and
378+
directory walking) is bypassed. The file is opened by
379+
calling ``filesystem.open(dataset, "rb")``, which returns
380+
a file-like object that is passed to the netCDF backend.
381+
382+
If `None` (the default) then the existing file-opening
383+
logic is used.
384+
385+
.. versionadded:: (cfdm) NEXTVERSION""",
371386
# read storage_options
372387
"{{read storage_options: `dict` or `None`, optional}}": """storage_options: `dict` or `None`, optional
373388
Pass parameters to the backend file system driver, such as

cfdm/read_write/netcdf/netcdfread.py

Lines changed: 68 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -544,35 +544,43 @@ def dataset_open(self, dataset, flatten=True, verbose=None):
544544

545545
g["cdl_filename"] = cdl_filename
546546

547-
u = urisplit(dataset)
548-
storage_options = self._get_storage_options(dataset, u)
549-
550-
if u.scheme == "s3":
547+
filesystem = g.get("filesystem")
548+
if filesystem is not None:
551549
# --------------------------------------------------------
552-
# A file in an S3 object store
550+
# A pre-authenticated filesystem was provided: open the
551+
# dataset as a file-like object and pass it to the backend.
553552
# --------------------------------------------------------
554-
from dask.base import tokenize
555-
556-
# Create an openable S3 file object
557-
fs_key = tokenize(("s3", storage_options))
558-
file_systems = g["file_systems"]
559-
file_system = file_systems.get(fs_key)
560-
if file_system is None:
561-
# An S3 file system with these options does not exist,
562-
# so create one.
563-
from s3fs import S3FileSystem
564-
565-
file_system = S3FileSystem(**storage_options)
566-
file_systems[fs_key] = file_system
567-
568-
# Reset 'dataset' to an s3fs.File object that can be
569-
# passed to the netCDF backend
570-
dataset = file_system.open(u.path[1:], "rb")
571-
572-
if is_log_level_detail(logger):
573-
logger.detail(
574-
f" S3: s3fs.S3FileSystem options: {storage_options}\n"
575-
) # pragma: no cover
553+
dataset = filesystem.open(dataset, "rb")
554+
else:
555+
u = urisplit(dataset)
556+
storage_options = self._get_storage_options(dataset, u)
557+
558+
if u.scheme == "s3":
559+
# --------------------------------------------------------
560+
# A file in an S3 object store
561+
# --------------------------------------------------------
562+
from dask.base import tokenize
563+
564+
# Create an openable S3 file object
565+
fs_key = tokenize(("s3", storage_options))
566+
file_systems = g["file_systems"]
567+
file_system = file_systems.get(fs_key)
568+
if file_system is None:
569+
# An S3 file system with these options does not exist,
570+
# so create one.
571+
from s3fs import S3FileSystem
572+
573+
file_system = S3FileSystem(**storage_options)
574+
file_systems[fs_key] = file_system
575+
576+
# Reset 'dataset' to an s3fs.File object that can be
577+
# passed to the netCDF backend
578+
dataset = file_system.open(u.path[1:], "rb")
579+
580+
if is_log_level_detail(logger):
581+
logger.detail(
582+
f" S3: s3fs.S3FileSystem options: {storage_options}\n"
583+
) # pragma: no cover
576584

577585
# Map backend names to dataset-open functions
578586
dataset_open_function = {
@@ -1015,6 +1023,7 @@ def read(
10151023
warn_valid=False,
10161024
domain=False,
10171025
storage_options=None,
1026+
filesystem=None,
10181027
_file_systems=None,
10191028
netcdf_backend=None,
10201029
cache=True,
@@ -1085,6 +1094,11 @@ def read(
10851094
10861095
.. versionadded:: (cfdm) 1.11.2.0
10871096
1097+
filesystem: optional
1098+
See `cfdm.read` for details.
1099+
1100+
.. versionadded:: (cfdm) NEXTVERSION
1101+
10881102
netcdf_backend: `None` or `str`, optional
10891103
See `cfdm.read` for details.
10901104
@@ -1229,22 +1243,33 @@ def read(
12291243
# Note that the `dataset_type` method is much faster than the
12301244
# `dataset_open` method at returning for unrecognised types.
12311245
# ------------------------------------------------------------
1232-
d_type = self.dataset_type(dataset, dataset_type)
1233-
if not d_type:
1234-
# Can't interpret the dataset as a recognised type, so
1235-
# either raise an exception or return an empty list.
1236-
if dataset_type is None:
1237-
raise DatasetTypeError(
1238-
f"Can't interpret {dataset} as a dataset of one of the "
1239-
f"valid types: {valid_dataset_types!r}"
1240-
)
1246+
if filesystem is not None:
1247+
# When a pre-authenticated filesystem is provided we cannot
1248+
# inspect the file locally, so we trust the caller. Use
1249+
# the explicitly requested dataset_type if given, otherwise
1250+
# default to 'netCDF'.
1251+
if dataset_type is not None and "netCDF" not in dataset_type:
1252+
# The caller explicitly excluded netCDF; nothing to do.
1253+
return []
12411254

1242-
return []
1255+
d_type = "netCDF"
1256+
else:
1257+
d_type = self.dataset_type(dataset, dataset_type)
1258+
if not d_type:
1259+
# Can't interpret the dataset as a recognised type, so
1260+
# either raise an exception or return an empty list.
1261+
if dataset_type is None:
1262+
raise DatasetTypeError(
1263+
f"Can't interpret {dataset} as a dataset of one of the "
1264+
f"valid types: {valid_dataset_types!r}"
1265+
)
12431266

1244-
# Can interpret the dataset as a recognised type, but return
1245-
# an empty list if that type has been excluded.
1246-
if dataset_type is not None and d_type not in dataset_type:
1247-
return []
1267+
return []
1268+
1269+
# Can interpret the dataset as a recognised type, but return
1270+
# an empty list if that type has been excluded.
1271+
if dataset_type is not None and d_type not in dataset_type:
1272+
return []
12481273

12491274
# ------------------------------------------------------------
12501275
# Parse the 'netcdf_backend' keyword parameter
@@ -1532,6 +1557,8 @@ def read(
15321557
"file_system_storage_options": {},
15331558
# Cached s3fs.S3FileSystem objects
15341559
"file_systems": _file_systems,
1560+
# Pre-authenticated filesystem object (e.g. fsspec)
1561+
"filesystem": filesystem,
15351562
# --------------------------------------------------------
15361563
# Array element caching
15371564
# --------------------------------------------------------

cfdm/read_write/read.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ class read(ReadWrite):
154154
155155
.. versionadded:: (cfdm) 1.11.2.0
156156
157+
{{read filesystem: optional}}
158+
159+
.. versionadded:: (cfdm) NEXTVERSION
160+
157161
{{read storage_options: `dict` or `None`, optional}}
158162
159163
.. versionadded:: (cfdm) 1.11.2.0
@@ -243,6 +247,7 @@ def __new__(
243247
domain=False,
244248
netcdf_backend=None,
245249
storage_options=None,
250+
filesystem=None,
246251
cache=True,
247252
dask_chunks="storage-aligned",
248253
store_dataset_chunks=True,
@@ -344,6 +349,14 @@ def _datasets(self):
344349
followlinks = kwargs.get("followlinks", False)
345350

346351
datasets = self._flat(kwargs["datasets"])
352+
353+
# If a filesystem object is provided, treat each dataset path
354+
# as-is (no local glob/walk/expansion) and yield directly.
355+
if kwargs.get("filesystem") is not None:
356+
for dataset1 in datasets:
357+
yield dataset1
358+
return
359+
347360
if kwargs["cdl_string"]:
348361
# Return CDL strings as they are
349362
for dataset1 in datasets:
@@ -580,6 +593,7 @@ def _read(self, dataset):
580593
"unpack",
581594
"domain",
582595
"storage_options",
596+
"filesystem",
583597
"netcdf_backend",
584598
"cache",
585599
"dask_chunks",

cfdm/test/test_read_write.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1541,6 +1541,73 @@ def test_read_zarr_and_non_zarr(self):
15411541
self.assertEqual(len(f), 5)
15421542

15431543

1544+
def test_read_filesystem(self):
1545+
"""Test cfdm.read with a pre-authenticated filesystem object."""
1546+
import io
1547+
from unittest.mock import MagicMock
1548+
1549+
f = self.f0
1550+
cfdm.write(f, tmpfile)
1551+
1552+
# ------------------------------------------------------------------
1553+
# Build a mock filesystem whose .open() returns the real file bytes
1554+
# so that the netCDF backend can parse them normally.
1555+
# ------------------------------------------------------------------
1556+
with open(tmpfile, "rb") as fh:
1557+
file_bytes = fh.read()
1558+
1559+
open_calls = []
1560+
1561+
def fake_open(path, mode="rb"):
1562+
open_calls.append((path, mode))
1563+
return io.BytesIO(file_bytes)
1564+
1565+
mock_fs = MagicMock()
1566+
mock_fs.open.side_effect = fake_open
1567+
1568+
# Read using the mock filesystem
1569+
result = cfdm.read(tmpfile, filesystem=mock_fs)
1570+
1571+
# filesystem.open() must have been called with the dataset path
1572+
self.assertTrue(len(open_calls) > 0, "filesystem.open was not called")
1573+
self.assertEqual(open_calls[0][0], tmpfile)
1574+
self.assertEqual(open_calls[0][1], "rb")
1575+
1576+
# The read result must match what we get without filesystem
1577+
expected = cfdm.read(tmpfile)
1578+
self.assertEqual(len(result), len(expected))
1579+
self.assertTrue(result[0].equals(expected[0]))
1580+
1581+
def test_read_filesystem_bypasses_glob(self):
1582+
"""Test that filesystem=... bypasses local glob expansion."""
1583+
import io
1584+
from unittest.mock import MagicMock
1585+
1586+
f = self.f0
1587+
cfdm.write(f, tmpfile)
1588+
1589+
with open(tmpfile, "rb") as fh:
1590+
file_bytes = fh.read()
1591+
1592+
yielded_datasets = []
1593+
1594+
def fake_open(path, mode="rb"):
1595+
yielded_datasets.append(path)
1596+
return io.BytesIO(file_bytes)
1597+
1598+
mock_fs = MagicMock()
1599+
mock_fs.open.side_effect = fake_open
1600+
1601+
# Pass a glob-like pattern as the dataset. Without filesystem,
1602+
# this would expand to matching local files. With filesystem, it
1603+
# must be passed through unchanged.
1604+
pattern = "/some/remote/path/*.nc"
1605+
cfdm.read(pattern, filesystem=mock_fs)
1606+
1607+
# The pattern must have been forwarded verbatim to filesystem.open()
1608+
self.assertEqual(yielded_datasets, [pattern])
1609+
1610+
15441611
if __name__ == "__main__":
15451612
print("Run date:", datetime.datetime.now())
15461613
cfdm.environment()

0 commit comments

Comments
 (0)