Skip to content

Commit c72858e

Browse files
committed
addressed all conflicts
2 parents 0b2b6ed + 1de72cf commit c72858e

File tree

16 files changed

+974
-851
lines changed

16 files changed

+974
-851
lines changed

evaluation/scripts/utils/client.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -311,7 +311,7 @@ def add(self, messages, user_id, iso_date):
311311
agent_name=self.agent_id,
312312
session_date=iso_date,
313313
)
314-
self.wait_for_completion(response.task_id)
314+
self.wait_for_completion(response.item_id)
315315
except Exception as error:
316316
print("❌ Error saving conversation:", error)
317317

src/memos/api/config.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -334,8 +334,8 @@ def get_scheduler_config() -> dict[str, Any]:
334334
"thread_pool_max_workers": int(
335335
os.getenv("MOS_SCHEDULER_THREAD_POOL_MAX_WORKERS", "10")
336336
),
337-
"consume_interval_seconds": int(
338-
os.getenv("MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS", "3")
337+
"consume_interval_seconds": float(
338+
os.getenv("MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS", "0.01")
339339
),
340340
"enable_parallel_dispatch": os.getenv(
341341
"MOS_SCHEDULER_ENABLE_PARALLEL_DISPATCH", "true"

src/memos/api/routers/server_router.py

Lines changed: 41 additions & 108 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,8 @@
1-
import json
21
import os
32
import traceback
43

54
from concurrent.futures import ThreadPoolExecutor
6-
from typing import Any
5+
from typing import TYPE_CHECKING, Any
76

87
from fastapi import APIRouter, HTTPException
98

@@ -33,11 +32,8 @@
3332
from memos.mem_scheduler.orm_modules.base_model import BaseDBManager
3433
from memos.mem_scheduler.scheduler_factory import SchedulerFactory
3534
from memos.mem_scheduler.schemas.general_schemas import (
36-
API_MIX_SEARCH_LABEL,
3735
SearchMode,
3836
)
39-
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
40-
from memos.mem_scheduler.utils.db_utils import get_utc_now
4137
from memos.memories.textual.prefer_text_memory.config import (
4238
AdderConfigFactory,
4339
ExtractorConfigFactory,
@@ -54,6 +50,10 @@
5450
)
5551
from memos.reranker.factory import RerankerFactory
5652
from memos.templates.instruction_completion import instruct_completion
53+
54+
55+
if TYPE_CHECKING:
56+
from memos.mem_scheduler.optimized_scheduler import OptimizedScheduler
5757
from memos.types import MOSSearchResult, UserContext
5858
from memos.vec_dbs.factory import VecDBFactory
5959

@@ -208,36 +208,53 @@ def init_server():
208208
online_bot=False,
209209
)
210210

211+
naive_mem_cube = NaiveMemCube(
212+
llm=llm,
213+
embedder=embedder,
214+
mem_reader=mem_reader,
215+
graph_db=graph_db,
216+
reranker=reranker,
217+
internet_retriever=internet_retriever,
218+
memory_manager=memory_manager,
219+
default_cube_config=default_cube_config,
220+
vector_db=vector_db,
221+
pref_extractor=pref_extractor,
222+
pref_adder=pref_adder,
223+
pref_retriever=pref_retriever,
224+
)
225+
211226
# Initialize Scheduler
212227
scheduler_config_dict = APIConfig.get_scheduler_config()
213228
scheduler_config = SchedulerConfigFactory(
214229
backend="optimized_scheduler", config=scheduler_config_dict
215230
)
216-
mem_scheduler = SchedulerFactory.from_config(scheduler_config)
231+
mem_scheduler: OptimizedScheduler = SchedulerFactory.from_config(scheduler_config)
217232
mem_scheduler.initialize_modules(
218233
chat_llm=llm,
219234
process_llm=mem_reader.llm,
220235
db_engine=BaseDBManager.create_default_sqlite_engine(),
221236
)
237+
mem_scheduler.current_mem_cube = naive_mem_cube
222238
mem_scheduler.start()
223239

224240
# Initialize SchedulerAPIModule
225241
api_module = mem_scheduler.api_module
226242

227-
naive_mem_cube = NaiveMemCube(
228-
llm=llm,
229-
embedder=embedder,
230-
mem_reader=mem_reader,
231-
graph_db=graph_db,
232-
reranker=reranker,
233-
internet_retriever=internet_retriever,
234-
memory_manager=memory_manager,
235-
default_cube_config=default_cube_config,
236-
vector_db=vector_db,
237-
pref_extractor=pref_extractor,
238-
pref_adder=pref_adder,
239-
pref_retriever=pref_retriever,
243+
# Initialize Scheduler
244+
scheduler_config_dict = APIConfig.get_scheduler_config()
245+
scheduler_config = SchedulerConfigFactory(
246+
backend="optimized_scheduler", config=scheduler_config_dict
247+
)
248+
mem_scheduler = SchedulerFactory.from_config(scheduler_config)
249+
mem_scheduler.initialize_modules(
250+
chat_llm=llm,
251+
process_llm=mem_reader.llm,
252+
db_engine=BaseDBManager.create_default_sqlite_engine(),
240253
)
254+
mem_scheduler.start()
255+
256+
# Initialize SchedulerAPIModule
257+
api_module = mem_scheduler.api_module
241258

242259
return (
243260
graph_db,
@@ -398,96 +415,12 @@ def mix_search_memories(
398415
"""
399416
Mix search memories: fast search + async fine search
400417
"""
401-
# Get fast memories first
402-
fast_memories = fast_search_memories(search_req, user_context)
403-
404-
# Check if scheduler and dispatcher are available for async execution
405-
if mem_scheduler and hasattr(mem_scheduler, "dispatcher") and mem_scheduler.dispatcher:
406-
try:
407-
# Create message for async fine search
408-
message_content = {
409-
"search_req": {
410-
"query": search_req.query,
411-
"user_id": search_req.user_id,
412-
"session_id": search_req.session_id,
413-
"top_k": search_req.top_k,
414-
"internet_search": search_req.internet_search,
415-
"moscube": search_req.moscube,
416-
"chat_history": search_req.chat_history,
417-
},
418-
"user_context": {"mem_cube_id": user_context.mem_cube_id},
419-
}
420-
421-
message = ScheduleMessageItem(
422-
item_id=f"mix_search_{search_req.user_id}_{get_utc_now().timestamp()}",
423-
user_id=search_req.user_id,
424-
mem_cube_id=user_context.mem_cube_id,
425-
label=API_MIX_SEARCH_LABEL,
426-
mem_cube=naive_mem_cube,
427-
content=json.dumps(message_content),
428-
timestamp=get_utc_now(),
429-
)
430418

431-
# Submit async task
432-
mem_scheduler.dispatcher.submit_message(message)
433-
logger.info(f"Submitted async fine search task for user {search_req.user_id}")
434-
435-
# Try to get pre-computed fine memories if available
436-
try:
437-
pre_fine_memories = api_module.get_pre_fine_memories(
438-
user_id=search_req.user_id, mem_cube_id=user_context.mem_cube_id
439-
)
440-
if pre_fine_memories:
441-
# Merge fast and pre-computed fine memories
442-
all_memories = fast_memories + pre_fine_memories
443-
# Remove duplicates based on content
444-
seen_contents = set()
445-
unique_memories = []
446-
for memory in all_memories:
447-
content_key = memory.get("content", "")
448-
if content_key not in seen_contents:
449-
seen_contents.add(content_key)
450-
unique_memories.append(memory)
451-
return unique_memories
452-
except Exception as e:
453-
logger.warning(f"Failed to get pre-computed fine memories: {e}")
454-
455-
except Exception as e:
456-
logger.error(f"Failed to submit async fine search task: {e}")
457-
# Fall back to synchronous execution
458-
459-
# Fallback: synchronous fine search
460-
try:
461-
fine_memories = fine_search_memories(search_req, user_context)
462-
463-
# Merge fast and fine memories
464-
all_memories = fast_memories + fine_memories
465-
466-
# Remove duplicates based on content
467-
seen_contents = set()
468-
unique_memories = []
469-
for memory in all_memories:
470-
content_key = memory.get("content", "")
471-
if content_key not in seen_contents:
472-
seen_contents.add(content_key)
473-
unique_memories.append(memory)
474-
475-
# Sync search data to Redis
476-
try:
477-
api_module.sync_search_data(
478-
user_id=search_req.user_id,
479-
mem_cube_id=user_context.mem_cube_id,
480-
query=search_req.query,
481-
formatted_memories=unique_memories,
482-
)
483-
except Exception as e:
484-
logger.error(f"Failed to sync search data: {e}")
485-
486-
return unique_memories
487-
488-
except Exception as e:
489-
logger.error(f"Fine search failed: {e}")
490-
return fast_memories
419+
formatted_memories = mem_scheduler.mix_search_memories(
420+
search_req=search_req,
421+
user_context=user_context,
422+
)
423+
return formatted_memories
491424

492425

493426
def fine_search_memories(

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 2 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -502,7 +502,7 @@ def update_activation_memory_periodically(
502502
except Exception as e:
503503
logger.error(f"Error in update_activation_memory_periodically: {e}", exc_info=True)
504504

505-
async def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageItem]):
505+
def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageItem]):
506506
"""Submit messages to the message queue (either local queue or Redis)."""
507507
if isinstance(messages, ScheduleMessageItem):
508508
messages = [messages] # transform single message to list
@@ -519,7 +519,7 @@ async def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMes
519519

520520
if self.use_redis_queue:
521521
# Use Redis stream for message queue
522-
await self.redis_add_message_stream(message.to_dict())
522+
self.redis_add_message_stream(message.to_dict())
523523
logger.info(f"Submitted message to Redis: {message.label} - {message.content}")
524524
else:
525525
# Use local queue
@@ -774,34 +774,6 @@ def unregister_handlers(self, labels: list[str]) -> dict[str, bool]:
774774
return self.dispatcher.unregister_handlers(labels)
775775

776776
def get_running_tasks(self, filter_func: Callable | None = None) -> dict[str, dict]:
777-
"""
778-
Get currently running tasks, optionally filtered by a custom function.
779-
780-
This method delegates to the dispatcher's get_running_tasks method.
781-
782-
Args:
783-
filter_func: Optional function to filter tasks. Should accept a RunningTaskItem
784-
and return True if the task should be included in results.
785-
786-
Returns:
787-
dict[str, dict]: Dictionary mapping task IDs to task information dictionaries.
788-
Each task dict contains: item_id, user_id, mem_cube_id, task_info,
789-
task_name, start_time, end_time, status, result, error_message, messages
790-
791-
Examples:
792-
# Get all running tasks
793-
all_tasks = scheduler.get_running_tasks()
794-
795-
# Get tasks for specific user
796-
user_tasks = scheduler.get_running_tasks(
797-
filter_func=lambda task: task.user_id == "user123"
798-
)
799-
800-
# Get tasks with specific status
801-
active_tasks = scheduler.get_running_tasks(
802-
filter_func=lambda task: task.status == "running"
803-
)
804-
"""
805777
if not self.dispatcher:
806778
logger.warning("Dispatcher is not initialized, returning empty tasks dict")
807779
return {}

0 commit comments

Comments (0)