Skip to content

Commit a2a6f5e

Browse files
fridayL and tangg555 authored
feat: add try catch for mem scheduler (#144)
* feat: update config
* fix:dim
* change dim
* fix:change default db
* fix:delay
* fix:len
* fix:change recently mem size
* fix:dup node error
* fix: remove mock_data
* fix: change config
* feat: reorganize code
* add: add json parse for en
* fix:change user_id
* fix: logger info
* fix: remove unsed change
* feat: add topk for api
* feat: add logger
* fix:fix scheduler logs and
---------
Co-authored-by: Travis Tang <[email protected]>
1 parent a355cdd commit a2a6f5e

File tree

6 files changed

+106
-143
lines changed

6 files changed

+106
-143
lines changed

src/memos/api/product_models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ class SearchRequest(BaseRequest):
150150
user_id: str = Field(..., description="User ID")
151151
query: str = Field(..., description="Search query")
152152
mem_cube_id: str | None = Field(None, description="Cube ID to search in")
153+
top_k: int = Field(10, description="Number of results to return")
153154

154155

155156
class SuggestionRequest(BaseRequest):

src/memos/api/routers/product_router.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ async def search_memories(search_req: SearchRequest):
194194
query=search_req.query,
195195
user_id=search_req.user_id,
196196
install_cube_ids=[search_req.mem_cube_id] if search_req.mem_cube_id else None,
197+
top_k=search_req.top_k,
197198
)
198199
return SearchResponse(message="Search completed successfully", data=result)
199200

src/memos/mem_os/core.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,9 @@ def add(
631631
for mem in memories:
632632
mem_id_list: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(mem)
633633
mem_ids.extend(mem_id_list)
634+
logger.info(
635+
f"Added memory user {target_user_id} to memcube {mem_cube_id}: {mem_id_list}"
636+
)
634637

635638
# submit messages for scheduler
636639
if self.enable_mem_scheduler and self.mem_scheduler is not None:
@@ -671,6 +674,9 @@ def add(
671674
mem_ids = []
672675
for mem in memories:
673676
mem_id_list: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(mem)
677+
logger.info(
678+
f"Added memory user {target_user_id} to memcube {mem_cube_id}: {mem_id_list}"
679+
)
674680
mem_ids.extend(mem_id_list)
675681

676682
# submit messages for scheduler

src/memos/mem_os/product.py

Lines changed: 9 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@
2323
remove_embedding_recursive,
2424
sort_children_by_memory_type,
2525
)
26-
from memos.mem_scheduler.schemas import ANSWER_LABEL, QUERY_LABEL, ScheduleMessageItem
26+
from memos.mem_scheduler.schemas.general_schemas import (
27+
ANSWER_LABEL,
28+
QUERY_LABEL,
29+
)
30+
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
2731
from memos.mem_user.persistent_user_manager import PersistentUserManager
2832
from memos.mem_user.user_manager import UserRole
2933
from memos.memories.textual.item import (
@@ -601,7 +605,7 @@ def user_register(
601605
try:
602606
default_mem_cube.dump(mem_cube_name_or_path)
603607
except Exception as e:
604-
print(e)
608+
logger.error(f"Failed to dump default cube: {e}")
605609

606610
# Register the default cube with MOS
607611
self.register_mem_cube(
@@ -679,57 +683,6 @@ def get_suggestion_query(self, user_id: str, language: str = "zh") -> list[str]:
679683
response_json = json.loads(clean_response)
680684
return response_json["query"]
681685

682-
def chat(
683-
self,
684-
query: str,
685-
user_id: str,
686-
cube_id: str | None = None,
687-
history: MessageList | None = None,
688-
) -> Generator[str, None, None]:
689-
"""Chat with LLM SSE Type.
690-
Args:
691-
query (str): Query string.
692-
user_id (str): User ID.
693-
cube_id (str, optional): Custom cube ID for user.
694-
history (list[dict], optional): Chat history.
695-
696-
Returns:
697-
Generator[str, None, None]: The response string generator.
698-
"""
699-
# Use MOSCore's built-in validation
700-
if cube_id:
701-
self._validate_cube_access(user_id, cube_id)
702-
else:
703-
self._validate_user_exists(user_id)
704-
705-
# Load user cubes if not already loaded
706-
self._load_user_cubes(user_id, self.default_cube_config)
707-
time_start = time.time()
708-
memories_list = super().search(query, user_id)["text_mem"]
709-
# Get response from parent MOSCore (returns string, not generator)
710-
response = super().chat(query, user_id)
711-
time_end = time.time()
712-
713-
# Use tiktoken for proper token-based chunking
714-
for chunk in self._chunk_response_with_tiktoken(response, chunk_size=5):
715-
chunk_data = f"data: {json.dumps({'type': 'text', 'content': chunk})}\n\n"
716-
yield chunk_data
717-
718-
# Prepare reference data
719-
reference = []
720-
for memories in memories_list:
721-
memories_json = memories.model_dump()
722-
memories_json["metadata"]["ref_id"] = f"[{memories.id.split('-')[0]}]"
723-
memories_json["metadata"]["embedding"] = []
724-
memories_json["metadata"]["sources"] = []
725-
reference.append(memories_json)
726-
727-
yield f"data: {json.dumps({'type': 'reference', 'content': reference})}\n\n"
728-
total_time = round(float(time_end - time_start), 1)
729-
730-
yield f"data: {json.dumps({'type': 'time', 'content': {'total_time': total_time, 'speed_improvement': '23%'}})}\n\n"
731-
yield f"data: {json.dumps({'type': 'end'})}\n\n"
732-
733686
def chat_with_references(
734687
self,
735688
query: str,
@@ -768,6 +721,8 @@ def chat_with_references(
768721
self._register_chat_history(user_id)
769722

770723
chat_history = self.chat_history_manager[user_id]
724+
if history:
725+
chat_history.chat_history = history[-10:]
771726
current_messages = [
772727
{"role": "system", "content": system_prompt},
773728
*chat_history.chat_history,
@@ -853,15 +808,12 @@ def chat_with_references(
853808
yield f"data: {json.dumps({'type': 'reference', 'data': reference})}\n\n"
854809
total_time = round(float(time_end - time_start), 1)
855810
yield f"data: {json.dumps({'type': 'time', 'data': {'total_time': total_time, 'speed_improvement': '23%'}})}\n\n"
856-
chat_history.chat_history.append({"role": "user", "content": query})
857-
chat_history.chat_history.append({"role": "assistant", "content": full_response})
858811
self._send_message_to_scheduler(
859812
user_id=user_id, mem_cube_id=cube_id, query=query, label=QUERY_LABEL
860813
)
861814
self._send_message_to_scheduler(
862815
user_id=user_id, mem_cube_id=cube_id, query=full_response, label=ANSWER_LABEL
863816
)
864-
self.chat_history_manager[user_id] = chat_history
865817

866818
yield f"data: {json.dumps({'type': 'end'})}\n\n"
867819
self.add(
@@ -880,12 +832,6 @@ def chat_with_references(
880832
],
881833
mem_cube_id=cube_id,
882834
)
883-
# Keep chat history under 30 messages by removing oldest conversation pair
884-
if len(self.chat_history_manager[user_id].chat_history) > 10:
885-
self.chat_history_manager[user_id].chat_history.pop(0) # Remove oldest user message
886-
self.chat_history_manager[user_id].chat_history.pop(
887-
0
888-
) # Remove oldest assistant response
889835

890836
def get_all(
891837
self,
@@ -1030,11 +976,9 @@ def get_subgraph(
1030976
return reformat_memory_list
1031977

1032978
def search(
1033-
self, query: str, user_id: str, install_cube_ids: list[str] | None = None, top_k: int = 20
979+
self, query: str, user_id: str, install_cube_ids: list[str] | None = None, top_k: int = 10
1034980
):
1035981
"""Search memories for a specific user."""
1036-
# Validate user access
1037-
self._validate_user_access(user_id)
1038982

1039983
# Load user cubes if not already loaded
1040984
self._load_user_cubes(user_id, self.default_cube_config)

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,10 @@ def replace_working_memory(
194194
text_mem_base: TreeTextMemory = text_mem_base
195195

196196
# process rerank memories with llm
197-
quey_history = self.monitor.query_monitors.get_queries_with_timesort()
197+
query_history = self.monitor.query_monitors.get_queries_with_timesort()
198198
memories_with_new_order, rerank_success_flag = (
199199
self.retriever.process_and_rerank_memories(
200-
queries=quey_history,
200+
queries=query_history,
201201
original_memory=original_memory,
202202
new_memory=new_memory,
203203
top_k=self.top_k,
@@ -350,54 +350,63 @@ def update_activation_memory_periodically(
350350
):
351351
new_activation_memories = []
352352

353-
if self.monitor.timed_trigger(
354-
last_time=self.monitor.last_activation_mem_update_time,
355-
interval_seconds=interval_seconds,
356-
):
357-
logger.info(f"Updating activation memory for user {user_id} and mem_cube {mem_cube_id}")
353+
try:
354+
if self.monitor.timed_trigger(
355+
last_time=self.monitor.last_activation_mem_update_time,
356+
interval_seconds=interval_seconds,
357+
):
358+
logger.info(
359+
f"Updating activation memory for user {user_id} and mem_cube {mem_cube_id}"
360+
)
361+
362+
if (
363+
user_id not in self.monitor.working_memory_monitors
364+
or mem_cube_id not in self.monitor.working_memory_monitors[user_id]
365+
or len(self.monitor.working_memory_monitors[user_id][mem_cube_id].memories) == 0
366+
):
367+
logger.warning(
368+
"No memories found in working_memory_monitors, initializing from current working_memories"
369+
)
370+
self.initialize_working_memory_monitors(
371+
user_id=user_id,
372+
mem_cube_id=mem_cube_id,
373+
mem_cube=mem_cube,
374+
)
358375

359-
if len(self.monitor.working_memory_monitors[user_id][mem_cube_id].memories) == 0:
360-
logger.warning(
361-
"No memories found in working_memory_monitors, initializing from current working_memories"
376+
self.monitor.update_activation_memory_monitors(
377+
user_id=user_id, mem_cube_id=mem_cube_id, mem_cube=mem_cube
362378
)
363-
self.initialize_working_memory_monitors(
379+
380+
new_activation_memories = [
381+
m.memory_text
382+
for m in self.monitor.activation_memory_monitors[user_id][mem_cube_id].memories
383+
]
384+
385+
logger.info(
386+
f"Collected {len(new_activation_memories)} new memory entries for processing"
387+
)
388+
389+
self.update_activation_memory(
390+
new_memories=new_activation_memories,
391+
label=label,
364392
user_id=user_id,
365393
mem_cube_id=mem_cube_id,
366394
mem_cube=mem_cube,
367395
)
368396

369-
self.monitor.update_activation_memory_monitors(
370-
user_id=user_id, mem_cube_id=mem_cube_id, mem_cube=mem_cube
371-
)
372-
373-
new_activation_memories = [
374-
m.memory_text
375-
for m in self.monitor.activation_memory_monitors[user_id][mem_cube_id].memories
376-
]
377-
378-
logger.info(
379-
f"Collected {len(new_activation_memories)} new memory entries for processing"
380-
)
381-
382-
self.update_activation_memory(
383-
new_memories=new_activation_memories,
384-
label=label,
385-
user_id=user_id,
386-
mem_cube_id=mem_cube_id,
387-
mem_cube=mem_cube,
388-
)
389-
390-
self.monitor.last_activation_mem_update_time = datetime.now()
397+
self.monitor.last_activation_mem_update_time = datetime.now()
391398

392-
logger.debug(
393-
f"Activation memory update completed at {self.monitor.last_activation_mem_update_time}"
394-
)
395-
else:
396-
logger.info(
397-
f"Skipping update - {interval_seconds} second interval not yet reached. "
398-
f"Last update time is {self.monitor.last_activation_mem_update_time} and now is"
399-
f"{datetime.now()}"
400-
)
399+
logger.debug(
400+
f"Activation memory update completed at {self.monitor.last_activation_mem_update_time}"
401+
)
402+
else:
403+
logger.info(
404+
f"Skipping update - {interval_seconds} second interval not yet reached. "
405+
f"Last update time is {self.monitor.last_activation_mem_update_time} and now is"
406+
f"{datetime.now()}"
407+
)
408+
except Exception as e:
409+
logger.error(f"Error: {e}", exc_info=True)
401410

402411
def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageItem]):
403412
"""Submit multiple messages to the message queue."""

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -145,43 +145,45 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
145145
grouped_messages = self.dispatcher.group_messages_by_user_and_cube(messages=messages)
146146

147147
self.validate_schedule_messages(messages=messages, label=ADD_LABEL)
148-
149-
for user_id in grouped_messages:
150-
for mem_cube_id in grouped_messages[user_id]:
151-
messages = grouped_messages[user_id][mem_cube_id]
152-
if len(messages) == 0:
153-
return
154-
155-
# for status update
156-
self._set_current_context_from_message(msg=messages[0])
157-
158-
# submit logs
159-
for msg in messages:
160-
userinput_memory_ids = json.loads(msg.content)
161-
mem_cube = msg.mem_cube
162-
for memory_id in userinput_memory_ids:
163-
mem_item: TextualMemoryItem = mem_cube.text_mem.get(memory_id=memory_id)
164-
mem_type = mem_item.meta_data.memory_type
165-
mem_content = mem_item.memory
166-
167-
self.log_adding_memory(
168-
memory=mem_content,
169-
memory_type=mem_type,
170-
user_id=msg.user_id,
171-
mem_cube_id=msg.mem_cube_id,
172-
mem_cube=msg.mem_cube,
173-
log_func_callback=self._submit_web_logs,
148+
try:
149+
for user_id in grouped_messages:
150+
for mem_cube_id in grouped_messages[user_id]:
151+
messages = grouped_messages[user_id][mem_cube_id]
152+
if len(messages) == 0:
153+
return
154+
155+
# for status update
156+
self._set_current_context_from_message(msg=messages[0])
157+
158+
# submit logs
159+
for msg in messages:
160+
userinput_memory_ids = json.loads(msg.content)
161+
mem_cube = msg.mem_cube
162+
for memory_id in userinput_memory_ids:
163+
mem_item: TextualMemoryItem = mem_cube.text_mem.get(memory_id=memory_id)
164+
mem_type = mem_item.metadata.memory_type
165+
mem_content = mem_item.memory
166+
167+
self.log_adding_memory(
168+
memory=mem_content,
169+
memory_type=mem_type,
170+
user_id=msg.user_id,
171+
mem_cube_id=msg.mem_cube_id,
172+
mem_cube=msg.mem_cube,
173+
log_func_callback=self._submit_web_logs,
174+
)
175+
176+
# update activation memories
177+
if self.enable_act_memory_update:
178+
self.update_activation_memory_periodically(
179+
interval_seconds=self.monitor.act_mem_update_interval,
180+
label=ADD_LABEL,
181+
user_id=user_id,
182+
mem_cube_id=mem_cube_id,
183+
mem_cube=messages[0].mem_cube,
174184
)
175-
176-
# update activation memories
177-
if self.enable_act_memory_update:
178-
self.update_activation_memory_periodically(
179-
interval_seconds=self.monitor.act_mem_update_interval,
180-
label=ADD_LABEL,
181-
user_id=user_id,
182-
mem_cube_id=mem_cube_id,
183-
mem_cube=messages[0].mem_cube,
184-
)
185+
except Exception as e:
186+
logger.error(f"Error: {e}", exc_info=True)
185187

186188
def process_session_turn(
187189
self,

0 commit comments

Comments
 (0)