
Commit 3680286

tangg555 and CaralHsi authored
feat: redis for sync history memories and new api of mixture search (#398)
* debug an error function name
* feat: Add DynamicCache compatibility for different transformers versions
  - Fix build_kv_cache method in hf.py to handle both old and new DynamicCache structures
  - Support new 'layers' attribute with key_cache/value_cache or keys/values
  - Maintain backward compatibility with direct key_cache/value_cache attributes
  - Add comprehensive error handling and logging for unsupported structures
  - Update move_dynamic_cache_htod function in kv.py for cross-version compatibility
  - Handle layers-based structure in newer transformers versions
  - Support alternative attribute names (keys/values vs key_cache/value_cache)
  - Preserve original functionality for older transformers versions
  - Add comprehensive tests for DynamicCache compatibility
  - Test activation memory update with mock DynamicCache layers
  - Verify layers attribute access across different transformers versions
  - Fix scheduler logger mock to include memory_manager attribute
  This resolves AttributeError issues when using different versions of the transformers library and ensures robust handling of DynamicCache objects.
* debug
* feat: implement APIAnalyzerForScheduler for memory operations
  - Add APIAnalyzerForScheduler class with search/add operations
  - Support requests and http.client with connection reuse
  - Include comprehensive error handling and dynamic configuration
  - Add English test suite with real-world conversation scenarios
* feat: Add search_ws API endpoint and enhance API analyzer functionality
  - Add search_ws endpoint in server_router.py for scheduler-enabled search
  - Fix missing imports: time module, SearchRequest class, and get_mos_product_instance function
  - Implement search_ws method in api_analyzer.py with HTTP client support
  - Add _search_ws_with_requests and _search_ws_with_http_client private methods
  - Include search_ws usage example in demonstration code
  - Enhance scheduler and dispatcher capabilities for improved memory management
  - Expand test coverage to ensure functionality stability
  This update primarily strengthens the memory scheduling system's search capabilities, providing users with more flexible API interface options.
* fix: resolve test failures and warnings in test suite
  - Fix Pydantic serialization warning in test_memos_chen_tang_hello_world
    * Add warnings filter to suppress UserWarning from Pydantic serialization
  - Fix KeyError: 'past_key_values' in test_build_kv_cache_and_generation
    * Update mock configuration to properly return forward_output with past_key_values
    * Add DynamicCache version compatibility handling in test mocks
    * Support both old and new transformers versions with layers/key_cache attributes
    * Improve assertion logic to check all model calls for required parameters
  - Update base_scheduler.py to use centralized DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE constant
    * Add import for DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE from general_schemas
    * Replace hardcoded value 100 with configurable constant (1000)
  All tests now pass successfully with proper version compatibility handling.
* feat: add a test_robustness execution to test thread pool execution
* feat: optimize scheduler configuration and API search functionality
  - Add DEFAULT_TOP_K and DEFAULT_CONTEXT_WINDOW_SIZE global constants in general_schemas.py
  - Update base_scheduler.py to use global default values instead of hardcoded numbers
  - Fix SchedulerConfigFactory initialization issue by using keyword argument expansion
  - Resolve UnboundLocalError variable conflict in search_memories_ws function
  - Fix indentation and parameter issues in OptimizedScheduler search_for_api method
  - Improve code standardization and maintainability
* feat: Add Redis auto-initialization with fallback strategies
  - Add auto_initialize_redis() with config/env/local fallback
  - Move Redis logic from dispatcher_monitor to redis_service
  - Update base_scheduler to use auto initialization
  - Add proper resource cleanup and error handling
* feat: add database connection management to ORM module
  - Add MySQL engine loading from environment variables in BaseDBManager
  - Add Redis connection loading from environment variables in BaseDBManager
  - Enhance database configuration validation and error handling
  - Complete database adapter infrastructure for ORM module
  - Provide unified database connection management interface
  This update gives the mem_scheduler module comprehensive database connection management, supporting dynamic MySQL and Redis configuration loading from environment variables and establishing a reliable data-persistence foundation for scheduling and API services.
* remove part of test
* feat: add Redis-based ORM with multiprocess synchronization
  - Add RedisDBManager and RedisLockableORM classes
  - Implement atomic locking mechanism for concurrent access
  - Add merge functionality for different object types
  - Include comprehensive test suite and examples
  - Fix Redis key type conflicts in lock operations
* fix: resolve scheduler module import and Redis integration issues
* revise naive memcube creation in server router
* remove long-time tests in test_scheduler
* remove redis test which needs .env
* refactor all code related to mixture search with the scheduler
* fix: resolve Redis API synchronization issues and implement search API with reranker
  - Fix running_entries to running_task_ids migration across codebase
  - Update sync_search_data method to properly handle TaskRunningStatus
  - Correct variable naming and logic in API synchronization flow
  - Implement search API endpoint with reranker functionality
  - Update test files to reflect new running_task_ids convention
  - Ensure proper Redis state management for concurrent tasks
* remove a test for api module
* revise to pass the test suite
* address some bugs so that mix_search runs normally
* modify codes according to evaluation logs
---------
Co-authored-by: CaralHsi <[email protected]>
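The atomic locking that RedisLockableORM is described as implementing typically rests on Redis `SET` with the `NX` (set-if-absent) and `EX` (expiry) options. A minimal sketch under that assumption — the class and method names are illustrative, not the project's actual API, and any client exposing redis-py's `set`/`get`/`delete` signature works:

```python
import time
import uuid


class SimpleRedisLock:
    """Exclusive lock via Redis SET NX with an expiry (illustrative sketch)."""

    def __init__(self, client, key, ttl_seconds=30):
        self.client = client
        self.key = key
        self.ttl = ttl_seconds
        self.token = str(uuid.uuid4())  # identifies this lock owner

    def acquire(self, timeout=5.0, poll=0.05):
        """Poll until the lock is taken or the timeout expires."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # SET key token NX EX ttl -- succeeds only if the key is absent,
            # so acquisition is a single atomic Redis operation
            if self.client.set(self.key, self.token, nx=True, ex=self.ttl):
                return True
            time.sleep(poll)
        return False

    def release(self):
        # Only the owner releases; prevents deleting another process's lock
        # after our TTL expired and someone else acquired it
        if self.client.get(self.key) == self.token:
            self.client.delete(self.key)
```

The expiry guards against a crashed process holding the lock forever, which matters for the multiprocess synchronization the commit targets.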
1 parent 018d759 commit 3680286

File tree

21 files changed: +1407 -1047 lines changed

evaluation/scripts/utils/client.py

Lines changed: 3 additions & 1 deletion
@@ -183,6 +183,7 @@ def search(self, query, user_id, top_k):
                 "mem_cube_id": user_id,
                 "conversation_id": "",
                 "top_k": top_k,
+                "mode": "mixture",
             },
             ensure_ascii=False,
         )
@@ -230,6 +231,7 @@ def search(self, query, user_id, top_k):
                 "query": query,
                 "user_id": user_id,
                 "memory_limit_number": top_k,
+                "mode": "mixture",
             }
         )

@@ -311,7 +313,7 @@ def add(self, messages, user_id, iso_date):
                 agent_name=self.agent_id,
                 session_date=iso_date,
             )
-            self.wait_for_completion(response.task_id)
+            self.wait_for_completion(response.item_id)
         except Exception as error:
             print("❌ Error saving conversation:", error)

src/memos/api/config.py

Lines changed: 2 additions & 2 deletions
@@ -361,8 +361,8 @@ def get_scheduler_config() -> dict[str, Any]:
         "thread_pool_max_workers": int(
             os.getenv("MOS_SCHEDULER_THREAD_POOL_MAX_WORKERS", "10")
         ),
-        "consume_interval_seconds": int(
-            os.getenv("MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS", "3")
+        "consume_interval_seconds": float(
+            os.getenv("MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS", "0.01")
         ),
         "enable_parallel_dispatch": os.getenv(
             "MOS_SCHEDULER_ENABLE_PARALLEL_DISPATCH", "true"
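Changing the cast from `int` to `float` is what makes the new sub-second default valid: `int("0.01")` raises ValueError, while `float("0.01")` parses, and `float` still accepts whole-number strings like `"3"`. A sketch of the pattern (the helper name is illustrative; the env var name is from the diff):

```python
import os


def read_consume_interval(default="0.01"):
    """Parse the scheduler consume interval (seconds) from the environment.

    float accepts both sub-second values ("0.01") and whole seconds ("3"),
    whereas the previous int cast would crash on any fractional override.
    """
    return float(os.getenv("MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS", default))
```

Operators who had set the variable to an integer string are unaffected, since `float("3")` is simply `3.0`.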

src/memos/api/routers/server_router.py

Lines changed: 27 additions & 111 deletions
@@ -1,9 +1,8 @@
-import json
 import os
 import traceback

 from concurrent.futures import ThreadPoolExecutor
-from typing import Any
+from typing import TYPE_CHECKING, Any

 from fastapi import APIRouter, HTTPException

@@ -33,11 +32,8 @@
 from memos.mem_scheduler.orm_modules.base_model import BaseDBManager
 from memos.mem_scheduler.scheduler_factory import SchedulerFactory
 from memos.mem_scheduler.schemas.general_schemas import (
-    API_MIX_SEARCH_LABEL,
     SearchMode,
 )
-from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
-from memos.mem_scheduler.utils.db_utils import get_utc_now
 from memos.memories.textual.prefer_text_memory.config import (
     AdderConfigFactory,
     ExtractorConfigFactory,
@@ -54,6 +50,10 @@
 )
 from memos.reranker.factory import RerankerFactory
 from memos.templates.instruction_completion import instruct_completion
+
+
+if TYPE_CHECKING:
+    from memos.mem_scheduler.optimized_scheduler import OptimizedScheduler
 from memos.types import MOSSearchResult, UserContext
 from memos.vec_dbs.factory import VecDBFactory

@@ -154,7 +154,6 @@ def init_server():

     # Build component configurations
     graph_db_config = _build_graph_db_config()
-    print(graph_db_config)
     llm_config = _build_llm_config()
     embedder_config = _build_embedder_config()
     mem_reader_config = _build_mem_reader_config()
@@ -209,22 +208,6 @@ def init_server():
         online_bot=False,
     )

-    # Initialize Scheduler
-    scheduler_config_dict = APIConfig.get_scheduler_config()
-    scheduler_config = SchedulerConfigFactory(
-        backend="optimized_scheduler", config=scheduler_config_dict
-    )
-    mem_scheduler = SchedulerFactory.from_config(scheduler_config)
-    mem_scheduler.initialize_modules(
-        chat_llm=llm,
-        process_llm=mem_reader.llm,
-        db_engine=BaseDBManager.create_default_sqlite_engine(),
-    )
-    mem_scheduler.start()
-
-    # Initialize SchedulerAPIModule
-    api_module = mem_scheduler.api_module
-
     naive_mem_cube = NaiveMemCube(
         llm=llm,
         embedder=embedder,
@@ -240,6 +223,23 @@ def init_server():
         pref_retriever=pref_retriever,
     )

+    # Initialize Scheduler
+    scheduler_config_dict = APIConfig.get_scheduler_config()
+    scheduler_config = SchedulerConfigFactory(
+        backend="optimized_scheduler", config=scheduler_config_dict
+    )
+    mem_scheduler: OptimizedScheduler = SchedulerFactory.from_config(scheduler_config)
+    mem_scheduler.initialize_modules(
+        chat_llm=llm,
+        process_llm=mem_reader.llm,
+        db_engine=BaseDBManager.create_default_sqlite_engine(),
+    )
+    mem_scheduler.current_mem_cube = naive_mem_cube
+    mem_scheduler.start()
+
+    # Initialize SchedulerAPIModule
+    api_module = mem_scheduler.api_module
+
     return (
         graph_db,
         mem_reader,
@@ -400,96 +400,12 @@ def mix_search_memories(
     """
     Mix search memories: fast search + async fine search
     """
-    # Get fast memories first
-    fast_memories = fast_search_memories(search_req, user_context)
-
-    # Check if scheduler and dispatcher are available for async execution
-    if mem_scheduler and hasattr(mem_scheduler, "dispatcher") and mem_scheduler.dispatcher:
-        try:
-            # Create message for async fine search
-            message_content = {
-                "search_req": {
-                    "query": search_req.query,
-                    "user_id": search_req.user_id,
-                    "session_id": search_req.session_id,
-                    "top_k": search_req.top_k,
-                    "internet_search": search_req.internet_search,
-                    "moscube": search_req.moscube,
-                    "chat_history": search_req.chat_history,
-                },
-                "user_context": {"mem_cube_id": user_context.mem_cube_id},
-            }
-
-            message = ScheduleMessageItem(
-                item_id=f"mix_search_{search_req.user_id}_{get_utc_now().timestamp()}",
-                user_id=search_req.user_id,
-                mem_cube_id=user_context.mem_cube_id,
-                label=API_MIX_SEARCH_LABEL,
-                mem_cube=naive_mem_cube,
-                content=json.dumps(message_content),
-                timestamp=get_utc_now(),
-            )
-
-            # Submit async task
-            mem_scheduler.dispatcher.submit_message(message)
-            logger.info(f"Submitted async fine search task for user {search_req.user_id}")
-
-            # Try to get pre-computed fine memories if available
-            try:
-                pre_fine_memories = api_module.get_pre_fine_memories(
-                    user_id=search_req.user_id, mem_cube_id=user_context.mem_cube_id
-                )
-                if pre_fine_memories:
-                    # Merge fast and pre-computed fine memories
-                    all_memories = fast_memories + pre_fine_memories
-                    # Remove duplicates based on content
-                    seen_contents = set()
-                    unique_memories = []
-                    for memory in all_memories:
-                        content_key = memory.get("content", "")
-                        if content_key not in seen_contents:
-                            seen_contents.add(content_key)
-                            unique_memories.append(memory)
-                    return unique_memories
-            except Exception as e:
-                logger.warning(f"Failed to get pre-computed fine memories: {e}")
-
-        except Exception as e:
-            logger.error(f"Failed to submit async fine search task: {e}")
-            # Fall back to synchronous execution
-
-    # Fallback: synchronous fine search
-    try:
-        fine_memories = fine_search_memories(search_req, user_context)
-
-        # Merge fast and fine memories
-        all_memories = fast_memories + fine_memories
-
-        # Remove duplicates based on content
-        seen_contents = set()
-        unique_memories = []
-        for memory in all_memories:
-            content_key = memory.get("content", "")
-            if content_key not in seen_contents:
-                seen_contents.add(content_key)
-                unique_memories.append(memory)
-
-        # Sync search data to Redis
-        try:
-            api_module.sync_search_data(
-                user_id=search_req.user_id,
-                mem_cube_id=user_context.mem_cube_id,
-                query=search_req.query,
-                formatted_memories=unique_memories,
-            )
-        except Exception as e:
-            logger.error(f"Failed to sync search data: {e}")
-
-        return unique_memories
-
-    except Exception as e:
-        logger.error(f"Fine search failed: {e}")
-        return fast_memories
+    formatted_memories = mem_scheduler.mix_search_memories(
+        search_req=search_req,
+        user_context=user_context,
+    )
+    return formatted_memories


 def fine_search_memories(
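The duplicate-removal step that previously lived inline in the router (and presumably now moves inside OptimizedScheduler.mix_search_memories) can be isolated as a small helper. This sketch reproduces the removed behavior; only the function name is new:

```python
def merge_unique(fast_memories, fine_memories):
    """Concatenate fast and fine search results, deduplicating by content.

    First occurrence wins, so fast results keep their rank when a fine
    result carries the same content string.
    """
    seen_contents = set()
    unique_memories = []
    for memory in fast_memories + fine_memories:
        content_key = memory.get("content", "")
        if content_key not in seen_contents:
            seen_contents.add(content_key)
            unique_memories.append(memory)
    return unique_memories
```

Note that memories with an empty or missing `content` field all share the key `""`, so only the first of them survives — the same edge case the original inline code had.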

src/memos/configs/mem_scheduler.py

Lines changed: 5 additions & 0 deletions
@@ -15,6 +15,7 @@
     DEFAULT_CONSUME_INTERVAL_SECONDS,
     DEFAULT_CONTEXT_WINDOW_SIZE,
     DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE,
+    DEFAULT_MULTI_TASK_RUNNING_TIMEOUT,
     DEFAULT_THREAD_POOL_MAX_WORKERS,
     DEFAULT_TOP_K,
     DEFAULT_USE_REDIS_QUEUE,
@@ -59,6 +60,10 @@ class BaseSchedulerConfig(BaseConfig):
         default=DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE,
         description="Maximum size of internal message queue when not using Redis",
     )
+    multi_task_running_timeout: int = Field(
+        default=DEFAULT_MULTI_TASK_RUNNING_TIMEOUT,
+        description="Default timeout for multi-task running operations in seconds",
+    )


 class GeneralSchedulerConfig(BaseSchedulerConfig):
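The new field follows the codebase's existing pattern: a module-level default constant from general_schemas wired into the config class via `Field(default=...)`. A dataclass stand-in shows the shape (pydantic's `Field` behaves the same way for plain defaults); the constant's value of 30 here is an assumption, as the diff does not show it:

```python
from dataclasses import dataclass

# Assumed value; the real constant lives in general_schemas and may differ
DEFAULT_MULTI_TASK_RUNNING_TIMEOUT = 30


@dataclass
class BaseSchedulerConfigSketch:
    """Dataclass stand-in for the pydantic BaseSchedulerConfig addition."""

    # Timeout for multi-task running operations, in seconds
    multi_task_running_timeout: int = DEFAULT_MULTI_TASK_RUNNING_TIMEOUT
```

Centralizing the default in one constant means every config subclass, and any code that needs the raw value, stays in sync when the timeout is tuned.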
