Commit 5a727d2
Add remote URL support for copy insert
- Add is_remote_url() and parse_remote_url() helpers to storage.py
- Add copy_from_url() method to StorageBackend for remote-to-managed copies
- Add source_exists(), source_is_directory(), get_source_size() helpers
- Support s3://, gs://, az://, http://, https:// protocols
- Update spec with Remote URL Support section and examples
- Update object.md with "Inserting from Remote URLs" section
- Update insert.md with remote URL examples
- Add TestRemoteURLSupport test class
1 parent 4e90c1e commit 5a727d2

File tree: 5 files changed, +380 −11 lines

docs/src/design/tables/object-type-spec.md

Lines changed: 39 additions & 6 deletions
````diff
@@ -584,37 +584,70 @@ Each insert stores a separate copy of the file, even if identical content was pr
 
 At insert time, the `object` attribute accepts:
 
-1. **File path** (string or `Path`): Path to an existing file (extension extracted)
-2. **Folder path** (string or `Path`): Path to an existing directory
-3. **Tuple of (ext, stream)**: File-like object with explicit extension
+1. **Local file path** (string or `Path`): Path to an existing local file (extension extracted)
+2. **Local folder path** (string or `Path`): Path to an existing local directory
+3. **Remote URL** (string): URL to a remote file or folder (`s3://`, `gs://`, `az://`, `http://`, `https://`)
+4. **Tuple of (ext, stream)**: File-like object with explicit extension
 
 ```python
-# From file path - extension (.dat) extracted from source
+# From local file path - extension (.dat) extracted from source
 Recording.insert1({
     "subject_id": 123,
     "session_id": 45,
     "raw_data": "/local/path/to/recording.dat"
 })
 # Stored as: raw_data_Ax7bQ2kM.dat
 
-# From folder path - no extension
+# From local folder path - no extension
 Recording.insert1({
     "subject_id": 123,
     "session_id": 45,
     "raw_data": "/local/path/to/data_folder/"
 })
 # Stored as: raw_data_pL9nR4wE/
 
+# From remote URL - copies from source to managed storage
+Recording.insert1({
+    "subject_id": 123,
+    "session_id": 45,
+    "raw_data": "s3://source-bucket/path/to/data.dat"
+})
+# Stored as: raw_data_kM3nP2qR.dat
+
+# From remote Zarr store (e.g., collaborator data on GCS)
+Recording.insert1({
+    "subject_id": 123,
+    "session_id": 45,
+    "neural_data": "gs://collaborator-bucket/shared/experiment.zarr"
+})
+# Copied to managed storage as: neural_data_pL9nR4wE.zarr
+
 # From stream with explicit extension
 with open("/local/path/data.bin", "rb") as f:
     Recording.insert1({
         "subject_id": 123,
         "session_id": 45,
         "raw_data": (".bin", f)
     })
-# Stored as: raw_data_kM3nP2qR.bin
+# Stored as: raw_data_xY8zW3vN.bin
 ```
 
+### Remote URL Support
+
+Remote URLs are detected by protocol prefix and handled via fsspec:
+
+| Protocol | Example | Notes |
+|----------|---------|-------|
+| `s3://` | `s3://bucket/path/file.dat` | AWS S3, MinIO |
+| `gs://` | `gs://bucket/path/file.dat` | Google Cloud Storage |
+| `az://` | `az://container/path/file.dat` | Azure Blob Storage |
+| `http://` | `http://server/path/file.dat` | HTTP (read-only source) |
+| `https://` | `https://server/path/file.dat` | HTTPS (read-only source) |
+
+**Authentication**: Remote sources may require credentials. fsspec uses standard credential discovery (environment variables, config files, IAM roles). For cross-cloud copies, ensure credentials are configured for both source and destination.
+
+**Performance note**: For large remote-to-remote copies, data flows through the client. This is acceptable for most use cases but may be slow for very large datasets. Future optimizations could include server-side copy for same-provider transfers.
+
 ### Insert Processing Steps
 
 1. Validate input (file/folder exists, stream is readable)
````
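The four accepted input kinds in the spec above imply a dispatch step at insert time. A minimal sketch of that classification follows; `classify_object_source` and the exact protocol tuple are illustrative names, not part of the committed API:

```python
from pathlib import Path

# Protocol prefixes as listed in the spec (illustrative constant)
REMOTE_PROTOCOLS = ("s3://", "gs://", "az://", "http://", "https://")

def classify_object_source(value):
    """Return which insert path a value would take: 'stream', 'remote', 'folder', or 'file'."""
    if isinstance(value, tuple) and len(value) == 2:
        return "stream"  # (ext, file-like object)
    if isinstance(value, str) and value.lower().startswith(REMOTE_PROTOCOLS):
        return "remote"  # copied from source to managed storage
    p = Path(value)
    # A trailing slash or an existing directory indicates a folder insert
    return "folder" if str(value).endswith("/") or p.is_dir() else "file"

print(classify_object_source("s3://source-bucket/path/to/data.dat"))  # remote
print(classify_object_source("/local/path/to/data_folder/"))          # folder
print(classify_object_source((".bin", None)))                         # stream
```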

docs/src/design/tables/object.md

Lines changed: 32 additions & 1 deletion
````diff
@@ -89,7 +89,7 @@ Note: No `@store` suffix needed—storage is determined by pipeline configuratio
 
 ### Inserting Files
 
-Insert a file by providing its path:
+Insert a file by providing its local path:
 
 ```python
 Recording.insert1({
@@ -113,6 +113,37 @@ Recording.insert1({
 })
 ```
 
+### Inserting from Remote URLs
+
+Insert from cloud storage or HTTP sources—content is copied to managed storage:
+
+```python
+# From S3
+Recording.insert1({
+    "subject_id": 123,
+    "session_id": 45,
+    "raw_data": "s3://source-bucket/path/to/data.dat"
+})
+
+# From Google Cloud Storage (e.g., collaborator data)
+Recording.insert1({
+    "subject_id": 123,
+    "session_id": 45,
+    "neural_data": "gs://collaborator-bucket/shared/experiment.zarr"
+})
+
+# From HTTP/HTTPS
+Recording.insert1({
+    "subject_id": 123,
+    "session_id": 45,
+    "raw_data": "https://example.com/public/data.dat"
+})
+```
+
+Supported protocols: `s3://`, `gs://`, `az://`, `http://`, `https://`
+
+Remote sources may require credentials configured via environment variables or fsspec configuration files.
+
 ### Inserting from Streams
 
 Insert from a file-like object with explicit extension:
````
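The credential discovery mentioned in the doc relies on the cloud backends that fsspec delegates to (s3fs, gcsfs). A sketch of the environment-variable route, using the providers' standard variable names rather than anything DataJoint-specific:

```shell
# AWS S3 / MinIO sources (s3://) - standard AWS SDK variables,
# picked up automatically by s3fs
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."

# Google Cloud Storage sources (gs://) - service-account key file,
# picked up automatically by gcsfs
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
```

For cross-cloud copies, both the source provider and the managed-storage destination need working credentials in the same environment.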

docs/src/manipulation/insert.md

Lines changed: 20 additions & 4 deletions
````diff
@@ -96,24 +96,38 @@ phase_two.Protocol.insert(protocols)
 ## Object attributes
 
 Tables with [`object`](../design/tables/object.md) type attributes can be inserted with
-file paths, folder paths, or streams. The content is automatically copied to object
-storage.
+local file paths, folder paths, remote URLs, or streams. The content is automatically
+copied to object storage.
 
 ```python
-# Insert with file path
+# Insert with local file path
 Recording.insert1({
     "subject_id": 123,
     "session_id": 45,
     "raw_data": "/local/path/to/data.dat"
 })
 
-# Insert with folder path
+# Insert with local folder path
 Recording.insert1({
     "subject_id": 123,
     "session_id": 45,
     "raw_data": "/local/path/to/data_folder/"
 })
 
+# Insert from remote URL (S3, GCS, Azure, HTTP)
+Recording.insert1({
+    "subject_id": 123,
+    "session_id": 45,
+    "raw_data": "s3://source-bucket/path/to/data.dat"
+})
+
+# Insert remote Zarr store (e.g., from collaborator)
+Recording.insert1({
+    "subject_id": 123,
+    "session_id": 45,
+    "neural_data": "gs://collaborator-bucket/shared/experiment.zarr"
+})
+
 # Insert from stream with explicit extension
 with open("/path/to/data.bin", "rb") as f:
     Recording.insert1({
@@ -123,6 +137,8 @@ with open("/path/to/data.bin", "rb") as f:
     })
 ```
 
+Supported remote URL protocols: `s3://`, `gs://`, `az://`, `http://`, `https://`
+
 ### Staged inserts
 
 For large objects like Zarr arrays, use `staged_insert1` to write directly to storage
````

src/datajoint/storage.py

Lines changed: 198 additions & 0 deletions
````diff
@@ -22,6 +22,55 @@
 # Characters safe for use in filenames and URLs
 TOKEN_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"
 
+# Supported remote URL protocols for copy insert
+REMOTE_PROTOCOLS = ("s3://", "gs://", "gcs://", "az://", "abfs://", "http://", "https://")
+
+
+def is_remote_url(path: str) -> bool:
+    """
+    Check if a path is a remote URL.
+
+    Args:
+        path: Path string to check
+
+    Returns:
+        True if path is a remote URL
+    """
+    if not isinstance(path, str):
+        return False
+    return path.lower().startswith(REMOTE_PROTOCOLS)
+
+
+def parse_remote_url(url: str) -> tuple[str, str]:
+    """
+    Parse a remote URL into protocol and path.
+
+    Args:
+        url: Remote URL (e.g., 's3://bucket/path/file.dat')
+
+    Returns:
+        Tuple of (protocol, path) where protocol is fsspec-compatible
+    """
+    url_lower = url.lower()
+
+    # Map URL schemes to fsspec protocols
+    protocol_map = {
+        "s3://": "s3",
+        "gs://": "gcs",
+        "gcs://": "gcs",
+        "az://": "abfs",
+        "abfs://": "abfs",
+        "http://": "http",
+        "https://": "https",
+    }
+
+    for prefix, protocol in protocol_map.items():
+        if url_lower.startswith(prefix):
+            path = url[len(prefix):]
+            return protocol, path
+
+    raise errors.DataJointError(f"Unsupported remote URL protocol: {url}")
+
 
 def generate_token(length: int = 8) -> str:
     """
@@ -494,6 +543,155 @@ def get_fsmap(self, remote_path: str | PurePosixPath) -> fsspec.FSMap:
         full_path = self._full_path(remote_path)
         return fsspec.FSMap(full_path, self.fs)
 
+    def copy_from_url(self, source_url: str, dest_path: str | PurePosixPath) -> int | dict:
+        """
+        Copy a file or folder from a remote URL to managed storage.
+
+        Args:
+            source_url: Remote URL (s3://, gs://, http://, etc.)
+            dest_path: Destination path in managed storage
+
+        Returns:
+            Size of copied file in bytes, or a manifest dict if the source is a directory
+        """
+        protocol, source_path = parse_remote_url(source_url)
+        full_dest = self._full_path(dest_path)
+
+        logger.debug(f"copy_from_url: {protocol}://{source_path} -> {self.protocol}:{full_dest}")
+
+        # Get source filesystem
+        source_fs = fsspec.filesystem(protocol)
+
+        # Check if source is a directory
+        if source_fs.isdir(source_path):
+            return self._copy_folder_from_url(source_fs, source_path, dest_path)
+
+        # Copy single file
+        if self.protocol == "file":
+            # Download to local destination
+            Path(full_dest).parent.mkdir(parents=True, exist_ok=True)
+            source_fs.get_file(source_path, full_dest)
+            return Path(full_dest).stat().st_size
+        else:
+            # Remote-to-remote copy via streaming
+            with source_fs.open(source_path, "rb") as src:
+                content = src.read()
+            self.fs.pipe_file(full_dest, content)
+            return len(content)
+
+    def _copy_folder_from_url(
+        self, source_fs: fsspec.AbstractFileSystem, source_path: str, dest_path: str | PurePosixPath
+    ) -> dict:
+        """
+        Copy a folder from a remote URL to managed storage.
+
+        Args:
+            source_fs: Source filesystem
+            source_path: Path in source filesystem
+            dest_path: Destination path in managed storage
+
+        Returns:
+            Manifest dict with file list, total_size, and item_count
+        """
+        full_dest = self._full_path(dest_path)
+        logger.debug(f"copy_folder_from_url: {source_path} -> {self.protocol}:{full_dest}")
+
+        # Collect file info for manifest
+        files = []
+        total_size = 0
+
+        # Walk source directory
+        for root, dirs, filenames in source_fs.walk(source_path):
+            for filename in filenames:
+                src_file = f"{root}/{filename}"
+                rel_path = src_file[len(source_path):].lstrip("/")
+                file_size = source_fs.size(src_file)
+                files.append({"path": rel_path, "size": file_size})
+                total_size += file_size
+
+                # Copy file
+                dest_file = f"{full_dest}/{rel_path}"
+                if self.protocol == "file":
+                    Path(dest_file).parent.mkdir(parents=True, exist_ok=True)
+                    source_fs.get_file(src_file, dest_file)
+                else:
+                    with source_fs.open(src_file, "rb") as src:
+                        content = src.read()
+                    self.fs.pipe_file(dest_file, content)
+
+        # Build manifest
+        manifest = {
+            "files": files,
+            "total_size": total_size,
+            "item_count": len(files),
+            "created": datetime.now(timezone.utc).isoformat(),
+        }
+
+        # Write manifest alongside folder
+        manifest_path = f"{dest_path}.manifest.json"
+        self.put_buffer(json.dumps(manifest, indent=2).encode(), manifest_path)
+
+        return manifest
+
+    def source_is_directory(self, source: str) -> bool:
+        """
+        Check if a source path (local or remote URL) is a directory.
+
+        Args:
+            source: Local path or remote URL
+
+        Returns:
+            True if source is a directory
+        """
+        if is_remote_url(source):
+            protocol, path = parse_remote_url(source)
+            source_fs = fsspec.filesystem(protocol)
+            return source_fs.isdir(path)
+        else:
+            return Path(source).is_dir()
+
+    def source_exists(self, source: str) -> bool:
+        """
+        Check if a source path (local or remote URL) exists.
+
+        Args:
+            source: Local path or remote URL
+
+        Returns:
+            True if source exists
+        """
+        if is_remote_url(source):
+            protocol, path = parse_remote_url(source)
+            source_fs = fsspec.filesystem(protocol)
+            return source_fs.exists(path)
+        else:
+            return Path(source).exists()
+
+    def get_source_size(self, source: str) -> int | None:
+        """
+        Get the size of a source file (local or remote URL).
+
+        Args:
+            source: Local path or remote URL
+
+        Returns:
+            Size in bytes, or None if directory or cannot determine
+        """
+        try:
+            if is_remote_url(source):
+                protocol, path = parse_remote_url(source)
+                source_fs = fsspec.filesystem(protocol)
+                if source_fs.isdir(path):
+                    return None
+                return source_fs.size(path)
+            else:
+                p = Path(source)
+                if p.is_dir():
+                    return None
+                return p.stat().st_size
+        except Exception:
+            return None
+
 
 STORE_METADATA_FILENAME = "datajoint_store.json"
````
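The URL-parsing helpers in this diff can be exercised without any cloud backend. The standalone restatement below mirrors the committed logic (same prefixes, same scheme-to-fsspec-protocol mapping), with a plain `ValueError` standing in for `errors.DataJointError`:

```python
# Standalone restatement of the diff's helpers, for illustration only
REMOTE_PROTOCOLS = ("s3://", "gs://", "gcs://", "az://", "abfs://", "http://", "https://")

PROTOCOL_MAP = {
    "s3://": "s3", "gs://": "gcs", "gcs://": "gcs",
    "az://": "abfs", "abfs://": "abfs",
    "http://": "http", "https://": "https",
}

def is_remote_url(path) -> bool:
    # Non-strings (e.g., Path objects or tuples) are never remote URLs
    return isinstance(path, str) and path.lower().startswith(REMOTE_PROTOCOLS)

def parse_remote_url(url: str) -> tuple[str, str]:
    # Case-insensitive scheme match; the path keeps its original case
    for prefix, protocol in PROTOCOL_MAP.items():
        if url.lower().startswith(prefix):
            return protocol, url[len(prefix):]
    raise ValueError(f"Unsupported remote URL protocol: {url}")

print(parse_remote_url("s3://source-bucket/path/to/data.dat"))  # ('s3', 'source-bucket/path/to/data.dat')
print(parse_remote_url("az://container/file.dat"))              # ('abfs', 'container/file.dat')
print(is_remote_url("/local/path/to/data.dat"))                 # False
```

Note that `gs://` and `az://` are normalized to the fsspec protocol names `gcs` and `abfs`, so `fsspec.filesystem(protocol)` resolves to gcsfs and adlfs respectively.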
