diff --git a/chromadb/api/shared_system_client.py b/chromadb/api/shared_system_client.py index d5c8b76fb6c..d60be980ca2 100644 --- a/chromadb/api/shared_system_client.py +++ b/chromadb/api/shared_system_client.py @@ -1,4 +1,5 @@ from typing import ClassVar, Dict +import threading import uuid from chromadb.api import ServerAPI @@ -9,7 +10,10 @@ class SharedSystemClient: _identifier_to_system: ClassVar[Dict[str, System]] = {} + _identifier_to_clientcount: ClassVar[Dict[str, int]] = {} + _count_lock: ClassVar[threading.Lock] = threading.Lock() _identifier: str + _closed: bool def __init__( self, @@ -17,29 +21,37 @@ def __init__( ) -> None: self._identifier = SharedSystemClient._get_identifier_from_settings(settings) SharedSystemClient._create_system_if_not_exists(self._identifier, settings) + with SharedSystemClient._count_lock: + SharedSystemClient._identifier_to_clientcount[self._identifier] = ( + SharedSystemClient._identifier_to_clientcount.get(self._identifier, 0) + + 1 + ) + self._closed = False @classmethod def _create_system_if_not_exists( cls, identifier: str, settings: Settings ) -> System: - if identifier not in cls._identifier_to_system: - new_system = System(settings) - cls._identifier_to_system[identifier] = new_system + with cls._count_lock: + if identifier not in cls._identifier_to_system: + new_system = System(settings) + cls._identifier_to_system[identifier] = new_system + cls._identifier_to_clientcount[identifier] = 0 - new_system.instance(ProductTelemetryClient) - new_system.instance(ServerAPI) + new_system.instance(ProductTelemetryClient) + new_system.instance(ServerAPI) - new_system.start() - else: - previous_system = cls._identifier_to_system[identifier] + new_system.start() + else: + previous_system = cls._identifier_to_system[identifier] - # For now, the settings must match - if previous_system.settings != settings: - raise ValueError( - f"An instance of Chroma already exists for {identifier} with different settings" - ) + # For now, the settings must match + if previous_system.settings != settings: + raise ValueError( + f"An instance of Chroma already exists for {identifier} with different settings" + ) - return cls._identifier_to_system[identifier] + return cls._identifier_to_system[identifier] @staticmethod def _get_identifier_from_settings(settings: Settings) -> str: @@ -72,7 +84,8 @@ def _get_identifier_from_settings(settings: Settings) -> str: @staticmethod def _populate_data_from_system(system: System) -> str: identifier = SharedSystemClient._get_identifier_from_settings(system.settings) - SharedSystemClient._identifier_to_system[identifier] = system + with SharedSystemClient._count_lock: + SharedSystemClient._identifier_to_system[identifier] = system return identifier @classmethod @@ -83,13 +96,60 @@ def from_system(cls, system: System) -> "SharedSystemClient": instance = cls(system.settings) return instance + def close(self) -> None: + """Explicitly cleanup this client's system reference.""" + if hasattr(self, "_identifier") and not self._closed: + identifier = self._identifier + SharedSystemClient._decrement_refcount(identifier) + self._closed = True + # Prevent double-cleanup by removing identifier + delattr(self, "_identifier") + + def __enter__(self) -> "SharedSystemClient": + return self + + def __exit__(self, *args) -> None: # type: ignore + self.close() + + def __del__(self) -> None: + """Fallback cleanup - prefer using close() or context manager.""" + try: + if hasattr(self, "_identifier") and not getattr(self, "_closed", False): + SharedSystemClient._decrement_refcount(self._identifier) + except Exception: + pass + + @classmethod + def _decrement_refcount(cls, identifier: str) -> None: + """Decrement reference count for a System and cleanup if no clients remain.""" + with cls._count_lock: + if identifier not in cls._identifier_to_clientcount: + return + + cls._identifier_to_clientcount[identifier] -= 1 + + if cls._identifier_to_clientcount[identifier] <= 0: + # since no more client using this system, can stop it and remove from cache + if identifier in cls._identifier_to_system: + system = cls._identifier_to_system[identifier] + system.stop() + del cls._identifier_to_system[identifier] + del cls._identifier_to_clientcount[identifier] + @staticmethod def clear_system_cache() -> None: - SharedSystemClient._identifier_to_system = {} + """Clear the system cache so that new systems can be created for an existing path. + This should only be used for testing purposes.""" + with SharedSystemClient._count_lock: + for system in SharedSystemClient._identifier_to_system.values(): + system.stop() + SharedSystemClient._identifier_to_system = {} + SharedSystemClient._identifier_to_clientcount = {} @property def _system(self) -> System: - return SharedSystemClient._identifier_to_system[self._identifier] + with SharedSystemClient._count_lock: + return SharedSystemClient._identifier_to_system[self._identifier] def _submit_client_start_event(self) -> None: telemetry_client = self._system.instance(ProductTelemetryClient)