Commit 5923001

tangg555, CaralHsi, fridayL authored
Revision of mixture API: add conversation turn and reduce two-stage ranking to one stage (#405)
* fix an erroneous function name
* feat: Add DynamicCache compatibility for different transformers versions (see the sketch below)
  - Fix build_kv_cache method in hf.py to handle both old and new DynamicCache structures
  - Support the new 'layers' attribute with key_cache/value_cache or keys/values
  - Maintain backward compatibility with direct key_cache/value_cache attributes
  - Add comprehensive error handling and logging for unsupported structures
  - Update move_dynamic_cache_htod function in kv.py for cross-version compatibility
  - Handle the layers-based structure in newer transformers versions
  - Support alternative attribute names (keys/values vs key_cache/value_cache)
  - Preserve original functionality for older transformers versions
  - Add comprehensive tests for DynamicCache compatibility
  - Test activation memory update with mock DynamicCache layers
  - Verify layers attribute access across different transformers versions
  - Fix the scheduler logger mock to include a memory_manager attribute
  This resolves AttributeError issues when using different versions of the transformers library and ensures robust handling of DynamicCache objects.
* debug
* feat: implement APIAnalyzerForScheduler for memory operations
  - Add APIAnalyzerForScheduler class with search/add operations
  - Support requests and http.client with connection reuse
  - Include comprehensive error handling and dynamic configuration
  - Add an English test suite with real-world conversation scenarios
* feat: Add search_ws API endpoint and enhance API analyzer functionality
  - Add search_ws endpoint in server_router.py for scheduler-enabled search
  - Fix missing imports: the time module, the SearchRequest class, and the get_mos_product_instance function
  - Implement the search_ws method in api_analyzer.py with HTTP client support
  - Add _search_ws_with_requests and _search_ws_with_http_client private methods
  - Include a search_ws usage example in the demonstration code
  - Enhance scheduler and dispatcher capabilities for improved memory management
  - Expand test coverage to ensure functional stability
  This update primarily strengthens the memory scheduling system's search capabilities, giving users more flexible API options.
* fix: resolve test failures and warnings in the test suite
  - Fix the Pydantic serialization warning in test_memos_chen_tang_hello_world by adding a warnings filter to suppress the UserWarning
  - Fix KeyError: 'past_key_values' in test_build_kv_cache_and_generation
    * Update the mock configuration to properly return forward_output with past_key_values
    * Add DynamicCache version-compatibility handling in test mocks
    * Support both old and new transformers versions with layers/key_cache attributes
    * Improve assertion logic to check all model calls for required parameters
  - Update base_scheduler.py to use the centralized DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE constant
    * Add an import for DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE from general_schemas
    * Replace the hardcoded value 100 with the configurable constant (1000)
  All tests now pass with proper version-compatibility handling.
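The DynamicCache compatibility work above boils down to one access pattern. A minimal sketch, assuming a hypothetical helper name (iter_cache_layers is not the project's API):

from transformers.cache_utils import DynamicCache


def iter_cache_layers(cache: DynamicCache):
    """Yield (key, value) tensor pairs across transformers versions."""
    if hasattr(cache, "layers"):  # newer transformers: per-layer cache objects
        for layer in cache.layers:
            if hasattr(layer, "key_cache"):
                yield layer.key_cache, layer.value_cache
            elif hasattr(layer, "keys"):  # alternative attribute names
                yield layer.keys, layer.values
            else:
                raise AttributeError(f"Unsupported cache layer type: {type(layer)}")
    elif hasattr(cache, "key_cache"):  # older transformers: flat per-layer lists
        yield from zip(cache.key_cache, cache.value_cache)
    else:
        raise AttributeError(f"Unsupported DynamicCache structure: {type(cache)}")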
* feat: add a test_robustness execution to exercise thread pool execution
* feat: optimize scheduler configuration and API search functionality
  - Add DEFAULT_TOP_K and DEFAULT_CONTEXT_WINDOW_SIZE global constants in general_schemas.py
  - Update base_scheduler.py to use the global defaults instead of hardcoded numbers
  - Fix a SchedulerConfigFactory initialization issue by using keyword-argument expansion
  - Resolve an UnboundLocalError variable conflict in search_memories_ws
  - Fix indentation and parameter issues in OptimizedScheduler's search_for_api method
  - Improve code standardization and maintainability
* feat: Add Redis auto-initialization with fallback strategies (a sketch of the fallback chain follows the co-author list below)
  - Add auto_initialize_redis() with config/env/local fallback
  - Move Redis logic from dispatcher_monitor to redis_service
  - Update base_scheduler to use auto-initialization
  - Add proper resource cleanup and error handling
* feat: add database connection management to the ORM module
  - Add MySQL engine loading from environment variables in BaseDBManager
  - Add Redis connection loading from environment variables in BaseDBManager
  - Enhance database configuration validation and error handling
  - Complete the database adapter infrastructure for the ORM module
  - Provide a unified database connection management interface
  This gives the mem_scheduler module comprehensive connection management, supporting dynamic MySQL and Redis configuration from environment variables and establishing a reliable persistence foundation for scheduling and API services.
* remove part of a test
* feat: add a Redis-based ORM with multiprocess synchronization
  - Add RedisDBManager and RedisLockableORM classes
  - Implement an atomic locking mechanism for concurrent access
  - Add merge functionality for different object types
  - Include a comprehensive test suite and examples
  - Fix Redis key-type conflicts in lock operations
* fix: resolve scheduler module import and Redis integration issues
* revise naive memcube creation in server router
* remove long-running tests in test_scheduler
* remove a Redis test that needs .env
* refactor all code related to mixture search with the scheduler
* fix: resolve Redis API synchronization issues and implement the search API with reranker
  - Migrate running_entries to running_task_ids across the codebase
  - Update the sync_search_data method to properly handle TaskRunningStatus
  - Correct variable naming and logic in the API synchronization flow
  - Implement the search API endpoint with reranker functionality
  - Update test files to reflect the new running_task_ids convention
  - Ensure proper Redis state management for concurrent tasks
* remove a test for the API module
* revise to pass the test suite
* address some bugs so that mix_search runs normally
* modify code according to evaluation logs
* feat: Optimize mixture search and enhance the API client
* feat: Add conversation_turn tracking for session-based memory search (a sketch follows this list)
  - Add a conversation_turn field to the APIMemoryHistoryEntryItem schema with default value 0
  - Implement a session counter in OptimizedScheduler to track the turn count per session_id
  - Update the sync_search_data method to accept and store a conversation_turn parameter
  - Maintain session history with LRU eviction (max 5 sessions)
  - Rename conversation_id to session_id for consistency with the request object
  - Enable direct access to session_id from search requests
  This feature tracks conversation turns within a session, providing better context for memory retrieval and search-history management.
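The conversation_turn bookkeeping described above can be sketched as a small LRU-bounded counter; the class name and shape here are illustrative, not the OptimizedScheduler internals:

from collections import OrderedDict


class SessionTurnTracker:
    """Count search turns per session_id, keeping at most max_sessions
    sessions and evicting the least recently used one."""

    def __init__(self, max_sessions: int = 5):
        self.max_sessions = max_sessions
        self._turns: OrderedDict[str, int] = OrderedDict()

    def next_turn(self, session_id: str) -> int:
        count = self._turns.pop(session_id, 0) + 1
        self._turns[session_id] = count  # re-insert as most recently used
        if len(self._turns) > self.max_sessions:
            self._turns.popitem(last=False)  # drop the least recently used session
        return count


tracker = SessionTurnTracker(max_sessions=5)
assert tracker.next_turn("sess-a") == 1
assert tracker.next_turn("sess-a") == 2  # second turn within the same session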
* address a time bug in the monitor
* revise simple tree
* add mode to the evaluation client; rewrite print to logger.info in db files

---------

Co-authored-by: CaralHsi <[email protected]>
Co-authored-by: chunyu li <[email protected]>
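A minimal sketch of the config -> environment -> local fallback chain from the Redis auto-initialization commit above; the helper's signature and the config shape are assumptions, not the project's actual auto_initialize_redis():

import os

import redis


def auto_initialize_redis(config: dict | None = None) -> redis.Redis:
    """Return the first Redis connection that answers PING, trying an
    explicit config, then the environment, then a local default."""
    candidates: list[str] = []
    if config and config.get("redis_url"):
        candidates.append(config["redis_url"])
    if os.getenv("REDIS_URL"):
        candidates.append(os.environ["REDIS_URL"])
    candidates.append("redis://localhost:6379/0")  # local fallback

    last_error: Exception | None = None
    for url in candidates:
        client = redis.Redis.from_url(url)
        try:
            client.ping()
            return client
        except redis.RedisError as exc:
            client.close()  # release the failed connection
            last_error = exc
    raise ConnectionError(f"no reachable Redis instance: {last_error}")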
1 parent f74ea76 commit 5923001

File tree

11 files changed: +307 additions, −205 deletions

evaluation/scripts/utils/client.py

Lines changed: 2 additions & 2 deletions
@@ -181,7 +181,7 @@ def search(self, query, user_id, top_k):
                 "mem_cube_id": user_id,
                 "conversation_id": "",
                 "top_k": top_k,
-                "mode": "fast",
+                "mode": os.getenv("SEARCH_MODE", "fast"),
                 "handle_pref_mem": False,
             },
             ensure_ascii=False,
@@ -232,7 +232,7 @@ def search(self, query, user_id, top_k):
                 "query": query,
                 "user_id": user_id,
                 "memory_limit_number": top_k,
-                "mode": "mixture",
+                "mode": os.getenv("SEARCH_MODE", "fast"),
             }
         )
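Both hunks make the evaluation client's search mode configurable via the SEARCH_MODE environment variable instead of hardcoding it per endpoint. Illustrative use (the value is an example):

import os

os.environ["SEARCH_MODE"] = "mixture"  # or "fast"; when unset, both calls default to "fast"
assert os.getenv("SEARCH_MODE", "fast") == "mixture"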

src/memos/graph_dbs/neo4j.py

Lines changed: 1 addition & 1 deletion
@@ -1072,7 +1072,7 @@ def drop_database(self) -> None:

             with self.driver.session(database=self.system_db_name) as session:
                 session.run(f"DROP DATABASE {self.db_name} IF EXISTS")
-                print(f"Database '{self.db_name}' has been dropped.")
+                logger.info(f"Database '{self.db_name}' has been dropped.")
         else:
             raise ValueError(
                 f"Refusing to drop protected database: {self.db_name} in "

src/memos/graph_dbs/polardb.py

Lines changed: 27 additions & 64 deletions
Large diffs are not rendered by default.

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 6 additions & 1 deletion
@@ -6,6 +6,7 @@
 from collections.abc import Callable
 from datetime import datetime
 from pathlib import Path
+from typing import TYPE_CHECKING

 from sqlalchemy.engine import Engine

@@ -50,6 +51,10 @@
 from memos.templates.mem_scheduler_prompts import MEMORY_ASSEMBLY_TEMPLATE


+if TYPE_CHECKING:
+    from memos.mem_cube.base import BaseMemCube
+
+
 logger = get_logger(__name__)


@@ -124,7 +129,7 @@ def __init__(self, config: BaseSchedulerConfig):
         self._context_lock = threading.Lock()
         self.current_user_id: UserID | str | None = None
         self.current_mem_cube_id: MemCubeID | str | None = None
-        self.current_mem_cube: GeneralMemCube | None = None
+        self.current_mem_cube: BaseMemCube | None = None
         self.auth_config_path: str | Path | None = self.config.get("auth_config_path", None)
         self.auth_config = None
         self.rabbitmq_config = None
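The new import is guarded so that widening the annotation to BaseMemCube costs nothing at runtime and cannot introduce a circular import. A minimal sketch of the TYPE_CHECKING pattern (the class body is illustrative):

from __future__ import annotations

from typing import TYPE_CHECKING


if TYPE_CHECKING:
    # Only evaluated by static type checkers, never at runtime.
    from memos.mem_cube.base import BaseMemCube


class SchedulerSketch:
    def __init__(self) -> None:
        # With deferred annotations, BaseMemCube stays a string at runtime.
        self.current_mem_cube: BaseMemCube | None = None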

src/memos/mem_scheduler/general_modules/api_misc.py

Lines changed: 26 additions & 34 deletions
@@ -8,24 +8,27 @@
     APISearchHistoryManager,
     TaskRunningStatus,
 )
-from memos.mem_scheduler.utils.db_utils import get_utc_now
 from memos.memories.textual.item import TextualMemoryItem


 logger = get_logger(__name__)


 class SchedulerAPIModule(BaseSchedulerModule):
-    def __init__(self, window_size=5):
+    def __init__(self, window_size: int | None = None, history_memory_turns: int | None = None):
         super().__init__()
         self.window_size = window_size
+        self.history_memory_turns = history_memory_turns
         self.search_history_managers: dict[str, APIRedisDBManager] = {}
-        self.pre_memory_turns = 5

     def get_search_history_manager(self, user_id: str, mem_cube_id: str) -> APIRedisDBManager:
         """Get or create a Redis manager for search history."""
+        logger.info(
+            f"Getting search history manager for user_id: {user_id}, mem_cube_id: {mem_cube_id}"
+        )
         key = f"search_history:{user_id}:{mem_cube_id}"
         if key not in self.search_history_managers:
+            logger.info(f"Creating new search history manager for key: {key}")
             self.search_history_managers[key] = APIRedisDBManager(
                 user_id=user_id,
                 mem_cube_id=mem_cube_id,
@@ -41,8 +44,12 @@ def sync_search_data(
         query: str,
         memories: list[TextualMemoryItem],
         formatted_memories: Any,
-        conversation_id: str | None = None,
+        session_id: str | None = None,
+        conversation_turn: int = 0,
     ) -> Any:
+        logger.info(
+            f"Syncing search data for item_id: {item_id}, user_id: {user_id}, mem_cube_id: {mem_cube_id}"
+        )
         # Get the search history manager
         manager = self.get_search_history_manager(user_id, mem_cube_id)
         manager.sync_with_redis(size_limit=self.window_size)
@@ -59,7 +66,7 @@ def sync_search_data(
                 query=query,
                 formatted_memories=formatted_memories,
                 task_status=TaskRunningStatus.COMPLETED,  # Use the provided running_status
-                conversation_id=conversation_id,
+                session_id=session_id,
                 memories=memories,
             )

@@ -69,18 +76,18 @@ def sync_search_data(
             logger.warning(f"Failed to update entry with item_id: {item_id}")
         else:
             # Add new entry based on running_status
-            search_entry = APIMemoryHistoryEntryItem(
+            entry_item = APIMemoryHistoryEntryItem(
                 item_id=item_id,
                 query=query,
                 formatted_memories=formatted_memories,
                 memories=memories,
                 task_status=TaskRunningStatus.COMPLETED,
-                conversation_id=conversation_id,
-                created_time=get_utc_now(),
+                session_id=session_id,
+                conversation_turn=conversation_turn,
             )

             # Add directly to completed list as APIMemoryHistoryEntryItem instance
-            search_history.completed_entries.append(search_entry)
+            search_history.completed_entries.append(entry_item)

             # Maintain window size
             if len(search_history.completed_entries) > search_history.window_size:
@@ -101,37 +108,22 @@ def sync_search_data(
         manager.sync_with_redis(size_limit=self.window_size)
         return manager

-    def get_pre_memories(self, user_id: str, mem_cube_id: str) -> list:
-        """
-        Get pre-computed memories from the most recent completed search entry.
-
-        Args:
-            user_id: User identifier
-            mem_cube_id: Memory cube identifier
-
-        Returns:
-            List of TextualMemoryItem objects from the most recent completed search
-        """
-        manager = self.get_search_history_manager(user_id, mem_cube_id)
-
-        existing_data = manager.load_from_db()
-        if existing_data is None:
-            return []
-
-        search_history: APISearchHistoryManager = existing_data
-
-        # Get memories from the most recent completed entry
-        history_memories = search_history.get_history_memories(turns=self.pre_memory_turns)
-        return history_memories
-
-    def get_history_memories(self, user_id: str, mem_cube_id: str, n: int) -> list:
+    def get_history_memories(
+        self, user_id: str, mem_cube_id: str, turns: int | None = None
+    ) -> list:
         """Get history memories for backward compatibility with tests."""
+        logger.info(
+            f"Getting history memories for user_id: {user_id}, mem_cube_id: {mem_cube_id}, turns: {turns}"
+        )
         manager = self.get_search_history_manager(user_id, mem_cube_id)
         existing_data = manager.load_from_db()

         if existing_data is None:
             return []

+        if turns is None:
+            turns = self.history_memory_turns
+
         # Handle different data formats
         if isinstance(existing_data, APISearchHistoryManager):
             search_history = existing_data
@@ -142,4 +134,4 @@ def get_history_memories(self, user_id: str, mem_cube_id: str, n: int) -> list:
             except Exception:
                 return []

-        return search_history.get_history_memories(turns=n)
+        return search_history.get_history_memories(turns=turns)
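An illustrative call flow for the revised module; the keyword names follow the diff above, but the argument values are made up and this is a sketch, not a test:

api_module = SchedulerAPIModule(window_size=5, history_memory_turns=3)

# Record a completed search turn, now keyed by session_id and conversation_turn.
api_module.sync_search_data(
    item_id="item-001",
    user_id="u-1",
    mem_cube_id="cube-1",
    query="what did we discuss yesterday?",
    memories=[],
    formatted_memories=[],
    session_id="sess-a",
    conversation_turn=2,
)

# turns=None now falls back to history_memory_turns (3 here) instead of the
# removed pre_memory_turns constant.
recent = api_module.get_history_memories(user_id="u-1", mem_cube_id="cube-1")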

src/memos/mem_scheduler/monitors/general_monitor.py

Lines changed: 2 additions & 2 deletions
@@ -76,8 +76,8 @@ def __init__(
         ] = {}

         # Lifecycle monitor
-        self.last_activation_mem_update_time = datetime.min
-        self.last_query_consume_time = datetime.min
+        self.last_activation_mem_update_time = get_utc_now()
+        self.last_query_consume_time = get_utc_now()

         self._register_lock = Lock()
         self._process_llm = process_llm
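One plausible reading of the "time bug in monitor" fix: datetime.min is a naive timestamp, so comparing or subtracting it against a timezone-aware "now" (which get_utc_now() presumably returns) raises TypeError. A sketch of the failure mode:

from datetime import datetime, timezone

naive_floor = datetime.min                # no tzinfo attached
aware_now = datetime.now(timezone.utc)    # timezone-aware

try:
    _ = aware_now - naive_floor
except TypeError as exc:  # mixing naive and aware datetimes is rejected
    print(f"expected failure: {exc}")

# Seeding the monitor with an aware timestamp keeps elapsed-time checks valid.
last_update = datetime.now(timezone.utc)
elapsed = datetime.now(timezone.utc) - last_update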
