
Commit 651e8df

Merge update about scheduler and new API to Dev (#386)
* debug an error function name
* feat: Add DynamicCache compatibility for different transformers versions
  - Fix build_kv_cache method in hf.py to handle both old and new DynamicCache structures
  - Support new 'layers' attribute with key_cache/value_cache or keys/values
  - Maintain backward compatibility with direct key_cache/value_cache attributes
  - Add comprehensive error handling and logging for unsupported structures
  - Update move_dynamic_cache_htod function in kv.py for cross-version compatibility
  - Handle layers-based structure in newer transformers versions
  - Support alternative attribute names (keys/values vs key_cache/value_cache)
  - Preserve original functionality for older transformers versions
  - Add comprehensive tests for DynamicCache compatibility
  - Test activation memory update with mock DynamicCache layers
  - Verify layers attribute access across different transformers versions
  - Fix scheduler logger mock to include memory_manager attribute

  This resolves AttributeError issues when using different versions of the transformers library and ensures robust handling of DynamicCache objects.
* debug
* feat: implement APIAnalyzerForScheduler for memory operations
  - Add APIAnalyzerForScheduler class with search/add operations
  - Support requests and http.client with connection reuse
  - Include comprehensive error handling and dynamic configuration
  - Add English test suite with real-world conversation scenarios
* feat: Add search_ws API endpoint and enhance API analyzer functionality
  - Add search_ws endpoint in server_router.py for scheduler-enabled search
  - Fix missing imports: time module, SearchRequest class, and get_mos_product_instance function
  - Implement search_ws method in api_analyzer.py with HTTP client support
  - Add _search_ws_with_requests and _search_ws_with_http_client private methods
  - Include search_ws usage example in demonstration code
  - Enhance scheduler and dispatcher capabilities for improved memory management
  - Expand test coverage to ensure functionality stability

  This update primarily strengthens the memory scheduling system's search capabilities, providing users with more flexible API interface options.
* fix: resolve test failures and warnings in test suite
  - Fix Pydantic serialization warning in test_memos_chen_tang_hello_world
    * Add warnings filter to suppress UserWarning from Pydantic serialization
  - Fix KeyError: 'past_key_values' in test_build_kv_cache_and_generation
    * Update mock configuration to properly return forward_output with past_key_values
    * Add DynamicCache version compatibility handling in test mocks
    * Support both old and new transformers versions with layers/key_cache attributes
    * Improve assertion logic to check all model calls for required parameters
  - Update base_scheduler.py to use centralized DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE constant
    * Add import for DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE from general_schemas
    * Replace hardcoded value 100 with configurable constant (1000)

  All tests now pass successfully with proper version compatibility handling.
* feat: add a test_robustness execution to test thread pool execution
* feat: optimize scheduler configuration and API search functionality
  - Add DEFAULT_TOP_K and DEFAULT_CONTEXT_WINDOW_SIZE global constants in general_schemas.py
  - Update base_scheduler.py to use global default values instead of hardcoded numbers
  - Fix SchedulerConfigFactory initialization issue by using keyword argument expansion
  - Resolve UnboundLocalError variable conflict in search_memories_ws function
  - Fix indentation and parameter issues in OptimizedScheduler search_for_api method
  - Improve code standardization and maintainability
* feat: Add Redis auto-initialization with fallback strategies
  - Add auto_initialize_redis() with config/env/local fallback
  - Move Redis logic from dispatcher_monitor to redis_service
  - Update base_scheduler to use auto initialization
  - Add proper resource cleanup and error handling
* feat: add database connection management to ORM module
  - Add MySQL engine loading from environment variables in BaseDBManager
  - Add Redis connection loading from environment variables in BaseDBManager
  - Enhance database configuration validation and error handling
  - Complete database adapter infrastructure for ORM module
  - Provide unified database connection management interface

  This update provides comprehensive database connection management capabilities for the mem_scheduler module, supporting dynamic MySQL and Redis configuration loading from environment variables and establishing a reliable data persistence foundation for scheduling services and API services.
* remove part of test
* feat: add Redis-based ORM with multiprocess synchronization
  - Add RedisDBManager and RedisLockableORM classes
  - Implement atomic locking mechanism for concurrent access
  - Add merge functionality for different object types
  - Include comprehensive test suite and examples
  - Fix Redis key type conflicts in lock operations
* fix: resolve scheduler module import and Redis integration issues
* revise naive memcube creation in server router
* remove long-time tests in test_scheduler
* remove redis test which needs .env
1 parent 6efe419 commit 651e8df

File tree

30 files changed: +3799 −318 lines changed

Lines changed: 374 additions & 0 deletions
@@ -0,0 +1,374 @@
#!/usr/bin/env python3
"""
ORM Examples for MemScheduler

This script demonstrates how to use the BaseDBManager's new environment variable loading methods
for MySQL and Redis connections.
"""

import multiprocessing
import os
import sys

from pathlib import Path


# Add the src directory to the Python path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))

from memos.log import get_logger
from memos.mem_scheduler.orm_modules.base_model import BaseDBManager, DatabaseError
from memos.mem_scheduler.orm_modules.redis_model import RedisDBManager, SimpleListManager


logger = get_logger(__name__)


def test_mysql_engine_from_env():
    """Test loading MySQL engine from environment variables"""
    print("\n" + "=" * 60)
    print("Testing MySQL Engine from Environment Variables")
    print("=" * 60)

    try:
        # Test loading MySQL engine from current environment variables
        mysql_engine = BaseDBManager.load_mysql_engine_from_env()
        if mysql_engine is None:
            print("❌ Failed to create MySQL engine - check environment variables")
            return

        print(f"✅ Successfully created MySQL engine: {mysql_engine}")
        print(f"   Engine URL: {mysql_engine.url}")

        # Test connection
        with mysql_engine.connect() as conn:
            from sqlalchemy import text

            result = conn.execute(text("SELECT 'MySQL connection test successful' as message"))
            message = result.fetchone()[0]
            print(f"   Connection test: {message}")

        mysql_engine.dispose()
        print("   MySQL engine disposed successfully")

    except DatabaseError as e:
        print(f"❌ DatabaseError: {e}")
    except Exception as e:
        print(f"❌ Unexpected error: {e}")


def test_redis_connection_from_env():
    """Test loading Redis connection from environment variables"""
    print("\n" + "=" * 60)
    print("Testing Redis Connection from Environment Variables")
    print("=" * 60)

    try:
        # Test loading Redis connection from current environment variables
        redis_client = BaseDBManager.load_redis_engine_from_env()
        if redis_client is None:
            print("❌ Failed to create Redis connection - check environment variables")
            return

        print(f"✅ Successfully created Redis connection: {redis_client}")

        # Test basic Redis operations
        redis_client.set("test_key", "Hello from ORM Examples!")
        value = redis_client.get("test_key")
        print(f"   Redis test - Set/Get: {value}")

        # Test Redis info
        info = redis_client.info("server")
        redis_version = info.get("redis_version", "unknown")
        print(f"   Redis server version: {redis_version}")

        # Clean up test key
        redis_client.delete("test_key")
        print("   Test key cleaned up")

        redis_client.close()
        print("   Redis connection closed successfully")

    except DatabaseError as e:
        print(f"❌ DatabaseError: {e}")
    except Exception as e:
        print(f"❌ Unexpected error: {e}")


def test_environment_variables():
    """Test and display current environment variables"""
    print("\n" + "=" * 60)
    print("Current Environment Variables")
    print("=" * 60)

    # MySQL environment variables
    mysql_vars = [
        "MYSQL_HOST",
        "MYSQL_PORT",
        "MYSQL_USERNAME",
        "MYSQL_PASSWORD",
        "MYSQL_DATABASE",
        "MYSQL_CHARSET",
    ]

    print("\nMySQL Environment Variables:")
    for var in mysql_vars:
        value = os.getenv(var, "Not set")
        # Mask password for security
        if "PASSWORD" in var and value != "Not set":
            value = "*" * len(value)
        print(f"  {var}: {value}")

    # Redis environment variables
    redis_vars = [
        "REDIS_HOST",
        "REDIS_PORT",
        "REDIS_DB",
        "REDIS_PASSWORD",
        "MEMSCHEDULER_REDIS_HOST",
        "MEMSCHEDULER_REDIS_PORT",
        "MEMSCHEDULER_REDIS_DB",
        "MEMSCHEDULER_REDIS_PASSWORD",
    ]

    print("\nRedis Environment Variables:")
    for var in redis_vars:
        value = os.getenv(var, "Not set")
        # Mask password for security
        if "PASSWORD" in var and value != "Not set":
            value = "*" * len(value)
        print(f"  {var}: {value}")


def test_manual_env_loading():
    """Test loading environment variables manually from .env file"""
    print("\n" + "=" * 60)
    print("Testing Manual Environment Loading")
    print("=" * 60)

    env_file_path = "/Users/travistang/Documents/codes/memos/.env"

    if not os.path.exists(env_file_path):
        print(f"❌ Environment file not found: {env_file_path}")
        return

    try:
        from dotenv import load_dotenv

        # Load environment variables
        load_dotenv(env_file_path)
        print(f"✅ Successfully loaded environment variables from {env_file_path}")

        # Test some key variables
        test_vars = ["OPENAI_API_KEY", "MOS_CHAT_MODEL", "TZ"]
        for var in test_vars:
            value = os.getenv(var, "Not set")
            if "KEY" in var and value != "Not set":
                value = f"{value[:10]}..." if len(value) > 10 else value
            print(f"  {var}: {value}")

    except ImportError:
        print("❌ python-dotenv not installed. Install with: pip install python-dotenv")
    except Exception as e:
        print(f"❌ Error loading environment file: {e}")


def test_redis_lockable_orm_with_list():
    """Test RedisDBManager with list[str] type synchronization"""
    print("\n" + "=" * 60)
    print("Testing RedisDBManager with list[str]")
    print("=" * 60)

    try:
        from memos.mem_scheduler.orm_modules.redis_model import RedisDBManager

        # Create a simple list manager instance
        list_manager = SimpleListManager(["apple", "banana", "cherry"])
        print(f"Original list manager: {list_manager}")

        # Create RedisDBManager instance
        redis_client = BaseDBManager.load_redis_engine_from_env()
        if redis_client is None:
            print("❌ Failed to create Redis connection - check environment variables")
            return

        db_manager = RedisDBManager(
            redis_client=redis_client,
            user_id="test_user",
            mem_cube_id="test_list_cube",
            obj=list_manager,
        )

        # Save to Redis
        db_manager.save_to_db(list_manager)
        print("✅ List manager saved to Redis")

        # Load from Redis
        loaded_manager = db_manager.load_from_db()
        if loaded_manager:
            print(f"Loaded list manager: {loaded_manager}")
            print(f"Items match: {list_manager.items == loaded_manager.items}")
        else:
            print("❌ Failed to load list manager from Redis")

        # Clean up
        redis_client.delete("lockable_orm:test_user:test_list_cube:data")
        redis_client.delete("lockable_orm:test_user:test_list_cube:lock")
        redis_client.delete("lockable_orm:test_user:test_list_cube:version")
        redis_client.close()

    except Exception as e:
        print(f"❌ Error in RedisDBManager test: {e}")


def modify_list_process(process_id: int, items_to_add: list[str]):
    """Function to be run in separate processes to modify the list using merge_items"""
    try:
        from memos.mem_scheduler.orm_modules.redis_model import RedisDBManager

        # Create Redis connection
        redis_client = BaseDBManager.load_redis_engine_from_env()
        if redis_client is None:
            print(f"Process {process_id}: Failed to create Redis connection")
            return

        # Create a temporary list manager for this process with items to add
        temp_manager = SimpleListManager()

        db_manager = RedisDBManager(
            redis_client=redis_client,
            user_id="test_user",
            mem_cube_id="multiprocess_list",
            obj=temp_manager,
        )

        print(f"Process {process_id}: Starting modification with items: {items_to_add}")
        for item in items_to_add:
            db_manager.obj.add_item(item)
        # Use sync_with_orm which internally uses merge_items
        db_manager.sync_with_orm(size_limit=None)

        print(f"Process {process_id}: Successfully synchronized with Redis")

        redis_client.close()

    except Exception as e:
        print(f"Process {process_id}: Error - {e}")
        import traceback

        traceback.print_exc()


def test_multiprocess_synchronization():
    """Test multiprocess synchronization with RedisDBManager"""
    print("\n" + "=" * 60)
    print("Testing Multiprocess Synchronization")
    print("=" * 60)

    try:
        # Initialize Redis with empty list
        redis_client = BaseDBManager.load_redis_engine_from_env()
        if redis_client is None:
            print("❌ Failed to create Redis connection")
            return

        # Initialize with empty list
        initial_manager = SimpleListManager([])
        db_manager = RedisDBManager(
            redis_client=redis_client,
            user_id="test_user",
            mem_cube_id="multiprocess_list",
            obj=initial_manager,
        )
        db_manager.save_to_db(initial_manager)
        print("✅ Initialized empty list manager in Redis")

        # Define items for each process to add
        process_items = [
            ["item1", "item2"],
            ["item3", "item4"],
            ["item5", "item6"],
            ["item1", "item7"],  # item1 is duplicate, should not be added twice
        ]

        # Create and start processes
        processes = []
        for i, items in enumerate(process_items):
            p = multiprocessing.Process(target=modify_list_process, args=(i + 1, items))
            processes.append(p)
            p.start()

        # Wait for all processes to complete
        for p in processes:
            p.join()

        print("\n" + "-" * 40)
        print("All processes completed. Checking final result...")

        # Load final result
        final_db_manager = RedisDBManager(
            redis_client=redis_client,
            user_id="test_user",
            mem_cube_id="multiprocess_list",
            obj=SimpleListManager([]),
        )
        final_manager = final_db_manager.load_from_db()

        if final_manager:
            print(f"Final synchronized list manager: {final_manager}")
            print(f"Final list length: {len(final_manager)}")
            print("Expected items: {'item1', 'item2', 'item3', 'item4', 'item5', 'item6', 'item7'}")
            print(f"Actual items: {set(final_manager.items)}")

            # Check if all unique items are present
            expected_items = {"item1", "item2", "item3", "item4", "item5", "item6", "item7"}
            actual_items = set(final_manager.items)

            if expected_items == actual_items:
                print("✅ All processes contributed correctly - synchronization successful!")
            else:
                print(f"❌ Expected items: {expected_items}")
                print(f"   Actual items: {actual_items}")
        else:
            print("❌ Failed to load final result")

        # Clean up
        redis_client.delete("lockable_orm:test_user:multiprocess_list:data")
        redis_client.delete("lockable_orm:test_user:multiprocess_list:lock")
        redis_client.delete("lockable_orm:test_user:multiprocess_list:version")
        redis_client.close()

    except Exception as e:
        print(f"❌ Error in multiprocess synchronization test: {e}")


def main():
    """Main function to run all tests"""
    print("ORM Examples - Environment Variable Loading Tests")
    print("=" * 80)

    # Test environment variables display
    test_environment_variables()

    # Test manual environment loading
    test_manual_env_loading()

    # Test MySQL engine loading
    test_mysql_engine_from_env()

    # Test Redis connection loading
    test_redis_connection_from_env()

    # Test RedisLockableORM with list[str]
    test_redis_lockable_orm_with_list()

    # Test multiprocess synchronization
    test_multiprocess_synchronization()

    print("\n" + "=" * 80)
    print("All tests completed!")
    print("=" * 80)


if __name__ == "__main__":
    main()
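The expected final set in `test_multiprocess_synchronization` implies that `sync_with_orm` performs a deduplicating merge of each process's local items into the shared list. A minimal sketch of such a merge (illustrative only, not the library's actual `merge_items`):

```python
def merge_items(local: list[str], remote: list[str]) -> list[str]:
    """Merge a process's local items into the remote list: remote items
    keep their order, local items are appended only if not already present."""
    merged = list(remote)
    seen = set(remote)
    for item in local:
        if item not in seen:
            merged.append(item)
            seen.add(item)
    return merged
```

Run under the version-checked lock, this is why the duplicate "item1" from process 4 appears only once in the final result.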

src/memos/api/product_models.py

Lines changed: 2 additions & 1 deletion
@@ -5,6 +5,7 @@
 from pydantic import BaseModel, Field

 # Import message types from core types module
+from memos.mem_scheduler.schemas.general_schemas import SearchMode
 from memos.types import MessageDict, PermissionDict


@@ -170,7 +171,7 @@ class APISearchRequest(BaseRequest):
     query: str = Field(..., description="Search query")
     user_id: str = Field(None, description="User ID")
     mem_cube_id: str | None = Field(None, description="Cube ID to search in")
-    mode: str = Field("fast", description="search mode fast or fine")
+    mode: SearchMode = Field(SearchMode.FINE, description="search mode: fast, fine, or mixture")
     internet_search: bool = Field(False, description="Whether to use internet search")
     moscube: bool = Field(False, description="Whether to use MemOSCube")
     top_k: int = Field(10, description="Number of results to return")