Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ help::

mypy: $(call FORALL_PKGS,mypy)
help::
@echo " mypy -- Run mypy on all files"
@echo " mypy -- Run mypy on all files"

ruff:
@uv run ruff check --fix
Expand Down
10 changes: 9 additions & 1 deletion src/storage/run-unasync.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@
paths = Path("src/storage3").glob("**/*.py")
tests = Path("tests").glob("**/*.py")

rules = (unasync._DEFAULT_RULE,)
rules = (
unasync.Rule(
fromdir="/_async/",
todir="/_sync/",
additional_replacements={"AsyncClient": "Client"},
),
unasync._DEFAULT_RULE,
)


files = [str(p) for p in list(paths) + list(tests)]

Expand Down
48 changes: 48 additions & 0 deletions src/storage/src/storage3/_async/analytics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import List, Optional

from httpx import QueryParams

from ..types import (
AnalyticsBucket,
AnalyticsBucketDeleteResponse,
AnalyticsBucketsParser,
SortColumn,
SortOrder,
)
from .request import AsyncRequestBuilder


class AsyncStorageAnalyticsClient:
def __init__(self, request: AsyncRequestBuilder) -> None:
self._request = request

async def create(self, bucket_name: str) -> AnalyticsBucket:
body = {"name": bucket_name}
data = await self._request.send(http_method="POST", path=["bucket"], body=body)
return AnalyticsBucket.model_validate(data.content)

async def list(
self,
limit: Optional[int] = None,
offset: Optional[int] = None,
sort_column: Optional[SortColumn] = None,
sort_order: Optional[SortOrder] = None,
search: Optional[str] = None,
) -> List[AnalyticsBucket]:
params = QueryParams(
limit=limit,
offset=offset,
sort_column=sort_column,
sort_order=sort_order,
search=search,
)
data = await self._request.send(
http_method="GET", path=["bucket"], query_params=params
)
return AnalyticsBucketsParser.validate_json(data.content)

async def delete(self, bucket_name: str) -> AnalyticsBucketDeleteResponse:
data = await self._request.send(
http_method="DELETE", path=["bucket", bucket_name]
)
return AnalyticsBucketDeleteResponse.model_validate(data.content)
8 changes: 8 additions & 0 deletions src/storage/src/storage3/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ..version import __version__
from .bucket import AsyncStorageBucketAPI
from .file_api import AsyncBucketProxy
from .vectors import AsyncStorageVectorsClient

__all__ = [
"AsyncStorageClient",
Expand Down Expand Up @@ -80,3 +81,10 @@ def from_(self, id: str) -> AsyncBucketProxy:
The unique identifier of the bucket
"""
return AsyncBucketProxy(id, self._base_url, self._headers, self._client)

def vectors(self) -> AsyncStorageVectorsClient:
return AsyncStorageVectorsClient(
url=self._base_url.joinpath("v1", "vector"),
headers=self._headers,
session=self.session,
)
47 changes: 47 additions & 0 deletions src/storage/src/storage3/_async/request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Optional

from httpx import AsyncClient, Headers, HTTPStatusError, QueryParams, Response
from pydantic import ValidationError
from yarl import URL

from ..exceptions import StorageApiError, VectorBucketErrorMessage
from ..types import JSON, RequestMethod


class AsyncRequestBuilder:
def __init__(self, session: AsyncClient, base_url: URL, headers: Headers) -> None:
self._session = session
self._base_url = base_url
self.headers = headers

async def send(
self,
http_method: RequestMethod,
path: list[str],
body: JSON = None,
query_params: Optional[QueryParams] = None,
) -> Response:
response = await self._session.request(
method=http_method,
json=body,
url=str(self._base_url.joinpath(*path)),
headers=self.headers,
params=query_params or QueryParams(),
)
try:
response.raise_for_status()
return response
except HTTPStatusError as exc:
try:
error = VectorBucketErrorMessage.model_validate_json(response.content)
raise StorageApiError(
message=error.message,
code=error.code or "400",
status=error.statusCode,
) from exc
except ValidationError as exc:
raise StorageApiError(
message="The request failed, but could not parse error message response.",
code="LibraryError",
status=response.status_code,
) from exc
213 changes: 213 additions & 0 deletions src/storage/src/storage3/_async/vectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
from __future__ import annotations

from typing import List, Optional

from httpx import AsyncClient, Headers
from yarl import URL

from ..exceptions import StorageApiError, VectorBucketException
from ..types import (
JSON,
DistanceMetric,
GetVectorBucketResponse,
GetVectorIndexResponse,
GetVectorsResponse,
ListVectorBucketsResponse,
ListVectorIndexesResponse,
ListVectorsResponse,
MetadataConfiguration,
QueryVectorsResponse,
VectorBucket,
VectorData,
VectorFilter,
VectorIndex,
VectorMatch,
VectorObject,
)
from .request import AsyncRequestBuilder


# used to not send non-required values as `null`
# for they cannot be null
def remove_none(**kwargs: JSON) -> JSON:
return {key: val for key, val in kwargs.items() if val is not None}


class AsyncVectorBucketScope:
def __init__(self, request: AsyncRequestBuilder, bucket_name: str) -> None:
self._request = request
self._bucket_name = bucket_name

def with_metadata(self, **data: JSON) -> JSON:
return remove_none(vectorBucketName=self._bucket_name, **data)

async def create_index(
self,
index_name: str,
dimension: int,
distance_metric: DistanceMetric,
data_type: str,
metadata: Optional[MetadataConfiguration] = None,
) -> None:
body = self.with_metadata(
indexName=index_name,
dimension=dimension,
distanceMetric=distance_metric,
dataType=data_type,
metadataConfiguration=dict(metadata) if metadata else None,
)
await self._request.send(http_method="POST", path=["CreateIndex"], body=body)

async def get_index(self, index_name: str) -> Optional[VectorIndex]:
body = self.with_metadata(indexName=index_name)
try:
data = await self._request.send(
http_method="POST", path=["GetIndex"], body=body
)
return GetVectorIndexResponse.model_validate_json(data.content).index
except StorageApiError:
return None

async def list_indexes(
self,
next_token: Optional[str] = None,
max_results: Optional[int] = None,
prefix: Optional[str] = None,
) -> ListVectorIndexesResponse:
body = self.with_metadata(
next_token=next_token, max_results=max_results, prefix=prefix
)
data = await self._request.send(
http_method="POST", path=["ListIndexes"], body=body
)
return ListVectorIndexesResponse.model_validate_json(data.content)

async def delete_index(self, index_name: str) -> None:
body = self.with_metadata(indexName=index_name)
await self._request.send(http_method="POST", path=["DeleteIndex"], body=body)

def index(self, index_name: str) -> AsyncVectorIndexScope:
return AsyncVectorIndexScope(self._request, self._bucket_name, index_name)


class AsyncVectorIndexScope:
def __init__(
self, request: AsyncRequestBuilder, bucket_name: str, index_name: str
) -> None:
self._request = request
self._bucket_name = bucket_name
self._index_name = index_name

def with_metadata(self, **data: JSON) -> JSON:
return remove_none(
vectorBucketName=self._bucket_name,
indexName=self._index_name,
**data,
)

async def put(self, vectors: List[VectorObject]) -> None:
body = self.with_metadata(vectors=[v.as_json() for v in vectors])
await self._request.send(http_method="POST", path=["PutVectors"], body=body)

async def get(
self, *keys: str, return_data: bool = True, return_metadata: bool = True
) -> List[VectorMatch]:
body = self.with_metadata(
keys=keys, returnData=return_data, returnMetadata=return_metadata
)
data = await self._request.send(
http_method="POST", path=["GetVectors"], body=body
)
return GetVectorsResponse.model_validate_json(data.content).vectors

async def list(
self,
max_results: Optional[int] = None,
next_token: Optional[str] = None,
return_data: bool = True,
return_metadata: bool = True,
segment_count: Optional[int] = None,
segment_index: Optional[int] = None,
) -> ListVectorsResponse:
body = self.with_metadata(
maxResults=max_results,
nextToken=next_token,
returnData=return_data,
returnMetadata=return_metadata,
segmentCount=segment_count,
segmentIndex=segment_index,
)
data = await self._request.send(
http_method="POST", path=["ListVectors"], body=body
)
return ListVectorsResponse.model_validate_json(data.content)

async def query(
self,
query_vector: VectorData,
topK: Optional[int] = None,
filter: Optional[VectorFilter] = None,
return_distance: bool = True,
return_metadata: bool = True,
) -> QueryVectorsResponse:
body = self.with_metadata(
queryVector=dict(query_vector),
topK=topK,
filter=filter,
returnDistance=return_distance,
returnMetadata=return_metadata,
)
data = await self._request.send(
http_method="POST", path=["QueryVectors"], body=body
)
return QueryVectorsResponse.model_validate_json(data.content)

async def delete(self, keys: List[str]) -> None:
if 1 < len(keys) or len(keys) > 500:
raise VectorBucketException("Keys batch size must be between 1 and 500.")
body = self.with_metadata(keys=keys)
await self._request.send(http_method="POST", path=["DeleteVectors"], body=body)


class AsyncStorageVectorsClient:
def __init__(self, url: URL, headers: Headers, session: AsyncClient) -> None:
self._request = AsyncRequestBuilder(session, base_url=URL(url), headers=headers)

def from_(self, bucket_name: str) -> AsyncVectorBucketScope:
return AsyncVectorBucketScope(self._request, bucket_name)

async def create_bucket(self, bucket_name: str) -> None:
body = {"vectorBucketName": bucket_name}
await self._request.send(
http_method="POST", path=["CreateVectorBucket"], body=body
)

async def get_bucket(self, bucket_name: str) -> Optional[VectorBucket]:
body = {"vectorBucketName": bucket_name}
try:
data = await self._request.send(
http_method="POST", path=["GetVectorBucket"], body=body
)
return GetVectorBucketResponse.model_validate_json(
data.content
).vectorBucket
except StorageApiError:
return None

async def list_buckets(
self,
prefix: Optional[str] = None,
max_results: Optional[int] = None,
next_token: Optional[str] = None,
) -> ListVectorBucketsResponse:
body = remove_none(prefix=prefix, maxResults=max_results, nextToken=next_token)
data = await self._request.send(
http_method="POST", path=["ListVectorBuckets"], body=body
)
return ListVectorBucketsResponse.model_validate_json(data.content)

async def delete_bucket(self, bucket_name: str) -> None:
body = {"vectorBucketName": bucket_name}
await self._request.send(
http_method="POST", path=["DeleteVectorBucket"], body=body
)
Loading