proposals/BEP-1020-vfolder-destination-support-for-artifact-import.md

## Motivation

Currently, when users import artifacts via the `import_artifacts` API, the download destination is determined by the pre-configured `artifact_storage` in the system settings. This limits flexibility as users cannot choose where to store their imported models.

### Current Structure

This enhancement allows users to download models to a vfolder of their choice, rather than only to the pre-configured `artifact_storage`.

### API Changes

Add an optional `vfolderId` parameter to the `import_artifacts` API:

```graphql
mutation {
  importArtifacts(
    input: {
      artifactRevisionIds: ["..."]
      vfolderId: "uuid-of-target-vfolder"  # New optional parameter
    }
  ) {
    ...
  }
}
```

### Architecture Changes

#### 1. VolumeStorageAdapter Class

To support vfolder destinations, we need a way to use `AbstractVolume` (the volume backend interface) as an artifact storage target. The existing import pipeline expects the `AbstractStorage` interface, so we introduce a `VolumeStorageAdapter` that bridges the two:

```python
class VolumeStorageAdapter(AbstractStorage):
    """
    Adapter that wraps AbstractVolume to implement the AbstractStorage interface.

    This enables using any volume backend (VFS, XFS, NetApp, GPFS, Weka, VAST, CephFS, etc.)
    as an artifact storage target without registering to StoragePool.
    """

    def __init__(
        self,
        name: str,
        volume: AbstractVolume,
        vfid: VFolderID,
    ) -> None:
        self._name = name
        self._volume = volume
        self._vfid = vfid

    async def stream_upload(self, filepath: str, data_stream: StreamReader) -> None:
        # Delegates to volume.add_file()
        ...

    async def stream_download(self, filepath: str) -> StreamReader:
        # Delegates to volume.read_file()
        ...

    async def delete_file(self, filepath: str) -> None:
        # Delegates to volume.delete_files()
        ...

    async def get_file_info(self, filepath: str) -> VFSFileMetaResponse:
        # Uses volume.sanitize_vfpath() + aiofiles.os.stat()
        ...
```

> **Reviewer:** Are they abstract methods from `AbstractStorage`?
> **Author:** Yes.

**Key advantages of VolumeStorageAdapter:**
- Uses volume's native file operations (add_file, read_file, delete_files, mkdir)
- Supports all vfolder backends uniformly (VFS, XFS, NetApp, GPFS, Weka, VAST, CephFS, etc.)
- No StoragePool registration overhead
- Works with backend-specific quota management
- Delegates all operations to the volume, enabling backend-specific optimizations (see the sketch below)
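
To make the delegation concrete, here is a minimal sketch of two of the method bodies. The proposal only names the volume methods each operation delegates to; the exact `AbstractVolume` signatures used below are assumptions for illustration.

```python
# Continuing the class above -- a delegation sketch only.
# NOTE: the AbstractVolume.add_file()/read_file() signatures are assumed here.
class VolumeStorageAdapter(AbstractStorage):
    async def stream_upload(self, filepath: str, data_stream: StreamReader) -> None:
        # Scope the write to the wrapped vfolder, then hand the stream to the volume.
        await self._volume.add_file(self._vfid, filepath, data_stream)

    async def stream_download(self, filepath: str) -> StreamReader:
        # Read the file back through the volume's native file API.
        return await self._volume.read_file(self._vfid, filepath)
```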

#### 2. StorageTarget Class

With `VolumeStorageAdapter`, we can now use volumes as artifact storage. However, the existing import pipeline uses storage names (strings) to look up storages from `StoragePool`. We need a way to pass either:
- A storage name (`str`) for pre-configured storages (existing behavior)
- A `VolumeStorageAdapter` instance for vfolder destinations (new behavior)

The `StorageTarget` class provides this unified interface:

```python
class StorageTarget:
    """
    Wrapper for a storage step mapping that can be either a storage name (str)
    or a storage instance (AbstractStorage).

    When str: resolved via storage_pool.get_storage(name)
    When AbstractStorage: used directly (e.g., VolumeStorageAdapter for VFolder imports)
    """

    _value: str | AbstractStorage

    def __init__(self, value: str | AbstractStorage) -> None:
        self._value = value

    @property
    def name(self) -> str:
        """Get the storage name from this mapping."""
        if isinstance(self._value, str):
            return self._value
        return getattr(self._value, "name", str(id(self._value)))

    def resolve(self, storage_pool: AbstractStoragePool) -> AbstractStorage:
        """Resolve this mapping to an AbstractStorage instance."""
        if isinstance(self._value, AbstractStorage):
            return self._value
        return storage_pool.get_storage(self._value)
```
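
For illustration, here is how both variants would be used; `storage_pool` and `adapter` (a `VolumeStorageAdapter` built for the target vfolder) are assumed to already exist:

```python
# Existing behavior: a pre-configured storage referenced by name.
target = StorageTarget("artifact_storage")
storage = target.resolve(storage_pool)  # looked up via storage_pool.get_storage()

# New behavior: a vfolder destination passed as an instance.
target = StorageTarget(adapter)
storage = target.resolve(storage_pool)  # returns the adapter directly
```

Note that the `name` property falls back to `str(id(...))` for instances without a `name` attribute, presumably so every mapping remains identifiable in logs and error messages.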

#### 3. Updated Import Step Context

The `ImportStepContext` is updated to use `StorageTarget` instead of string-only mappings:

```python
@dataclass
class ImportStepContext:
    """Context shared across import steps"""

    model: ModelTarget
    registry_name: str
    storage_pool: AbstractStoragePool
    storage_step_mappings: dict[ArtifactStorageImportStep, StorageTarget]
    step_metadata: dict[str, Any]
```

Import steps now use `StorageTarget.resolve()` to get the storage instance:

```python
# In import step
storage = context.storage_step_mappings[self.step_type].resolve(context.storage_pool)
await storage.stream_upload(filepath, data_stream)
```
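
As a sketch of how the mappings might be assembled for a vfolder import; the `ArtifactStorageImportStep` members and the surrounding variables below are illustrative assumptions:

```python
# Hypothetical assembly of the step mappings and context for a vfolder import.
# Enum members (DOWNLOAD/VERIFY/ARCHIVE) and variables are assumed for illustration.
target = StorageTarget(adapter)
context = ImportStepContext(
    model=model,
    registry_name=registry_name,
    storage_pool=storage_pool,
    storage_step_mappings={
        ArtifactStorageImportStep.DOWNLOAD: target,
        ArtifactStorageImportStep.VERIFY: target,
        ArtifactStorageImportStep.ARCHIVE: target,
    },
    step_metadata={},
)
```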

### Request Flow

```
API Request with vfolderId
  │
  ├─ Parse vfid from request body
  ├─ Get volume from VolumePool by volume_name
  ├─ Create VolumeStorageAdapter (name: vfolder_storage_{request_id})
  ├─ Create StorageTarget wrapping VolumeStorageAdapter
  ├─ Build storage_step_mappings with StorageTarget instances
  └─ Start background import task ──► Return response to client
        │
        [Background task runs...]
        │  Each step resolves storage via:
        │    storage = mapping.resolve(storage_pool)
        │    → Returns VolumeStorageAdapter directly
        ▼
        Task completes (success or failure)
```
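
Putting the flow into code, a condensed handler sketch; every helper name other than `VolumeStorageAdapter` and `StorageTarget` is an illustrative assumption:

```python
# Hypothetical handler sketch for the request flow above.
async def import_artifacts_to_vfolder(body, request_id, storage_pool, volume_pool):
    vfid, volume_name = parse_vfolder_target(body)    # assumed parser
    volume = volume_pool.get_volume(volume_name)      # assumed VolumePool lookup
    adapter = VolumeStorageAdapter(
        name=f"vfolder_storage_{request_id}",         # unique, traceable name
        volume=volume,
        vfid=vfid,
    )
    target = StorageTarget(adapter)
    mappings = {step: target for step in ArtifactStorageImportStep}
    start_background_import(mappings)                 # assumed; response returns immediately
```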

Using `request_id` in the adapter name ensures uniqueness and traceability.