Skip to content

Commit d65ece7

Browse files
committed
Refactor external storage to use fsspec for unified backend
- Add fsspec>=2023.1.0 as core dependency
- Add optional dependencies for cloud backends (s3fs, gcsfs, adlfs)
- Create new storage.py module with StorageBackend class
  - Unified interface for file, S3, GCS, and Azure storage
  - Methods: put_file, get_file, put_buffer, get_buffer, exists, remove
- Refactor ExternalTable to use StorageBackend instead of protocol-specific code
  - Replace _upload_file, _download_file, etc. with storage backend calls
  - Add storage property, deprecate s3 property
- Update settings.py to support GCS and Azure protocols
- Add deprecation warning to s3.py Folder class
  - Module kept for backward compatibility
  - Will be removed in a future version

This lays the foundation for the new object type, which will also use fsspec.
1 parent 38844f1 commit d65ece7

File tree

5 files changed

+381
-53
lines changed

5 files changed

+381
-53
lines changed

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,14 @@ dependencies = [
1717
"networkx",
1818
"pydot",
1919
"minio>=7.0.0",
20+
"fsspec>=2023.1.0",
2021
"matplotlib",
2122
"faker",
2223
"urllib3",
2324
"setuptools",
2425
"pydantic-settings>=2.0.0",
2526
]
27+
2628
requires-python = ">=3.10,<3.14"
2729
authors = [
2830
{name = "Dimitri Yatsenko", email = "[email protected]"},
@@ -90,6 +92,9 @@ test = [
9092
]
9193

9294
[project.optional-dependencies]
95+
s3 = ["s3fs>=2023.1.0"]
96+
gcs = ["gcsfs>=2023.1.0"]
97+
azure = ["adlfs>=2023.1.0"]
9398
dev = [
9499
"pre-commit",
95100
"ruff",

src/datajoint/external.py

Lines changed: 42 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -1,15 +1,17 @@
11
import logging
2+
import warnings
23
from collections.abc import Mapping
34
from pathlib import Path, PurePosixPath, PureWindowsPath
45

56
from tqdm import tqdm
67

7-
from . import errors, s3
8+
from . import errors
89
from .declare import EXTERNAL_TABLE_ROOT
910
from .errors import DataJointError, MissingExternalFile
1011
from .hash import uuid_from_buffer, uuid_from_file
1112
from .heading import Heading
1213
from .settings import config
14+
from .storage import StorageBackend
1315
from .table import FreeTable, Table
1416
from .utils import safe_copy, safe_write
1517

@@ -38,7 +40,7 @@ class ExternalTable(Table):
3840
def __init__(self, connection, store, database):
3941
self.store = store
4042
self.spec = config.get_store_spec(store)
41-
self._s3 = None
43+
self._storage = None
4244
self.database = database
4345
self._connection = connection
4446
self._heading = Heading(
@@ -52,9 +54,8 @@ def __init__(self, connection, store, database):
5254
self._support = [self.full_table_name]
5355
if not self.is_declared:
5456
self.declare()
55-
self._s3 = None
56-
if self.spec["protocol"] == "file" and not Path(self.spec["location"]).is_dir():
57-
raise FileNotFoundError("Inaccessible local directory %s" % self.spec["location"]) from None
57+
# Initialize storage backend (validates configuration)
58+
_ = self.storage
5859

5960
@property
6061
def definition(self):
@@ -73,17 +74,32 @@ def definition(self):
7374
def table_name(self):
7475
return f"{EXTERNAL_TABLE_ROOT}_{self.store}"
7576

77+
@property
78+
def storage(self) -> StorageBackend:
79+
"""Get or create the storage backend instance."""
80+
if self._storage is None:
81+
self._storage = StorageBackend(self.spec)
82+
return self._storage
83+
7684
@property
7785
def s3(self):
78-
if self._s3 is None:
79-
self._s3 = s3.Folder(**self.spec)
80-
return self._s3
86+
"""Deprecated: Use storage property instead."""
87+
warnings.warn(
88+
"ExternalTable.s3 is deprecated. Use ExternalTable.storage instead.",
89+
DeprecationWarning,
90+
stacklevel=2,
91+
)
92+
# For backward compatibility, return a legacy s3.Folder if needed
93+
from . import s3
94+
if not hasattr(self, "_s3_legacy") or self._s3_legacy is None:
95+
self._s3_legacy = s3.Folder(**self.spec)
96+
return self._s3_legacy
8197

8298
# - low-level operations - private
8399

84100
def _make_external_filepath(self, relative_filepath):
85101
"""resolve the complete external path based on the relative path"""
86-
# Strip root
102+
# Strip root for S3 paths
87103
if self.spec["protocol"] == "s3":
88104
posix_path = PurePosixPath(PureWindowsPath(self.spec["location"]))
89105
location_path = (
@@ -92,11 +108,13 @@ def _make_external_filepath(self, relative_filepath):
92108
else Path(posix_path)
93109
)
94110
return PurePosixPath(location_path, relative_filepath)
95-
# Preserve root
111+
# Preserve root for local filesystem
96112
elif self.spec["protocol"] == "file":
97113
return PurePosixPath(Path(self.spec["location"]), relative_filepath)
98114
else:
99-
assert False
115+
# For other protocols (gcs, azure, etc.), treat like S3
116+
location = self.spec.get("location", "")
117+
return PurePosixPath(location, relative_filepath) if location else PurePosixPath(relative_filepath)
100118

101119
def _make_uuid_path(self, uuid, suffix=""):
102120
"""create external path based on the uuid hash"""
@@ -109,57 +127,32 @@ def _make_uuid_path(self, uuid, suffix=""):
109127
)
110128

111129
def _upload_file(self, local_path, external_path, metadata=None):
112-
if self.spec["protocol"] == "s3":
113-
self.s3.fput(local_path, external_path, metadata)
114-
elif self.spec["protocol"] == "file":
115-
safe_copy(local_path, external_path, overwrite=True)
116-
else:
117-
assert False
130+
"""Upload a file to external storage using fsspec backend."""
131+
self.storage.put_file(local_path, external_path, metadata)
118132

119133
def _download_file(self, external_path, download_path):
120-
if self.spec["protocol"] == "s3":
121-
self.s3.fget(external_path, download_path)
122-
elif self.spec["protocol"] == "file":
123-
safe_copy(external_path, download_path)
124-
else:
125-
assert False
134+
"""Download a file from external storage using fsspec backend."""
135+
self.storage.get_file(external_path, download_path)
126136

127137
def _upload_buffer(self, buffer, external_path):
128-
if self.spec["protocol"] == "s3":
129-
self.s3.put(external_path, buffer)
130-
elif self.spec["protocol"] == "file":
131-
safe_write(external_path, buffer)
132-
else:
133-
assert False
138+
"""Upload bytes to external storage using fsspec backend."""
139+
self.storage.put_buffer(buffer, external_path)
134140

135141
def _download_buffer(self, external_path):
136-
if self.spec["protocol"] == "s3":
137-
return self.s3.get(external_path)
138-
if self.spec["protocol"] == "file":
139-
try:
140-
return Path(external_path).read_bytes()
141-
except FileNotFoundError:
142-
raise errors.MissingExternalFile(f"Missing external file {external_path}") from None
143-
assert False
142+
"""Download bytes from external storage using fsspec backend."""
143+
return self.storage.get_buffer(external_path)
144144

145145
def _remove_external_file(self, external_path):
146-
if self.spec["protocol"] == "s3":
147-
self.s3.remove_object(external_path)
148-
elif self.spec["protocol"] == "file":
149-
try:
150-
Path(external_path).unlink()
151-
except FileNotFoundError:
152-
pass
146+
"""Remove a file from external storage using fsspec backend."""
147+
self.storage.remove(external_path)
153148

154149
def exists(self, external_filepath):
155150
"""
151+
Check if an external file is accessible using fsspec backend.
152+
156153
:return: True if the external file is accessible
157154
"""
158-
if self.spec["protocol"] == "s3":
159-
return self.s3.exists(external_filepath)
160-
if self.spec["protocol"] == "file":
161-
return Path(external_filepath).is_file()
162-
assert False
155+
return self.storage.exists(external_filepath)
163156

164157
# --- BLOBS ----
165158

src/datajoint/s3.py

Lines changed: 21 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,19 @@
11
"""
2-
AWS S3 operations
2+
AWS S3 operations using minio client.
3+
4+
.. deprecated:: 0.15.0
5+
This module is deprecated. Use :mod:`datajoint.storage` with fsspec backend instead.
6+
The minio-based S3 client will be removed in a future version.
7+
8+
Migration guide:
9+
- Instead of importing from datajoint.s3, use datajoint.storage.StorageBackend
10+
- StorageBackend provides a unified interface for all storage protocols
11+
- See datajoint.storage module for details
312
"""
413

514
import logging
615
import uuid
16+
import warnings
717
from io import BytesIO
818
from pathlib import Path
919

@@ -17,7 +27,10 @@
1727

1828
class Folder:
1929
"""
20-
A Folder instance manipulates a flat folder of objects within an S3-compatible object store
30+
A Folder instance manipulates a flat folder of objects within an S3-compatible object store.
31+
32+
.. deprecated:: 0.15.0
33+
Use :class:`datajoint.storage.StorageBackend` instead.
2134
"""
2235

2336
def __init__(
@@ -31,6 +44,12 @@ def __init__(
3144
proxy_server=None,
3245
**_,
3346
):
47+
warnings.warn(
48+
"datajoint.s3.Folder is deprecated and will be removed in a future version. "
49+
"Use datajoint.storage.StorageBackend with fsspec instead.",
50+
DeprecationWarning,
51+
stacklevel=2,
52+
)
3453
# from https://docs.min.io/docs/python-client-api-reference
3554
self.client = minio.Minio(
3655
endpoint,

src/datajoint/settings.py

Lines changed: 27 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -275,13 +275,19 @@ def get_store_spec(self, store: str) -> dict[str, Any]:
275275

276276
# Validate protocol
277277
protocol = spec.get("protocol", "").lower()
278-
if protocol not in ("file", "s3"):
279-
raise DataJointError(f'Missing or invalid protocol in config.stores["{store}"]')
278+
supported_protocols = ("file", "s3", "gcs", "azure")
279+
if protocol not in supported_protocols:
280+
raise DataJointError(
281+
f'Missing or invalid protocol in config.stores["{store}"]. '
282+
f'Supported protocols: {", ".join(supported_protocols)}'
283+
)
280284

281285
# Define required and allowed keys by protocol
282286
required_keys: dict[str, tuple[str, ...]] = {
283287
"file": ("protocol", "location"),
284288
"s3": ("protocol", "endpoint", "bucket", "access_key", "secret_key", "location"),
289+
"gcs": ("protocol", "bucket", "location"),
290+
"azure": ("protocol", "container", "location"),
285291
}
286292
allowed_keys: dict[str, tuple[str, ...]] = {
287293
"file": ("protocol", "location", "subfolding", "stage"),
@@ -297,6 +303,25 @@ def get_store_spec(self, store: str) -> dict[str, Any]:
297303
"stage",
298304
"proxy_server",
299305
),
306+
"gcs": (
307+
"protocol",
308+
"bucket",
309+
"location",
310+
"token",
311+
"project",
312+
"subfolding",
313+
"stage",
314+
),
315+
"azure": (
316+
"protocol",
317+
"container",
318+
"location",
319+
"account_name",
320+
"account_key",
321+
"connection_string",
322+
"subfolding",
323+
"stage",
324+
),
300325
}
301326

302327
# Check required keys

0 commit comments

Comments (0)