Commit 997d992

Finalize staged_insert1 API for direct object storage writes

- Use dedicated `staged_insert1` method instead of co-opting `insert1`
- Add `StagedInsert` class with `rec` dict, `store()`, and `open()` methods
- Document rationale for separate method (explicit, backward compatible, type safe)
- Add examples for Zarr and multiple object fields
- Note that staged inserts are limited to `insert1` (no multi-row)

1 parent 93ce01e

docs/src/design/tables/file-type-spec.md (1 file changed: +97 -33 lines)
The file/folder is copied to storage **before** the database insert is attempted.

### Staged Insert (Direct Write Mode)

For large objects like Zarr arrays, copying from local storage is inefficient. **Staged insert** allows writing directly to the destination.

#### Why a Separate Method?

Staged insert uses a dedicated `staged_insert1` method rather than co-opting `insert1` because:

1. **Explicit over implicit** - Staged inserts have fundamentally different semantics (file creation happens during the context, commit on exit). A separate method makes this explicit.
2. **Backward compatibility** - `insert1` returns `None` and doesn't support the context manager protocol. Changing this could break existing code.
3. **Clear error handling** - The context manager semantics (success = commit, exception = rollback) are obvious with `staged_insert1`.
4. **Type safety** - The staged context exposes `.store()` for object fields. A dedicated method can return a properly-typed `StagedInsert` object.

**Staged inserts are limited to `insert1`** (one row at a time). Multi-row inserts are not supported for staged operations.

#### Basic Usage

```python
import zarr

# Stage an insert with direct object storage writes
with Recording.staged_insert1() as staged:
    # Set primary key values
    staged.rec['subject_id'] = 123
    staged.rec['session_id'] = 45

    # Create object storage directly using store()
    z = zarr.open(staged.store('raw_data', 'my_array.zarr'), mode='w', shape=(10000, 10000), dtype='f4')
    z[:] = compute_large_array()

    # Assign the created object to the record
    staged.rec['raw_data'] = z

# On successful exit: metadata computed, record inserted
# On exception: storage cleaned up, no record inserted
```
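
The failure branch mirrors the closing comments above. A minimal sketch of that path (the `RuntimeError` simply stands in for any exception raised mid-write):

```python
try:
    with Recording.staged_insert1() as staged:
        staged.rec['subject_id'] = 123
        staged.rec['session_id'] = 46
        z = zarr.open(staged.store('raw_data', 'bad.zarr'), mode='w', shape=(100, 100), dtype='f4')
        z[:] = 1.0
        raise RuntimeError("acquisition failed mid-write")  # simulate a failure
except RuntimeError:
    pass  # 'bad.zarr' has been deleted from storage; no row was inserted
```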

#### StagedInsert Interface

```python
from typing import IO, Any

import fsspec


class StagedInsert:
    """Context manager for staged insert operations."""

    rec: dict[str, Any]  # Record dict for setting attribute values

    def store(self, field: str, name: str) -> fsspec.FSMap:
        """
        Get an FSMap store for direct writes to an object field.

        Args:
            field: Name of the object attribute
            name: Filename/dirname for the stored object

        Returns:
            fsspec.FSMap suitable for Zarr/xarray
        """
        ...

    def open(self, field: str, name: str, mode: str = "wb") -> IO:
        """
        Open a file for direct writes to an object field.

        Args:
            field: Name of the object attribute
            name: Filename for the stored object
            mode: File mode (default: "wb")

        Returns:
            File-like object for writing
        """
        ...

    @property
    def fs(self) -> fsspec.AbstractFileSystem:
        """Return fsspec filesystem for advanced operations."""
        ...
```
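
`store()` is exercised throughout this page, but `open()` is not shown in use. A sketch under stated assumptions: the object field `metadata_file` is invented for illustration, and referencing the written file by filename in `rec` is a guess this spec does not pin down:

```python
with Recording.staged_insert1() as staged:
    staged.rec['subject_id'] = 123
    staged.rec['session_id'] = 45

    # Write a sidecar file directly into object storage (default mode="wb")
    with staged.open('metadata_file', 'acquisition.json') as f:
        f.write(b'{"sampling_rate_hz": 30000}')

    # Assumption: open()-written fields are referenced by filename
    staged.rec['metadata_file'] = 'acquisition.json'
```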

#### Staged Insert Flow

```
┌─────────────────────────────────────────────────────────┐
│ 1. Enter context: create StagedInsert with empty rec    │
├─────────────────────────────────────────────────────────┤
│ 2. User sets primary key values in staged.rec           │
├─────────────────────────────────────────────────────────┤
│ 3. User calls store()/open() to get storage handles     │
│    - Path reserved with random token on first call      │
│    - User writes data directly via fsspec               │
├─────────────────────────────────────────────────────────┤
│ 4. User assigns object references to staged.rec         │
├─────────────────────────────────────────────────────────┤
│ 5. On context exit (success):                           │
│    - Compute metadata (size, hash, item_count)          │
│    - Execute database INSERT                            │
├─────────────────────────────────────────────────────────┤
│ 6. On context exit (exception):                         │
│    - Delete any written data                            │
│    - Re-raise exception                                 │
└─────────────────────────────────────────────────────────┘
```
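
These exit steps map naturally onto the context manager protocol. The following is an illustrative sketch only, not the implementation; `_reserved_paths`, `_compute_metadata`, and `_insert_row` are invented names:

```python
class StagedInsert:
    # ... interface as above ...

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            # Step 5: compute size/hash/item_count, then INSERT the record
            metadata = self._compute_metadata()    # invented helper
            self._insert_row(self.rec, metadata)   # invented helper
        else:
            # Step 6: delete any data written during the context
            for path in self._reserved_paths:      # invented attribute
                self.fs.rm(path, recursive=True)
        return False  # never suppress: the user's exception is re-raised
```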

```python
import zarr
import numpy as np

# Create a large Zarr array directly in object storage
with Recording.staged_insert1() as staged:
    staged.rec['subject_id'] = 123
    staged.rec['session_id'] = 45

    # Create Zarr hierarchy directly in object storage
    root = zarr.open(staged.store('neural_data', 'spikes.zarr'), mode='w')
    root.create_dataset('timestamps', data=np.arange(1000000))
    root.create_dataset('waveforms', shape=(1000000, 82), chunks=(10000, 82))

    # Write in chunks (streaming from acquisition)
    for i, chunk in enumerate(data_stream):
        root['waveforms'][i*10000:(i+1)*10000] = chunk

    # Assign to record
    staged.rec['neural_data'] = root

# Record automatically inserted with computed metadata
```

#### Multiple Object Fields

```python
with Recording.staged_insert1() as staged:
    staged.rec['subject_id'] = 123
    staged.rec['session_id'] = 45

    # Write multiple object fields
    raw = zarr.open(staged.store('raw_data', 'raw.zarr'), mode='w', shape=(1000, 1000))
    raw[:] = raw_array

    processed = zarr.open(staged.store('processed', 'processed.zarr'), mode='w', shape=(100, 100))
    processed[:] = processed_array

    staged.rec['raw_data'] = raw
    staged.rec['processed'] = processed
```

#### Comparison: Copy vs Staged Insert

| Aspect | Copy Insert | Staged Insert |
|--------|-------------|---------------|
| Efficiency | Copy overhead | No copy needed |
| Use case | Small files, existing data | Large arrays, streaming data |
| Cleanup on failure | Orphan possible | Cleaned up |
| API | `insert1({..., "field": path})` | `staged_insert1()` context manager |
| Multi-row | Supported | Not supported (insert1 only) |
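
Side by side, the two styles from the API row look like this (paths and values are illustrative):

```python
# Copy insert: the file exists locally and is copied on insert
Recording.insert1({
    'subject_id': 123, 'session_id': 45,
    'raw_data': '/local/path/my_array.zarr',  # copied to storage, then INSERT
})

# Staged insert: data is written directly to storage inside the context
with Recording.staged_insert1() as staged:
    staged.rec['subject_id'] = 123
    staged.rec['session_id'] = 45
    z = zarr.open(staged.store('raw_data', 'my_array.zarr'), mode='w', shape=(10000, 10000), dtype='f4')
    z[:] = compute_large_array()
    staged.rec['raw_data'] = z
```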
571635

572636
## Transaction Handling
