Skip to content

Commit 2a32911

Browse files
authored
fix: self.mem_cubes RuntimeError: dictionary changed size during iteration (#216)
* feat: reorgniza code * feat: re org code and fix bad case ref for equation * fix: dictionary changed size during iteration for product serve
1 parent 9ae9737 commit 2a32911

File tree

5 files changed

+668
-3
lines changed

5 files changed

+668
-3
lines changed

src/memos/mem_os/core.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from memos.memories.activation.item import ActivationMemoryItem
2525
from memos.memories.parametric.item import ParametricMemoryItem
2626
from memos.memories.textual.item import TextualMemoryItem, TextualMemoryMetadata
27+
from memos.memos_tools.thread_safe_dict import ThreadSafeDict
2728
from memos.templates.mos_prompts import QUERY_REWRITING_PROMPT
2829
from memos.types import ChatHistory, MessageList, MOSSearchResult
2930

@@ -42,10 +43,13 @@ def __init__(self, config: MOSConfig, user_manager: UserManager | None = None):
4243
self.config = config
4344
self.user_id = config.user_id
4445
self.session_id = config.session_id
45-
self.mem_cubes: dict[str, GeneralMemCube] = {}
4646
self.chat_llm = LLMFactory.from_config(config.chat_model)
4747
self.mem_reader = MemReaderFactory.from_config(config.mem_reader)
4848
self.chat_history_manager: dict[str, ChatHistory] = {}
49+
# use thread safe dict for multi-user product-server scenario
50+
self.mem_cubes: ThreadSafeDict[str, GeneralMemCube] = (
51+
ThreadSafeDict() if user_manager is not None else {}
52+
)
4953
self._register_chat_history()
5054

5155
# Use provided user_manager or create a new one
@@ -575,7 +579,13 @@ def search(
575579
}
576580
if install_cube_ids is None:
577581
install_cube_ids = user_cube_ids
578-
for mem_cube_id, mem_cube in self.mem_cubes.items():
582+
# create exist dict in mem_cubes and avoid one search slow
583+
tmp_mem_cubes = {}
584+
for mem_cube_id in install_cube_ids:
585+
if mem_cube_id in self.mem_cubes:
586+
tmp_mem_cubes[mem_cube_id] = self.mem_cubes.get(mem_cube_id)
587+
588+
for mem_cube_id, mem_cube in tmp_mem_cubes.items():
579589
if (
580590
(mem_cube_id in install_cube_ids)
581591
and (mem_cube.text_mem is not None)
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
"""
2+
Lock-free dictionary implementation using copy-on-write strategy.
3+
This provides better performance but uses more memory.
4+
"""
5+
6+
import threading
7+
8+
from collections.abc import ItemsView, Iterator, KeysView, ValuesView
9+
from typing import Generic, TypeVar
10+
11+
12+
K = TypeVar("K")
13+
V = TypeVar("V")
14+
15+
16+
class CopyOnWriteDict(Generic[K, V]):
17+
"""
18+
A lock-free dictionary using copy-on-write strategy.
19+
20+
Reads are completely lock-free and very fast.
21+
Writes create a new copy of the dictionary.
22+
Uses more memory but provides excellent read performance.
23+
"""
24+
25+
def __init__(self, initial_dict: dict[K, V] | None = None):
26+
"""Initialize with optional initial dictionary."""
27+
self._dict = initial_dict.copy() if initial_dict else {}
28+
self._write_lock = threading.Lock() # Only for writes
29+
30+
def __getitem__(self, key: K) -> V:
31+
"""Get item by key - completely lock-free."""
32+
return self._dict[key]
33+
34+
def __setitem__(self, key: K, value: V) -> None:
35+
"""Set item by key - uses copy-on-write."""
36+
with self._write_lock:
37+
# Create a new dictionary with the update
38+
new_dict = self._dict.copy()
39+
new_dict[key] = value
40+
# Atomic replacement
41+
self._dict = new_dict
42+
43+
def __delitem__(self, key: K) -> None:
44+
"""Delete item by key - uses copy-on-write."""
45+
with self._write_lock:
46+
new_dict = self._dict.copy()
47+
del new_dict[key]
48+
self._dict = new_dict
49+
50+
def __contains__(self, key: K) -> bool:
51+
"""Check if key exists - completely lock-free."""
52+
return key in self._dict
53+
54+
def __len__(self) -> int:
55+
"""Get length - completely lock-free."""
56+
return len(self._dict)
57+
58+
def __bool__(self) -> bool:
59+
"""Check if not empty - completely lock-free."""
60+
return bool(self._dict)
61+
62+
def __iter__(self) -> Iterator[K]:
63+
"""Iterate over keys - completely lock-free."""
64+
return iter(self._dict.keys())
65+
66+
def get(self, key: K, default: V | None = None) -> V:
67+
"""Get with default - completely lock-free."""
68+
return self._dict.get(key, default)
69+
70+
def keys(self) -> KeysView[K]:
71+
"""Get keys - completely lock-free."""
72+
return self._dict.keys()
73+
74+
def values(self) -> ValuesView[V]:
75+
"""Get values - completely lock-free."""
76+
return self._dict.values()
77+
78+
def items(self) -> ItemsView[K, V]:
79+
"""Get items - completely lock-free."""
80+
return self._dict.items()
81+
82+
def copy(self) -> dict[K, V]:
83+
"""Create a copy - completely lock-free."""
84+
return self._dict.copy()
85+
86+
def update(self, *args, **kwargs) -> None:
87+
"""Update dictionary - uses copy-on-write."""
88+
with self._write_lock:
89+
new_dict = self._dict.copy()
90+
new_dict.update(*args, **kwargs)
91+
self._dict = new_dict
92+
93+
def clear(self) -> None:
94+
"""Clear all items."""
95+
with self._write_lock:
96+
self._dict = {}
97+
98+
def pop(self, key: K, *args) -> V:
99+
"""Pop item by key."""
100+
with self._write_lock:
101+
new_dict = self._dict.copy()
102+
result = new_dict.pop(key, *args)
103+
self._dict = new_dict
104+
return result
105+
106+
def setdefault(self, key: K, default: V | None = None) -> V:
107+
"""Set default value for key if not exists."""
108+
# Fast path for existing keys
109+
if key in self._dict:
110+
return self._dict[key]
111+
112+
with self._write_lock:
113+
# Double-check after acquiring lock
114+
if key in self._dict:
115+
return self._dict[key]
116+
117+
new_dict = self._dict.copy()
118+
result = new_dict.setdefault(key, default)
119+
self._dict = new_dict
120+
return result

0 commit comments

Comments
 (0)