Commit 5ed7329

Implement multi-store support for object type
- Add object@store_name syntax in table definitions (declare.py)
- Extract store name from attribute type in heading.py
- Add url and store fields to ObjectRef for metadata storage
- Add object_storage.stores.<name>.* configuration (settings.py)
- Add get_object_store_spec() method for named store configs
- Update table.py to use the correct backend per attribute store
- Update fetch.py to use the store recorded in the JSON metadata
- Rename the ObjectRef.store property to fsmap to avoid a conflict with the new store field

This enables different object attributes to use different storage backends (e.g., private vs. public buckets) while maintaining OAS (Object-Augmented Schema) principles for all stores.
1 parent 260a43a commit 5ed7329

File tree: 6 files changed, +195 −22 lines


src/datajoint/declare.py

Lines changed: 4 additions & 1 deletion
@@ -65,7 +65,7 @@
     INTERNAL_ATTACH=r"attach$",
     EXTERNAL_ATTACH=r"attach@(?P<store>[a-z][\-\w]*)$",
     FILEPATH=r"filepath@(?P<store>[a-z][\-\w]*)$",
-    OBJECT=r"object$",  # managed object storage (files/folders)
+    OBJECT=r"object(@(?P<store>[a-z][\-\w]*))?$",  # managed object storage (files/folders)
     UUID=r"uuid$",
     ADAPTED=r"<.+>$",
 ).items()
@@ -469,6 +469,9 @@ def substitute_special_type(match, category, foreign_key_sql, context):
         match["type"] = "LONGBLOB"
     elif category == "OBJECT":
         # Object type stores metadata as JSON - no foreign key to external table
+        # Extract store name if present (object@store_name syntax)
+        if "@" in match["type"]:
+            match["store"] = match["type"].split("@", 1)[1]
         match["type"] = "JSON"
     elif category in EXTERNAL_TYPES:
         if category == "FILEPATH" and not _support_filepath_types():

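With this pattern change, an object attribute can name its store directly in the table definition. A minimal sketch (the schema, table, and store name "public" are hypothetical; "public" must exist under object_storage.stores):

    import datajoint as dj

    schema = dj.Schema("demo")

    @schema
    class Recording(dj.Manual):
        definition = """
        recording_id : int
        ---
        raw_data : object          # default object store
        preview  : object@public   # hypothetical named store "public"
        """
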
src/datajoint/fetch.py

Lines changed: 3 additions & 1 deletion
@@ -53,8 +53,10 @@ def _get(connection, attr, data, squeeze, download_path):
     if attr.is_object:
         # Object type - return ObjectRef handle
         json_data = json.loads(data) if isinstance(data, str) else data
+        # Get the correct backend based on store name in metadata
+        store_name = json_data.get("store")  # None for default store
         try:
-            spec = config.get_object_storage_spec()
+            spec = config.get_object_store_spec(store_name)
             backend = StorageBackend(spec)
         except DataJointError:
             backend = None

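Because the store name travels with the JSON metadata, fetch needs no table-level state to pick the right backend. A hedged usage sketch, continuing the hypothetical Recording table above:

    ref = (Recording & "recording_id = 1").fetch1("raw_data")  # ObjectRef handle, no download yet
    print(ref.store)  # None -> default store; "public" would route to the named backend
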
src/datajoint/heading.py

Lines changed: 8 additions & 1 deletion
@@ -331,6 +331,13 @@ def _init_from_database(self):
                     {env} = TRUE or upgrade datajoint.
                     """.format(env=FILEPATH_FEATURE_SWITCH)
                 )
+            # Extract store name for external types and object types with named stores
+            store = None
+            if category in EXTERNAL_TYPES:
+                store = attr["type"].split("@")[1]
+            elif category == "OBJECT" and "@" in attr["type"]:
+                store = attr["type"].split("@")[1]
+
             attr.update(
                 unsupported=False,
                 is_attachment=category in ("INTERNAL_ATTACH", "EXTERNAL_ATTACH"),
@@ -340,7 +347,7 @@ def _init_from_database(self):
                 is_blob=category in ("INTERNAL_BLOB", "EXTERNAL_BLOB"),
                 uuid=category == "UUID",
                 is_external=category in EXTERNAL_TYPES,
-                store=(attr["type"].split("@")[1] if category in EXTERNAL_TYPES else None),
+                store=store,
             )
 
             if attr["in_key"] and any(

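The store name recovered here mirrors what the declare.py pattern captures. A quick standalone check of both extraction routes (pure Python, no DataJoint state; "public" is an illustrative store name):

    import re

    OBJECT = re.compile(r"object(@(?P<store>[a-z][\-\w]*))?$")

    assert OBJECT.match("object").group("store") is None        # default store
    assert OBJECT.match("object@public").group("store") == "public"
    assert "object@public".split("@")[1] == "public"            # heading.py's split
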
src/datajoint/objectref.py

Lines changed: 18 additions & 3 deletions
@@ -34,7 +34,9 @@ class ObjectRef:
     from the storage backend.
 
     Attributes:
-        path: Full path/key within storage backend (includes token)
+        path: Relative path within the store (includes token)
+        url: Full URI to the object (e.g., 's3://bucket/path/to/object.dat')
+        store: Store name (None for default store)
         size: Total size in bytes (sum for folders), or None if not computed.
             For large hierarchical data like Zarr stores, size computation can
             be expensive and is optional.
@@ -53,6 +55,8 @@ class ObjectRef:
     ext: str | None
     is_dir: bool
     timestamp: datetime
+    url: str | None = None
+    store: str | None = None
     mime_type: str | None = None
     item_count: int | None = None
     _backend: StorageBackend | None = None
@@ -80,6 +84,8 @@ def from_json(cls, json_data: dict | str, backend: StorageBackend | None = None)
 
         return cls(
            path=data["path"],
+            url=data.get("url"),
+            store=data.get("store"),
            size=data["size"],
            hash=data.get("hash"),
            ext=data.get("ext"),
@@ -105,6 +111,10 @@ def to_json(self) -> dict:
             "is_dir": self.is_dir,
             "timestamp": self.timestamp.isoformat() if self.timestamp else None,
         }
+        if self.url:
+            data["url"] = self.url
+        if self.store:
+            data["store"] = self.store
         if self.mime_type:
             data["mime_type"] = self.mime_type
         if self.item_count is not None:
@@ -121,7 +131,9 @@ def to_dict(self) -> dict:
 
         Returns:
             Dict containing the object metadata:
-            - path: Storage path
+            - path: Relative storage path within the store
+            - url: Full URI (e.g., 's3://bucket/path') (optional)
+            - store: Store name (optional, None for default store)
             - size: File/folder size in bytes (or None)
             - hash: Content hash (or None)
             - ext: File extension (or None)
@@ -152,12 +164,15 @@ def fs(self) -> fsspec.AbstractFileSystem:
         return self._backend.fs
 
     @property
-    def store(self) -> fsspec.FSMap:
+    def fsmap(self) -> fsspec.FSMap:
         """
         Return FSMap suitable for Zarr/xarray.
 
         This provides a dict-like interface to the storage location,
         compatible with zarr.open() and xarray.open_zarr().
+
+        Example:
+            >>> z = zarr.open(obj_ref.fsmap, mode='r')
         """
         self._ensure_backend()
         full_path = self._backend._full_path(self.path)

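The renamed fsmap property keeps the Zarr workflow from the docstring intact. A hedged sketch, continuing the hypothetical Recording example (assumes the stored object is a Zarr group and zarr is installed):

    import zarr

    ref = (Recording & "recording_id = 1").fetch1("raw_data")  # ObjectRef
    z = zarr.open(ref.fsmap, mode="r")   # dict-like FSMap; also works with xarray.open_zarr
    print(ref.to_dict().get("store"))    # store name round-trips through to_json/from_json
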
src/datajoint/settings.py

Lines changed: 92 additions & 0 deletions
@@ -215,6 +215,9 @@ class ObjectStorageSettings(BaseSettings):
     partition_pattern: str | None = Field(default=None, description="Path pattern with {attribute} placeholders")
     token_length: int = Field(default=8, ge=4, le=16, description="Random suffix length for filenames")
 
+    # Named stores configuration (object_storage.stores.<name>.*)
+    stores: dict[str, dict[str, Any]] = Field(default_factory=dict, description="Named object stores")
+
 
 class Config(BaseSettings):
     """
@@ -432,6 +435,88 @@ def get_object_storage_spec(self) -> dict[str, Any]:
 
         return spec
 
+    def get_object_store_spec(self, store_name: str | None = None) -> dict[str, Any]:
+        """
+        Get validated configuration for a specific object store.
+
+        Args:
+            store_name: Name of the store (None for default store)
+
+        Returns:
+            Object store configuration dict
+
+        Raises:
+            DataJointError: If store is not configured or has invalid config
+        """
+        if store_name is None:
+            # Return default store spec
+            return self.get_object_storage_spec()
+
+        os_settings = self.object_storage
+
+        # Check if named store exists
+        if store_name not in os_settings.stores:
+            raise DataJointError(
+                f"Object store '{store_name}' is not configured. "
+                f"Add object_storage.stores.{store_name}.* settings to datajoint.json"
+            )
+
+        store_config = os_settings.stores[store_name]
+        protocol = store_config.get("protocol", "").lower()
+
+        supported_protocols = ("file", "s3", "gcs", "azure")
+        if protocol not in supported_protocols:
+            raise DataJointError(
+                f"Invalid protocol for store '{store_name}': {protocol}. "
+                f'Supported protocols: {", ".join(supported_protocols)}'
+            )
+
+        # Use project_name from default config if not specified in store
+        project_name = store_config.get("project_name") or os_settings.project_name
+        if not project_name:
+            raise DataJointError(
+                f"project_name is required for object store '{store_name}'. "
+                f"Set object_storage.project_name or object_storage.stores.{store_name}.project_name"
+            )
+
+        # Build spec dict
+        spec = {
+            "project_name": project_name,
+            "protocol": protocol,
+            "location": store_config.get("location", ""),
+            "partition_pattern": store_config.get("partition_pattern") or os_settings.partition_pattern,
+            "token_length": store_config.get("token_length") or os_settings.token_length,
+            "store_name": store_name,
+        }
+
+        # Add protocol-specific settings
+        if protocol == "s3":
+            endpoint = store_config.get("endpoint")
+            bucket = store_config.get("bucket")
+            if not endpoint or not bucket:
+                raise DataJointError(f"endpoint and bucket are required for S3 store '{store_name}'")
+            spec.update(
+                {
+                    "endpoint": endpoint,
+                    "bucket": bucket,
+                    "access_key": store_config.get("access_key"),
+                    "secret_key": store_config.get("secret_key"),
+                    "secure": store_config.get("secure", True),
+                }
+            )
+        elif protocol == "gcs":
+            bucket = store_config.get("bucket")
+            if not bucket:
+                raise DataJointError(f"bucket is required for GCS store '{store_name}'")
+            spec["bucket"] = bucket
+        elif protocol == "azure":
+            container = store_config.get("container")
+            if not container:
+                raise DataJointError(f"container is required for Azure store '{store_name}'")
+            spec["container"] = container
+
+        return spec
+
     def load(self, filename: str | Path) -> None:
         """
         Load settings from a JSON file.
@@ -464,6 +549,13 @@ def _update_from_flat_dict(self, data: dict[str, Any]) -> None:
                 group_obj = getattr(self, group)
                 if hasattr(group_obj, attr):
                     setattr(group_obj, attr, value)
+            elif len(parts) == 4:
+                # Handle object_storage.stores.<name>.<attr> pattern
+                group, subgroup, store_name, attr = parts
+                if group == "object_storage" and subgroup == "stores":
+                    if store_name not in self.object_storage.stores:
+                        self.object_storage.stores[store_name] = {}
+                    self.object_storage.stores[store_name][attr] = value
 
     def _load_secrets(self, secrets_dir: Path) -> None:
         """Load secrets from a secrets directory."""

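For illustration, a datajoint.json fragment that would satisfy get_object_store_spec for a hypothetical S3-backed store named "public" (the bucket, endpoint, and project name are made up; the flat object_storage.stores.<name>.<attr> keys follow the pattern handled in _update_from_flat_dict above):

    {
        "object_storage.project_name": "demo_project",
        "object_storage.stores.public.protocol": "s3",
        "object_storage.stores.public.endpoint": "s3.amazonaws.com",
        "object_storage.stores.public.bucket": "my-public-bucket",
        "object_storage.stores.public.access_key": "...",
        "object_storage.stores.public.secret_key": "..."
    }

With that in place, config.get_object_store_spec("public") returns a spec dict with protocol "s3", the bucket, and store_name "public"; config.get_object_store_spec(None) still resolves the default store.
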
src/datajoint/table.py

Lines changed: 70 additions & 16 deletions
@@ -275,32 +275,49 @@ def external(self):
 
     @property
     def object_storage(self) -> StorageBackend | None:
-        """Get the object storage backend for this table."""
-        if not hasattr(self, "_object_storage"):
+        """Get the default object storage backend for this table."""
+        return self.get_object_storage()
+
+    def get_object_storage(self, store_name: str | None = None) -> StorageBackend | None:
+        """
+        Get the object storage backend for a specific store.
+
+        Args:
+            store_name: Name of the store (None for default store)
+
+        Returns:
+            StorageBackend instance or None if not configured
+        """
+        cache_key = f"_object_storage_{store_name or 'default'}"
+        if not hasattr(self, cache_key):
             try:
-                spec = config.get_object_storage_spec()
-                self._object_storage = StorageBackend(spec)
+                spec = config.get_object_store_spec(store_name)
+                backend = StorageBackend(spec)
                 # Verify/create store metadata on first use
-                verify_or_create_store_metadata(self._object_storage, spec)
+                verify_or_create_store_metadata(backend, spec)
+                setattr(self, cache_key, backend)
             except DataJointError:
-                self._object_storage = None
-        return self._object_storage
+                setattr(self, cache_key, None)
+        return getattr(self, cache_key)
 
-    def _process_object_value(self, name: str, value, row: dict) -> str:
+    def _process_object_value(self, name: str, value, row: dict, store_name: str | None = None) -> str:
         """
         Process an object attribute value for insert.
 
         Args:
             name: Attribute name
             value: Input value (file path, folder path, or (ext, stream) tuple)
             row: The full row dict (needed for primary key values)
+            store_name: Name of the object store (None for default store)
 
         Returns:
             JSON string for database storage
         """
-        if self.object_storage is None:
+        backend = self.get_object_storage(store_name)
+        if backend is None:
+            store_desc = f"'{store_name}'" if store_name else "default"
             raise DataJointError(
-                "Object storage is not configured. Set object_storage settings in datajoint.json "
+                f"Object storage ({store_desc}) is not configured. Set object_storage settings in datajoint.json "
                 "or DJ_OBJECT_STORAGE_* environment variables."
             )
 
@@ -339,7 +356,7 @@ def _process_object_value(self, name: str, value, row: dict) -> str:
         )
 
         # Get storage spec for path building
-        spec = config.get_object_storage_spec()
+        spec = config.get_object_store_spec(store_name)
         partition_pattern = spec.get("partition_pattern")
         token_length = spec.get("token_length", 8)
         location = spec.get("location", "")
@@ -362,24 +379,33 @@ def _process_object_value(self, name: str, value, row: dict) -> str:
         manifest = None
         if source_path:
             if is_dir:
-                manifest = self.object_storage.put_folder(source_path, full_storage_path)
+                manifest = backend.put_folder(source_path, full_storage_path)
                 size = manifest["total_size"]
             else:
-                self.object_storage.put_file(source_path, full_storage_path)
+                backend.put_file(source_path, full_storage_path)
         elif stream:
-            self.object_storage.put_buffer(content, full_storage_path)
+            backend.put_buffer(content, full_storage_path)
+
+        # Build full URL for the object
+        url = self._build_object_url(spec, full_storage_path)
 
         # Build JSON metadata
         timestamp = datetime.now(timezone.utc).isoformat()
         metadata = {
-            "path": relative_path,
+            "path": full_storage_path,
             "size": size,
             "hash": None,  # Hash is optional, not computed by default
             "ext": ext,
            "is_dir": is_dir,
            "timestamp": timestamp,
        }
 
+        # Add URL and store name
+        if url:
+            metadata["url"] = url
+        if store_name:
+            metadata["store"] = store_name
+
         # Add mime_type for files
         if not is_dir and ext:
             mime_type, _ = mimetypes.guess_type(f"file{ext}")
@@ -392,6 +418,34 @@ def _process_object_value(self, name: str, value, row: dict) -> str:
 
         return json.dumps(metadata)
 
+    def _build_object_url(self, spec: dict, path: str) -> str | None:
+        """
+        Build a full URL for an object based on the storage spec.
+
+        Args:
+            spec: Storage configuration dict
+            path: Path within the storage
+
+        Returns:
+            Full URL string or None for local storage
+        """
+        protocol = spec.get("protocol", "")
+        if protocol == "s3":
+            bucket = spec.get("bucket", "")
+            return f"s3://{bucket}/{path}"
+        elif protocol == "gcs":
+            bucket = spec.get("bucket", "")
+            return f"gs://{bucket}/{path}"
+        elif protocol == "azure":
+            container = spec.get("container", "")
+            return f"az://{container}/{path}"
+        elif protocol == "file":
+            # For local storage, return file:// URL
+            location = spec.get("location", "")
+            full_path = f"{location}/{path}" if location else path
+            return f"file://{full_path}"
+        return None
+
     def update1(self, row):
         """
         ``update1`` updates one existing entry in the table.
@@ -912,7 +966,7 @@ def __make_placeholder(self, name, value, ignore_extra_fields=False, row=None):
                 raise DataJointError(
                     f"Object attribute {name} requires full row context for insert. " "This is an internal error."
                 )
-            value = self._process_object_value(name, value, row)
+            value = self._process_object_value(name, value, row, store_name=attr.store)
         elif attr.numeric:
             value = str(int(value) if isinstance(value, bool) else value)
         elif attr.json:

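Taken together, insert now routes each object attribute to its own backend and records where the bytes went. A hedged end-to-end sketch, continuing the hypothetical Recording table and "public" store (paths and the printed URL are made up):

    Recording.insert1({
        "recording_id": 1,
        "raw_data": "/data/session1.zarr",  # folder -> default store
        "preview": "/data/preview.png",     # file -> "public" store, per the definition
    })

    ref = (Recording & "recording_id = 1").fetch1("preview")
    print(ref.url)    # e.g., "s3://my-public-bucket/demo_project/.../preview_ab12cd34.png"
    print(ref.store)  # "public"
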