Skip to content

Commit ae89138

Browse files
committed
fix:fix scheduler logs and
1 parent b219f9b commit ae89138

File tree

3 files changed

+94
-81
lines changed

3 files changed

+94
-81
lines changed

src/memos/mem_os/product.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@
2323
remove_embedding_recursive,
2424
sort_children_by_memory_type,
2525
)
26-
from memos.mem_scheduler.schemas import ANSWER_LABEL, QUERY_LABEL, ScheduleMessageItem
26+
from memos.mem_scheduler.schemas.general_schemas import (
27+
ANSWER_LABEL,
28+
QUERY_LABEL,
29+
)
30+
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
2731
from memos.mem_user.persistent_user_manager import PersistentUserManager
2832
from memos.mem_user.user_manager import UserRole
2933
from memos.memories.textual.item import (
@@ -975,8 +979,6 @@ def search(
975979
self, query: str, user_id: str, install_cube_ids: list[str] | None = None, top_k: int = 10
976980
):
977981
"""Search memories for a specific user."""
978-
# Validate user access
979-
self._validate_user_access(user_id)
980982

981983
# Load user cubes if not already loaded
982984
self._load_user_cubes(user_id, self.default_cube_config)

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,10 @@ def replace_working_memory(
194194
text_mem_base: TreeTextMemory = text_mem_base
195195

196196
# process rerank memories with llm
197-
quey_history = self.monitor.query_monitors.get_queries_with_timesort()
197+
query_history = self.monitor.query_monitors.get_queries_with_timesort()
198198
memories_with_new_order, rerank_success_flag = (
199199
self.retriever.process_and_rerank_memories(
200-
queries=quey_history,
200+
queries=query_history,
201201
original_memory=original_memory,
202202
new_memory=new_memory,
203203
top_k=self.top_k,
@@ -350,54 +350,63 @@ def update_activation_memory_periodically(
350350
):
351351
new_activation_memories = []
352352

353-
if self.monitor.timed_trigger(
354-
last_time=self.monitor.last_activation_mem_update_time,
355-
interval_seconds=interval_seconds,
356-
):
357-
logger.info(f"Updating activation memory for user {user_id} and mem_cube {mem_cube_id}")
353+
try:
354+
if self.monitor.timed_trigger(
355+
last_time=self.monitor.last_activation_mem_update_time,
356+
interval_seconds=interval_seconds,
357+
):
358+
logger.info(
359+
f"Updating activation memory for user {user_id} and mem_cube {mem_cube_id}"
360+
)
361+
362+
if (
363+
user_id not in self.monitor.working_memory_monitors
364+
or mem_cube_id not in self.monitor.working_memory_monitors[user_id]
365+
or len(self.monitor.working_memory_monitors[user_id][mem_cube_id].memories) == 0
366+
):
367+
logger.warning(
368+
"No memories found in working_memory_monitors, initializing from current working_memories"
369+
)
370+
self.initialize_working_memory_monitors(
371+
user_id=user_id,
372+
mem_cube_id=mem_cube_id,
373+
mem_cube=mem_cube,
374+
)
358375

359-
if len(self.monitor.working_memory_monitors[user_id][mem_cube_id].memories) == 0:
360-
logger.warning(
361-
"No memories found in working_memory_monitors, initializing from current working_memories"
376+
self.monitor.update_activation_memory_monitors(
377+
user_id=user_id, mem_cube_id=mem_cube_id, mem_cube=mem_cube
362378
)
363-
self.initialize_working_memory_monitors(
379+
380+
new_activation_memories = [
381+
m.memory_text
382+
for m in self.monitor.activation_memory_monitors[user_id][mem_cube_id].memories
383+
]
384+
385+
logger.info(
386+
f"Collected {len(new_activation_memories)} new memory entries for processing"
387+
)
388+
389+
self.update_activation_memory(
390+
new_memories=new_activation_memories,
391+
label=label,
364392
user_id=user_id,
365393
mem_cube_id=mem_cube_id,
366394
mem_cube=mem_cube,
367395
)
368396

369-
self.monitor.update_activation_memory_monitors(
370-
user_id=user_id, mem_cube_id=mem_cube_id, mem_cube=mem_cube
371-
)
372-
373-
new_activation_memories = [
374-
m.memory_text
375-
for m in self.monitor.activation_memory_monitors[user_id][mem_cube_id].memories
376-
]
377-
378-
logger.info(
379-
f"Collected {len(new_activation_memories)} new memory entries for processing"
380-
)
381-
382-
self.update_activation_memory(
383-
new_memories=new_activation_memories,
384-
label=label,
385-
user_id=user_id,
386-
mem_cube_id=mem_cube_id,
387-
mem_cube=mem_cube,
388-
)
389-
390-
self.monitor.last_activation_mem_update_time = datetime.now()
397+
self.monitor.last_activation_mem_update_time = datetime.now()
391398

392-
logger.debug(
393-
f"Activation memory update completed at {self.monitor.last_activation_mem_update_time}"
394-
)
395-
else:
396-
logger.info(
397-
f"Skipping update - {interval_seconds} second interval not yet reached. "
398-
f"Last update time is {self.monitor.last_activation_mem_update_time} and now is"
399-
f"{datetime.now()}"
400-
)
399+
logger.debug(
400+
f"Activation memory update completed at {self.monitor.last_activation_mem_update_time}"
401+
)
402+
else:
403+
logger.info(
404+
f"Skipping update - {interval_seconds} second interval not yet reached. "
405+
f"Last update time is {self.monitor.last_activation_mem_update_time} and now is"
406+
f"{datetime.now()}"
407+
)
408+
except Exception as e:
409+
logger.error(f"Error: {e}", exc_info=True)
401410

402411
def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageItem]):
403412
"""Submit multiple messages to the message queue."""

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -145,43 +145,45 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
145145
grouped_messages = self.dispatcher.group_messages_by_user_and_cube(messages=messages)
146146

147147
self.validate_schedule_messages(messages=messages, label=ADD_LABEL)
148-
149-
for user_id in grouped_messages:
150-
for mem_cube_id in grouped_messages[user_id]:
151-
messages = grouped_messages[user_id][mem_cube_id]
152-
if len(messages) == 0:
153-
return
154-
155-
# for status update
156-
self._set_current_context_from_message(msg=messages[0])
157-
158-
# submit logs
159-
for msg in messages:
160-
userinput_memory_ids = json.loads(msg.content)
161-
mem_cube = msg.mem_cube
162-
for memory_id in userinput_memory_ids:
163-
mem_item: TextualMemoryItem = mem_cube.text_mem.get(memory_id=memory_id)
164-
mem_type = mem_item.meta_data.memory_type
165-
mem_content = mem_item.memory
166-
167-
self.log_adding_memory(
168-
memory=mem_content,
169-
memory_type=mem_type,
170-
user_id=msg.user_id,
171-
mem_cube_id=msg.mem_cube_id,
172-
mem_cube=msg.mem_cube,
173-
log_func_callback=self._submit_web_logs,
148+
try:
149+
for user_id in grouped_messages:
150+
for mem_cube_id in grouped_messages[user_id]:
151+
messages = grouped_messages[user_id][mem_cube_id]
152+
if len(messages) == 0:
153+
return
154+
155+
# for status update
156+
self._set_current_context_from_message(msg=messages[0])
157+
158+
# submit logs
159+
for msg in messages:
160+
userinput_memory_ids = json.loads(msg.content)
161+
mem_cube = msg.mem_cube
162+
for memory_id in userinput_memory_ids:
163+
mem_item: TextualMemoryItem = mem_cube.text_mem.get(memory_id=memory_id)
164+
mem_type = mem_item.metadata.memory_type
165+
mem_content = mem_item.memory
166+
167+
self.log_adding_memory(
168+
memory=mem_content,
169+
memory_type=mem_type,
170+
user_id=msg.user_id,
171+
mem_cube_id=msg.mem_cube_id,
172+
mem_cube=msg.mem_cube,
173+
log_func_callback=self._submit_web_logs,
174+
)
175+
176+
# update activation memories
177+
if self.enable_act_memory_update:
178+
self.update_activation_memory_periodically(
179+
interval_seconds=self.monitor.act_mem_update_interval,
180+
label=ADD_LABEL,
181+
user_id=user_id,
182+
mem_cube_id=mem_cube_id,
183+
mem_cube=messages[0].mem_cube,
174184
)
175-
176-
# update activation memories
177-
if self.enable_act_memory_update:
178-
self.update_activation_memory_periodically(
179-
interval_seconds=self.monitor.act_mem_update_interval,
180-
label=ADD_LABEL,
181-
user_id=user_id,
182-
mem_cube_id=mem_cube_id,
183-
mem_cube=messages[0].mem_cube,
184-
)
185+
except Exception as e:
186+
logger.error(f"Error: {e}", exc_info=True)
185187

186188
def process_session_turn(
187189
self,

0 commit comments

Comments
 (0)