Skip to content

Commit bb7d586

Browse files
committed
[ENH] Clean up System instances when clients are destroyed
1 parent 091f8bd commit bb7d586

File tree

1 file changed

+77
-17
lines changed

1 file changed

+77
-17
lines changed

chromadb/api/shared_system_client.py

Lines changed: 77 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from typing import ClassVar, Dict
2+
import threading
23
import uuid
34

45
from chromadb.api import ServerAPI
@@ -9,37 +10,48 @@
910

1011
class SharedSystemClient:
1112
_identifier_to_system: ClassVar[Dict[str, System]] = {}
13+
_identifier_to_clientcount: ClassVar[Dict[str, int]] = {}
14+
_count_lock: ClassVar[threading.Lock] = threading.Lock()
1215
_identifier: str
16+
_closed: bool
1317

1418
def __init__(
1519
self,
1620
settings: Settings = Settings(),
1721
) -> None:
1822
self._identifier = SharedSystemClient._get_identifier_from_settings(settings)
1923
SharedSystemClient._create_system_if_not_exists(self._identifier, settings)
24+
with SharedSystemClient._count_lock:
25+
SharedSystemClient._identifier_to_clientcount[self._identifier] = (
26+
SharedSystemClient._identifier_to_clientcount.get(self._identifier, 0)
27+
+ 1
28+
)
29+
self._closed = False
2030

2131
@classmethod
2232
def _create_system_if_not_exists(
2333
cls, identifier: str, settings: Settings
2434
) -> System:
25-
if identifier not in cls._identifier_to_system:
26-
new_system = System(settings)
27-
cls._identifier_to_system[identifier] = new_system
35+
with cls._count_lock:
36+
if identifier not in cls._identifier_to_system:
37+
new_system = System(settings)
38+
cls._identifier_to_system[identifier] = new_system
39+
cls._identifier_to_clientcount[identifier] = 0
2840

29-
new_system.instance(ProductTelemetryClient)
30-
new_system.instance(ServerAPI)
41+
new_system.instance(ProductTelemetryClient)
42+
new_system.instance(ServerAPI)
3143

32-
new_system.start()
33-
else:
34-
previous_system = cls._identifier_to_system[identifier]
44+
new_system.start()
45+
else:
46+
previous_system = cls._identifier_to_system[identifier]
3547

36-
# For now, the settings must match
37-
if previous_system.settings != settings:
38-
raise ValueError(
39-
f"An instance of Chroma already exists for {identifier} with different settings"
40-
)
48+
# For now, the settings must match
49+
if previous_system.settings != settings:
50+
raise ValueError(
51+
f"An instance of Chroma already exists for {identifier} with different settings"
52+
)
4153

42-
return cls._identifier_to_system[identifier]
54+
return cls._identifier_to_system[identifier]
4355

4456
@staticmethod
4557
def _get_identifier_from_settings(settings: Settings) -> str:
@@ -72,7 +84,8 @@ def _get_identifier_from_settings(settings: Settings) -> str:
7284
@staticmethod
7385
def _populate_data_from_system(system: System) -> str:
7486
identifier = SharedSystemClient._get_identifier_from_settings(system.settings)
75-
SharedSystemClient._identifier_to_system[identifier] = system
87+
with SharedSystemClient._count_lock:
88+
SharedSystemClient._identifier_to_system[identifier] = system
7689
return identifier
7790

7891
@classmethod
@@ -83,13 +96,60 @@ def from_system(cls, system: System) -> "SharedSystemClient":
8396
instance = cls(system.settings)
8497
return instance
8598

99+
def close(self) -> None:
100+
"""Explicitly cleanup this client's system reference."""
101+
if hasattr(self, "_identifier") and not self._closed:
102+
identifier = self._identifier
103+
SharedSystemClient._decrement_refcount(identifier)
104+
self._closed = True
105+
# Prevent double-cleanup by removing identifier
106+
delattr(self, "_identifier")
107+
108+
def __enter__(self) -> "SharedSystemClient":
109+
return self
110+
111+
def __exit__(self, *args) -> None: # type: ignore
112+
self.close()
113+
114+
def __del__(self) -> None:
115+
"""Fallback cleanup - prefer using close() or context manager."""
116+
try:
117+
if hasattr(self, "_identifier") and not getattr(self, "_closed", False):
118+
SharedSystemClient._decrement_refcount(self._identifier)
119+
except Exception:
120+
pass
121+
122+
@classmethod
123+
def _decrement_refcount(cls, identifier: str) -> None:
124+
"""Decrement reference count for a System and cleanup if no clients remain."""
125+
with cls._count_lock:
126+
if identifier not in cls._identifier_to_clientcount:
127+
return
128+
129+
cls._identifier_to_clientcount[identifier] -= 1
130+
131+
if cls._identifier_to_clientcount[identifier] <= 0:
132+
# since no more client using this system, can stop it and remove from cache
133+
if identifier in cls._identifier_to_system:
134+
system = cls._identifier_to_system[identifier]
135+
system.stop()
136+
del cls._identifier_to_system[identifier]
137+
del cls._identifier_to_clientcount[identifier]
138+
86139
@staticmethod
87140
def clear_system_cache() -> None:
88-
SharedSystemClient._identifier_to_system = {}
141+
"""Clear the system cache so that new systems can be created for an existing path.
142+
This should only be used for testing purposes."""
143+
with SharedSystemClient._count_lock:
144+
for system in SharedSystemClient._identifier_to_system.values():
145+
system.stop()
146+
SharedSystemClient._identifier_to_system = {}
147+
SharedSystemClient._identifier_to_clientcount = {}
89148

90149
@property
91150
def _system(self) -> System:
92-
return SharedSystemClient._identifier_to_system[self._identifier]
151+
with SharedSystemClient._count_lock:
152+
return SharedSystemClient._identifier_to_system[self._identifier]
93153

94154
def _submit_client_start_event(self) -> None:
95155
telemetry_client = self._system.instance(ProductTelemetryClient)

0 commit comments

Comments
 (0)