Skip to content

Commit f54612e

Browse files
committed
feat: add Redis-based ORM with multiprocess synchronization
- Add RedisDBManager and RedisLockableORM classes - Implement atomic locking mechanism for concurrent access - Add merge functionality for different object types - Include comprehensive test suite and examples - Fix Redis key type conflicts in lock operations
1 parent 667814c commit f54612e

File tree

6 files changed

+1264
-2
lines changed

6 files changed

+1264
-2
lines changed

examples/mem_scheduler/orm_examples.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
for MySQL and Redis connections.
77
"""
88

9+
import multiprocessing
910
import os
1011
import sys
1112

@@ -17,6 +18,7 @@
1718

1819
from memos.log import get_logger
1920
from memos.mem_scheduler.orm_modules.base_model import BaseDBManager, DatabaseError
21+
from memos.mem_scheduler.orm_modules.redis_model import RedisDBManager, SimpleListManager
2022

2123

2224
logger = get_logger(__name__)
@@ -171,6 +173,175 @@ def test_manual_env_loading():
171173
print(f"❌ Error loading environment file: {e}")
172174

173175

176+
def test_redis_lockable_orm_with_list():
177+
"""Test RedisDBManager with list[str] type synchronization"""
178+
print("\n" + "=" * 60)
179+
print("Testing RedisDBManager with list[str]")
180+
print("=" * 60)
181+
182+
try:
183+
from memos.mem_scheduler.orm_modules.redis_model import RedisDBManager
184+
185+
# Create a simple list manager instance
186+
list_manager = SimpleListManager(["apple", "banana", "cherry"])
187+
print(f"Original list manager: {list_manager}")
188+
189+
# Create RedisDBManager instance
190+
redis_client = BaseDBManager.load_redis_engine_from_env()
191+
if redis_client is None:
192+
print("❌ Failed to create Redis connection - check environment variables")
193+
return
194+
195+
db_manager = RedisDBManager(
196+
redis_client=redis_client,
197+
user_id="test_user",
198+
mem_cube_id="test_list_cube",
199+
obj=list_manager,
200+
)
201+
202+
# Save to Redis
203+
db_manager.save_to_db(list_manager)
204+
print("✅ List manager saved to Redis")
205+
206+
# Load from Redis
207+
loaded_manager = db_manager.load_from_db()
208+
if loaded_manager:
209+
print(f"Loaded list manager: {loaded_manager}")
210+
print(f"Items match: {list_manager.items == loaded_manager.items}")
211+
else:
212+
print("❌ Failed to load list manager from Redis")
213+
214+
# Clean up
215+
redis_client.delete("lockable_orm:test_user:test_list_cube:data")
216+
redis_client.delete("lockable_orm:test_user:test_list_cube:lock")
217+
redis_client.delete("lockable_orm:test_user:test_list_cube:version")
218+
redis_client.close()
219+
220+
except Exception as e:
221+
print(f"❌ Error in RedisDBManager test: {e}")
222+
223+
224+
def modify_list_process(process_id: int, items_to_add: list[str]):
225+
"""Function to be run in separate processes to modify the list using merge_items"""
226+
try:
227+
from memos.mem_scheduler.orm_modules.redis_model import RedisDBManager
228+
229+
# Create Redis connection
230+
redis_client = BaseDBManager.load_redis_engine_from_env()
231+
if redis_client is None:
232+
print(f"Process {process_id}: Failed to create Redis connection")
233+
return
234+
235+
# Create a temporary list manager for this process with items to add
236+
temp_manager = SimpleListManager()
237+
238+
db_manager = RedisDBManager(
239+
redis_client=redis_client,
240+
user_id="test_user",
241+
mem_cube_id="multiprocess_list",
242+
obj=temp_manager,
243+
)
244+
245+
print(f"Process {process_id}: Starting modification with items: {items_to_add}")
246+
for item in items_to_add:
247+
db_manager.obj.add_item(item)
248+
# Use sync_with_orm which internally uses merge_items
249+
db_manager.sync_with_orm(size_limit=None)
250+
251+
print(f"Process {process_id}: Successfully synchronized with Redis")
252+
253+
redis_client.close()
254+
255+
except Exception as e:
256+
print(f"Process {process_id}: Error - {e}")
257+
import traceback
258+
259+
traceback.print_exc()
260+
261+
262+
def test_multiprocess_synchronization():
263+
"""Test multiprocess synchronization with RedisDBManager"""
264+
print("\n" + "=" * 60)
265+
print("Testing Multiprocess Synchronization")
266+
print("=" * 60)
267+
268+
try:
269+
# Initialize Redis with empty list
270+
redis_client = BaseDBManager.load_redis_engine_from_env()
271+
if redis_client is None:
272+
print("❌ Failed to create Redis connection")
273+
return
274+
275+
# Initialize with empty list
276+
initial_manager = SimpleListManager([])
277+
db_manager = RedisDBManager(
278+
redis_client=redis_client,
279+
user_id="test_user",
280+
mem_cube_id="multiprocess_list",
281+
obj=initial_manager,
282+
)
283+
db_manager.save_to_db(initial_manager)
284+
print("✅ Initialized empty list manager in Redis")
285+
286+
# Define items for each process to add
287+
process_items = [
288+
["item1", "item2"],
289+
["item3", "item4"],
290+
["item5", "item6"],
291+
["item1", "item7"], # item1 is duplicate, should not be added twice
292+
]
293+
294+
# Create and start processes
295+
processes = []
296+
for i, items in enumerate(process_items):
297+
p = multiprocessing.Process(target=modify_list_process, args=(i + 1, items))
298+
processes.append(p)
299+
p.start()
300+
301+
# Wait for all processes to complete
302+
for p in processes:
303+
p.join()
304+
305+
print("\n" + "-" * 40)
306+
print("All processes completed. Checking final result...")
307+
308+
# Load final result
309+
final_db_manager = RedisDBManager(
310+
redis_client=redis_client,
311+
user_id="test_user",
312+
mem_cube_id="multiprocess_list",
313+
obj=SimpleListManager([]),
314+
)
315+
final_manager = final_db_manager.load_from_db()
316+
317+
if final_manager:
318+
print(f"Final synchronized list manager: {final_manager}")
319+
print(f"Final list length: {len(final_manager)}")
320+
print("Expected items: {'item1', 'item2', 'item3', 'item4', 'item5', 'item6', 'item7'}")
321+
print(f"Actual items: {set(final_manager.items)}")
322+
323+
# Check if all unique items are present
324+
expected_items = {"item1", "item2", "item3", "item4", "item5", "item6", "item7"}
325+
actual_items = set(final_manager.items)
326+
327+
if expected_items == actual_items:
328+
print("✅ All processes contributed correctly - synchronization successful!")
329+
else:
330+
print(f"❌ Expected items: {expected_items}")
331+
print(f" Actual items: {actual_items}")
332+
else:
333+
print("❌ Failed to load final result")
334+
335+
# Clean up
336+
redis_client.delete("lockable_orm:test_user:multiprocess_list:data")
337+
redis_client.delete("lockable_orm:test_user:multiprocess_list:lock")
338+
redis_client.delete("lockable_orm:test_user:multiprocess_list:version")
339+
redis_client.close()
340+
341+
except Exception as e:
342+
print(f"❌ Error in multiprocess synchronization test: {e}")
343+
344+
174345
def main():
175346
"""Main function to run all tests"""
176347
print("ORM Examples - Environment Variable Loading Tests")
@@ -188,6 +359,12 @@ def main():
188359
# Test Redis connection loading
189360
test_redis_connection_from_env()
190361

362+
# Test RedisLockableORM with list[str]
363+
test_redis_lockable_orm_with_list()
364+
365+
# Test multiprocess synchronization
366+
test_multiprocess_synchronization()
367+
191368
print("\n" + "=" * 80)
192369
print("All tests completed!")
193370
print("=" * 80)

src/memos/api/product_models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ class APISearchRequest(BaseRequest):
171171
query: str = Field(..., description="Search query")
172172
user_id: str = Field(None, description="User ID")
173173
mem_cube_id: str | None = Field(None, description="Cube ID to search in")
174-
mode: SearchMode = Field(SearchMode.FAST, description="search mode: fast, fine, or mixture")
174+
mode: SearchMode = Field(SearchMode.FINE, description="search mode: fast, fine, or mixture")
175175
internet_search: bool = Field(False, description="Whether to use internet search")
176176
moscube: bool = Field(False, description="Whether to use MemOSCube")
177177
top_k: int = Field(10, description="Number of results to return")

src/memos/api/routers/server_router.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,10 @@ def search_memories(search_req: APISearchRequest):
232232

233233
if search_mode == SearchMode.FAST:
234234
formatted_memories = fast_search_memories(search_req=search_req, user_context=user_context)
235-
elif search_mode == SearchMode.FINE or search_mode == SearchMode.MIXTURE:
235+
elif search_mode == SearchMode.FINE:
236236
formatted_memories = fine_search_memories(search_req=search_req, user_context=user_context)
237+
elif search_mode == SearchMode.MIXTURE:
238+
formatted_memories = mix_search_memories(search_req=search_req, user_context=user_context)
237239
else:
238240
logger.error(f"Unsupported search mode: {search_mode}")
239241
raise HTTPException(status_code=400, detail=f"Unsupported search mode: {search_mode}")
@@ -251,6 +253,36 @@ def search_memories(search_req: APISearchRequest):
251253
)
252254

253255

256+
def mix_search_memories(
257+
search_req: APISearchRequest,
258+
user_context: UserContext,
259+
):
260+
target_session_id = search_req.session_id
261+
if not target_session_id:
262+
target_session_id = "default_session"
263+
search_filter = {"session_id": search_req.session_id} if search_req.session_id else None
264+
265+
# Create MemCube and perform search
266+
naive_mem_cube = _create_naive_mem_cube()
267+
search_results = naive_mem_cube.text_mem.search(
268+
query=search_req.query,
269+
user_name=user_context.mem_cube_id,
270+
top_k=search_req.top_k,
271+
mode=search_req.mode,
272+
manual_close_internet=not search_req.internet_search,
273+
moscube=search_req.moscube,
274+
search_filter=search_filter,
275+
info={
276+
"user_id": search_req.user_id,
277+
"session_id": target_session_id,
278+
"chat_history": search_req.chat_history,
279+
},
280+
)
281+
formatted_memories = [_format_memory_item(data) for data in search_results]
282+
283+
return formatted_memories
284+
285+
254286
def fine_search_memories(
255287
search_req: APISearchRequest,
256288
user_context: UserContext,

src/memos/mem_scheduler/general_modules/api_misc.py

Whitespace-only changes.

0 commit comments

Comments
 (0)