Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 67 additions & 14 deletions chromadb/api/async_fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
import logging
import httpx
from overrides import override
from tenacity import (
AsyncRetrying,
RetryError,
before_sleep_log,
retry_if_exception,
stop_after_attempt,
wait_exponential,
wait_random_exponential,
)
from chromadb import __version__
from chromadb.auth import UserIdentity
from chromadb.api.async_api import AsyncServerAPI
Expand All @@ -16,6 +25,7 @@
create_collection_configuration_to_json,
update_collection_configuration_to_json,
)
from chromadb.api.fastapi import is_retryable_exception
from chromadb.config import DEFAULT_DATABASE, DEFAULT_TENANT, System, Settings
from chromadb.telemetry.opentelemetry import (
OpenTelemetryClient,
Expand Down Expand Up @@ -140,20 +150,63 @@ def _get_client(self) -> httpx.AsyncClient:
async def _make_request(
    self, method: str, path: str, **kwargs: Dict[str, Any]
) -> Any:
    """Send one HTTP request to the server and return the decoded JSON body.

    The request is retried according to ``self._settings.retry_config``;
    when that setting is ``None`` the request is sent exactly once.

    Args:
        method: HTTP method passed to the httpx client.
        path: Path appended to ``self._api_url`` after percent-escaping.
        **kwargs: Extra arguments forwarded to ``httpx.AsyncClient.request``.
            A ``json`` entry is serialized with orjson and sent as ``content``.

    Returns:
        The response text parsed with ``orjson.loads``.

    Raises:
        Whatever ``BaseHTTPClient._raise_chroma_error`` raises for the
        response, or the last transport error once retries are exhausted.
    """
    # Request preparation does not change between attempts, so do it once
    # here instead of inside the retried callable.

    # If the request has json in kwargs, use orjson to serialize it,
    # remove it from kwargs, and add it to the content parameter
    # This is because httpx uses a slower json serializer
    if "json" in kwargs:
        data = orjson.dumps(kwargs.pop("json"), option=orjson.OPT_SERIALIZE_NUMPY)
        kwargs["content"] = data

    # Unlike requests, httpx does not automatically escape the path
    escaped_path = urllib.parse.quote(path, safe="/", encoding=None, errors=None)
    url = self._api_url + escaped_path

    async def _send_request() -> Any:
        # Only the network round trip and response handling are retried.
        response = await self._get_client().request(
            method, url, **cast(Any, kwargs)
        )
        BaseHTTPClient._raise_chroma_error(response)
        return orjson.loads(response.text)

    retry_config = self._settings.retry_config
    if retry_config is None:
        return await _send_request()

    # Clamp the configured values so the wait strategy always gets
    # 0 <= min_delay <= max_delay, a positive multiplier and a positive base.
    min_delay = max(float(retry_config.min_delay), 0.0)
    max_delay = max(float(retry_config.max_delay), min_delay)
    wait_args = {
        "multiplier": max(min_delay, 1e-3),
        "min": min_delay,
        "max": max_delay,
        "exp_base": retry_config.factor if retry_config.factor > 0 else 2.0,
    }
    wait_strategy = (
        wait_random_exponential(**wait_args)
        if retry_config.jitter
        else wait_exponential(**wait_args)
    )

    retrying = AsyncRetrying(
        stop=stop_after_attempt(retry_config.max_attempts),
        wait=wait_strategy,
        retry=retry_if_exception(is_retryable_exception),
        before_sleep=before_sleep_log(logger, logging.INFO),
        reraise=True,
    )

    try:
        return await retrying(_send_request)
    except RetryError as e:
        # Defensive fallback: reraise=True is expected to surface the
        # original exception directly, so this path should not normally run.
        raise e.last_attempt.exception() from None

@trace_method("AsyncFastAPI.heartbeat", OpenTelemetryGranularity.OPERATION)
@override
Expand Down
101 changes: 87 additions & 14 deletions chromadb/api/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
import httpx
import urllib.parse
from overrides import override
from tenacity import (
RetryError,
Retrying,
before_sleep_log,
retry_if_exception,
stop_after_attempt,
wait_exponential,
wait_random_exponential,
)

from chromadb.api.collection_configuration import (
CreateCollectionConfiguration,
Expand Down Expand Up @@ -57,6 +66,28 @@
logger = logging.getLogger(__name__)


def is_retryable_exception(exception: BaseException) -> bool:
    """Tell the retry machinery whether a failed request may be attempted again.

    Args:
        exception: The exception raised by the request attempt.

    Returns:
        True for the httpx connection, timeout, network and remote-protocol
        errors listed below, and for an ``httpx.HTTPStatusError`` whose
        response status is 502, 503 or 504. False for everything else.
    """
    transport_failures = (
        httpx.ConnectError,
        httpx.ConnectTimeout,
        httpx.ReadTimeout,
        httpx.WriteTimeout,
        httpx.PoolTimeout,
        httpx.NetworkError,
        httpx.RemoteProtocolError,
    )
    if isinstance(exception, transport_failures):
        return True

    # Retry on server errors that might be temporary
    return (
        isinstance(exception, httpx.HTTPStatusError)
        and exception.response.status_code in [502, 503, 504]
    )


class FastAPI(BaseHTTPClient, ServerAPI):
def __init__(self, system: System):
super().__init__(system)
Expand Down Expand Up @@ -97,20 +128,62 @@ def __init__(self, system: System):
self._session.headers[header] = value.get_secret_value()

def _make_request(self, method: str, path: str, **kwargs: Dict[str, Any]) -> Any:
    """Send one HTTP request to the server and return the decoded JSON body.

    The request is retried according to ``self._settings.retry_config``;
    when that setting is ``None`` the request is sent exactly once.

    Args:
        method: HTTP method passed to the httpx session.
        path: Path appended to ``self._api_url`` after percent-escaping.
        **kwargs: Extra arguments forwarded to ``self._session.request``.
            A ``json`` entry is serialized with orjson and sent as ``content``.

    Returns:
        The response text parsed with ``orjson.loads``.

    Raises:
        Whatever ``BaseHTTPClient._raise_chroma_error`` raises for the
        response, or the last transport error once retries are exhausted.
    """
    # Request preparation does not change between attempts, so do it once
    # here instead of inside the retried callable.

    # If the request has json in kwargs, use orjson to serialize it,
    # remove it from kwargs, and add it to the content parameter
    # This is because httpx uses a slower json serializer
    if "json" in kwargs:
        data = orjson.dumps(kwargs.pop("json"), option=orjson.OPT_SERIALIZE_NUMPY)
        kwargs["content"] = data

    # Unlike requests, httpx does not automatically escape the path
    escaped_path = urllib.parse.quote(path, safe="/", encoding=None, errors=None)
    url = self._api_url + escaped_path

    def _send_request() -> Any:
        # Only the network round trip and response handling are retried.
        response = self._session.request(method, url, **cast(Any, kwargs))
        BaseHTTPClient._raise_chroma_error(response)
        return orjson.loads(response.text)

    retry_config = self._settings.retry_config
    if retry_config is None:
        return _send_request()

    # Clamp the configured values so the wait strategy always gets
    # 0 <= min_delay <= max_delay, a positive multiplier and a positive base.
    min_delay = max(float(retry_config.min_delay), 0.0)
    max_delay = max(float(retry_config.max_delay), min_delay)
    wait_args = {
        "multiplier": max(min_delay, 1e-3),
        "min": min_delay,
        "max": max_delay,
        "exp_base": retry_config.factor if retry_config.factor > 0 else 2.0,
    }
    wait_strategy = (
        wait_random_exponential(**wait_args)
        if retry_config.jitter
        else wait_exponential(**wait_args)
    )

    retrying = Retrying(
        stop=stop_after_attempt(retry_config.max_attempts),
        wait=wait_strategy,
        retry=retry_if_exception(is_retryable_exception),
        before_sleep=before_sleep_log(logger, logging.INFO),
        reraise=True,
    )

    try:
        return retrying(_send_request)
    except RetryError as e:
        # Defensive fallback: reraise=True is expected to surface the
        # original exception directly, so this path should not normally run.
        # Re-raise the last exception that caused the retry to fail
        raise e.last_attempt.exception() from None
Comment on lines 130 to +186
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

[CodeDuplication] The retry logic implemented in _make_request is nearly identical to the logic in chromadb/api/async_fastapi.py's _make_request method (lines 153-209). This introduces code duplication, making future maintenance harder.

Consider refactoring the common retry configuration and execution logic into a shared helper function. This would centralize the retry strategy and reduce redundancy. For example, a helper could build the Retrying or AsyncRetrying object based on a flag.

Context for Agents
[**BestPractice**]

[CodeDuplication] The retry logic implemented in `_make_request` is nearly identical to the logic in `chromadb/api/async_fastapi.py`'s `_make_request` method (lines 153-209). This introduces code duplication, making future maintenance harder.

Consider refactoring the common retry configuration and execution logic into a shared helper function. This would centralize the retry strategy and reduce redundancy. For example, a helper could build the `Retrying` or `AsyncRetrying` object based on a flag.

File: chromadb/api/fastapi.py
Line: 186


@trace_method("FastAPI.heartbeat", OpenTelemetryGranularity.OPERATION)
@override
Expand Down
11 changes: 11 additions & 0 deletions chromadb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from overrides import override
from typing_extensions import Literal
import platform
from pydantic import BaseModel

in_pydantic_v2 = False
try:
Expand Down Expand Up @@ -97,6 +98,14 @@ class APIVersion(str, Enum):
V2 = "/api/v2"


class RetryConfig(BaseModel):
    """Backoff settings read by the HTTP clients when retrying a request."""

    # Base of the exponential backoff; the clients fall back to 2.0 when
    # this is not positive.
    factor: float = 2.0
    # Lower and upper bounds on the wait between attempts (presumably
    # seconds -- confirm against the tenacity wait strategies). Declared as
    # float because the clients cast them to float, and so that sub-second
    # delays can be configured; integer inputs remain valid.
    min_delay: float = 1.0
    max_delay: float = 5.0
Comment on lines +103 to +104
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

For consistency with the JavaScript client and to allow for more granular control over backoff timing, consider changing min_delay and max_delay to float type. The implementation already casts these values to float, so this change would make the model's type hint more accurate.

Suggested change
min_delay: int = 1
max_delay: int = 5
min_delay: float = 1.0
max_delay: float = 5.0

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
[**BestPractice**]

For consistency with the JavaScript client and to allow for more granular control over backoff timing, consider changing `min_delay` and `max_delay` to `float` type. The implementation already casts these values to float, so this change would make the model's type hint more accurate.

```suggestion
    min_delay: float = 1.0
    max_delay: float = 5.0
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: chromadb/config.py
Line: 104

max_attempts: int = 5
jitter: bool = True


# NOTE(hammadb) 1/13/2024 - This has to be in config.py instead of being localized to the module
# that uses it because of a circular import issue. This is a temporary solution until we can
# refactor the code to remove the circular import.
Expand Down Expand Up @@ -133,6 +142,8 @@ def empty_str_to_none(cls, v: str) -> Optional[str]:
return None
return v

retry_config: Optional[RetryConfig] = RetryConfig()

chroma_server_nofile: Optional[int] = None
# the number of maximum threads to handle synchronous tasks in the FastAPI server
chroma_server_thread_pool_size: int = 40
Expand Down
20 changes: 16 additions & 4 deletions clients/new-js/packages/chromadb/src/admin-client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { defaultAdminClientArgs, HttpMethod, normalizeMethod } from "./utils";
import { createClient, createConfig } from "@hey-api/client-fetch";
import { Database, DefaultService as Api } from "./api";
import { chromaFetch } from "./chroma-fetch";
import { createChromaFetch } from "./chroma-fetch";
import type { RetryConfig } from "./retry";

/**
* Configuration options for the AdminClient.
Expand All @@ -17,6 +18,8 @@ export interface AdminClientArgs {
headers?: Record<string, string>;
/** Additional fetch options for HTTP requests */
fetchOptions?: RequestInit;
/** Retry configuration for HTTP requests. Set to null to disable retries */
retryConfig?: RetryConfig | null;
}

/**
Expand All @@ -43,8 +46,17 @@ export class AdminClient {
* @param args - Optional configuration for the admin client
*/
constructor(args?: AdminClientArgs) {
const { host, port, ssl, headers, fetchOptions } =
args || defaultAdminClientArgs;
const {
host,
port,
ssl,
headers,
fetchOptions,
retryConfig,
} = {
...defaultAdminClientArgs,
...(args ?? {}),
};

const baseUrl = `${ssl ? "https" : "http"}://${host}:${port}`;

Expand All @@ -56,7 +68,7 @@ export class AdminClient {
};

this.apiClient = createClient(createConfig(configOptions));
this.apiClient.setConfig({ fetch: chromaFetch });
this.apiClient.setConfig({ fetch: createChromaFetch({ retryConfig }) });
}

/**
Expand Down
8 changes: 6 additions & 2 deletions clients/new-js/packages/chromadb/src/chroma-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import { DefaultService as Api, ChecklistResponse } from "./api";
import { CollectionMetadata, UserIdentity } from "./types";
import { Collection, CollectionImpl } from "./collection";
import { EmbeddingFunction, getEmbeddingFunction } from "./embedding-function";
import { chromaFetch } from "./chroma-fetch";
import { createChromaFetch } from "./chroma-fetch";
import type { RetryConfig } from "./retry";
import * as process from "node:process";
import {
ChromaConnectionError,
Expand Down Expand Up @@ -39,6 +40,8 @@ export interface ChromaClientArgs {
headers?: Record<string, string>;
/** Additional fetch options for HTTP requests */
fetchOptions?: RequestInit;
/** Retry configuration for HTTP requests. Set to null to disable retries */
retryConfig?: RetryConfig | null;
/** @deprecated Use host, port, and ssl instead */
path?: string;
/** @deprecated */
Expand Down Expand Up @@ -68,6 +71,7 @@ export class ChromaClient {
database = defaultArgs.database,
headers = defaultArgs.headers,
fetchOptions = defaultArgs.fetchOptions,
retryConfig = defaultArgs.retryConfig,
} = args;

if (args.path) {
Expand Down Expand Up @@ -109,7 +113,7 @@ export class ChromaClient {
};

this.apiClient = createClient(createConfig(configOptions));
this.apiClient.setConfig({ fetch: chromaFetch });
this.apiClient.setConfig({ fetch: createChromaFetch({ retryConfig }) });
}

/**
Expand Down
Loading
Loading