Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 77 additions & 17 deletions chromadb/api/shared_system_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import ClassVar, Dict
import threading
import uuid

from chromadb.api import ServerAPI
Expand All @@ -9,37 +10,48 @@

class SharedSystemClient:
# Class-level cache shared by all clients: identifier (derived from Settings) -> System.
_identifier_to_system: ClassVar[Dict[str, System]] = {}
# Number of clients currently counted against each cached identifier.
_identifier_to_clientcount: ClassVar[Dict[str, int]] = {}
# Guards updates to the two class-level dictionaries above.
_count_lock: ClassVar[threading.Lock] = threading.Lock()
# Cache key of this client's System; assigned in __init__ and deleted by close().
_identifier: str
# Set to False at the end of __init__ and to True by close().
_closed: bool

def __init__(
    self,
    settings: Settings = Settings(),
) -> None:
    """Attach this client to the shared System that matches ``settings``.

    The System is created on first use and reused afterwards; every
    successfully constructed client adds one to the System's client count.

    Args:
        settings: Configuration used to derive the cache identifier and,
            if needed, to build the System.

    Raises:
        ValueError: Propagated from ``_create_system_if_not_exists`` when a
            System is already cached for the identifier with different
            settings.
    """
    # __del__ still runs when __init__ raises. Until this client's reference
    # has really been counted, present the instance as already closed so the
    # finalizer cannot decrement a count that belongs to other clients.
    self._closed = True
    self._identifier = SharedSystemClient._get_identifier_from_settings(settings)

    while True:
        system = SharedSystemClient._create_system_if_not_exists(
            self._identifier, settings
        )
        with SharedSystemClient._count_lock:
            # The lock was released between the lookup above and this point,
            # so the last other client may have closed (evicting the System)
            # in between. Count the reference only if the System that was
            # just validated is still the cached one; otherwise try again.
            cached = SharedSystemClient._identifier_to_system.get(self._identifier)
            if cached is system:
                SharedSystemClient._identifier_to_clientcount[self._identifier] = (
                    SharedSystemClient._identifier_to_clientcount.get(
                        self._identifier, 0
                    )
                    + 1
                )
                break

    self._closed = False

@classmethod
def _create_system_if_not_exists(
    cls, identifier: str, settings: Settings
) -> System:
    """Return the cached System for ``identifier``, creating it if missing.

    The whole check-then-create sequence runs under ``_count_lock`` so two
    threads cannot both build a System for the same identifier.

    Args:
        identifier: Cache key for the System.
        settings: Settings used to build a new System, or compared against
            the settings of the System that is already cached.

    Returns:
        The System stored in the cache under ``identifier``.

    Raises:
        ValueError: If a System is already cached for ``identifier`` and its
            settings differ from ``settings``.
    """
    with cls._count_lock:
        if identifier not in cls._identifier_to_system:
            new_system = System(settings)
            cls._identifier_to_system[identifier] = new_system
            # A freshly created System has no clients yet; __init__ adds the
            # first reference after this method returns.
            cls._identifier_to_clientcount[identifier] = 0

            new_system.instance(ProductTelemetryClient)
            new_system.instance(ServerAPI)

            new_system.start()
        else:
            previous_system = cls._identifier_to_system[identifier]

            # For now, the settings must match
            if previous_system.settings != settings:
                raise ValueError(
                    f"An instance of Chroma already exists for {identifier} with different settings"
                )

        return cls._identifier_to_system[identifier]

@staticmethod
def _get_identifier_from_settings(settings: Settings) -> str:
Expand Down Expand Up @@ -72,7 +84,8 @@ def _get_identifier_from_settings(settings: Settings) -> str:
@staticmethod
def _populate_data_from_system(system: System) -> str:
    """Store ``system`` in the shared cache and return its identifier.

    The identifier is derived from ``system.settings``. Any System already
    cached under that identifier is replaced. The client count is not
    touched here.

    Args:
        system: The System to register.

    Returns:
        The identifier under which ``system`` was cached.
    """
    identifier = SharedSystemClient._get_identifier_from_settings(system.settings)
    # Write to the shared cache only while holding the lock.
    with SharedSystemClient._count_lock:
        SharedSystemClient._identifier_to_system[identifier] = system
    return identifier

@classmethod
Expand All @@ -83,13 +96,60 @@ def from_system(cls, system: System) -> "SharedSystemClient":
instance = cls(system.settings)
return instance

def close(self) -> None:
    """Explicitly release this client's reference to its System.

    Calling ``close()`` more than once is a no-op after the first call.
    Any exception raised while the reference is released (for example by
    stopping the System) propagates to the caller, but the client is still
    marked closed so the reference is never released a second time.
    """
    if not hasattr(self, "_identifier") or self._closed:
        return

    identifier = self._identifier
    # Mark the client closed *before* releasing the reference: if the
    # release raises, neither a second close() nor __del__ may decrement
    # the shared count again.
    self._closed = True
    # Prevent double-cleanup by removing identifier
    delattr(self, "_identifier")
    SharedSystemClient._decrement_refcount(identifier)

def __enter__(self) -> "SharedSystemClient":
    """Enter a ``with`` block; the client itself is the managed object."""
    client = self
    return client

def __exit__(self, *args) -> None:  # type: ignore
    """Leave a ``with`` block by closing the client.

    The exception details passed in ``args`` are ignored and ``None`` is
    returned, so an exception raised inside the block is not suppressed.
    """
    self.close()
    return None

def __del__(self) -> None:
    """Best-effort release on finalization; prefer close() or a ``with`` block."""
    try:
        # Nothing to release if construction never got as far as assigning
        # an identifier.
        if not hasattr(self, "_identifier"):
            return
        # Nothing to release if the client reports itself closed.
        if getattr(self, "_closed", False):
            return
        SharedSystemClient._decrement_refcount(self._identifier)
    except Exception:
        # A finalizer must never raise; failures here are deliberately ignored.
        pass

@classmethod
def _decrement_refcount(cls, identifier: str) -> None:
    """Drop one client reference and stop the System when none remain.

    Unknown identifiers are ignored. When the count reaches zero (or below)
    the System is removed from the cache and stopped.

    Args:
        identifier: Cache key of the System whose count is decremented.

    Raises:
        Exception: Whatever ``System.stop()`` raises is propagated; the
            cache entries have already been removed by then.
    """
    with cls._count_lock:
        if identifier not in cls._identifier_to_clientcount:
            return

        cls._identifier_to_clientcount[identifier] -= 1
        if cls._identifier_to_clientcount[identifier] > 0:
            return

        # No client uses this System any more. Remove both cache entries
        # before stopping it, so a failing stop() cannot leave a
        # half-stopped System behind for the next client to pick up.
        del cls._identifier_to_clientcount[identifier]
        system = cls._identifier_to_system.pop(identifier, None)
        if system is not None:
            # Still inside the lock, as before: the identifier cannot be
            # re-registered by code that takes _count_lock until stop()
            # has returned.
            system.stop()

@staticmethod
def clear_system_cache() -> None:
    """Clear the system cache so that new systems can be created for an existing path.
    This should only be used for testing purposes.

    Every cached System is stopped and both the System cache and the client
    counts are reset to empty dictionaries.
    """
    with SharedSystemClient._count_lock:
        systems = list(SharedSystemClient._identifier_to_system.values())
        # Reset the caches before stopping anything so that a failing
        # stop() cannot leave stale entries behind.
        SharedSystemClient._identifier_to_system = {}
        SharedSystemClient._identifier_to_clientcount = {}
        # NOTE(review): clients created before this call keep their
        # identifier, so their later close()/__del__ will decrement the
        # count of any new System registered under the same identifier.
        for system in systems:
            system.stop()

@property
def _system(self) -> System:
    """Return the shared System this client is attached to.

    The lookup is done under ``_count_lock``.

    Raises:
        KeyError: If no System is cached under this client's identifier.
        AttributeError: If the client has been closed; ``close()`` deletes
            ``_identifier``.
    """
    with SharedSystemClient._count_lock:
        return SharedSystemClient._identifier_to_system[self._identifier]

def _submit_client_start_event(self) -> None:
telemetry_client = self._system.instance(ProductTelemetryClient)
Expand Down
Loading