
Commit 0cb1b8a

glin93 and CaralHsi authored
Fix/async add logging (#554)
* feat: Add consistent logging for async memory addition
* fix: log mem_reader failures with task status
* chore: format scheduler logging files

Co-authored-by: [email protected] <>
Co-authored-by: CaralHsi <[email protected]>
1 parent 09f219f commit 0cb1b8a
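
In short, the async add path now emits the same web-facing event logs as the synchronous path, and failures inside mem_reader are reported with the originating task's id and a "failed" status. Below is a minimal, hypothetical sketch of the environment gate this commit relies on throughout; only the environment-variable name and exchange value come from the diff, the helper function itself is illustrative.

import os

# Illustrative only: the diff inlines this check rather than defining a helper.
# When the scheduler is wired to the cloud exchange, memory additions are
# reported as knowledge-base updates; otherwise the legacy playground/default
# "addMemory" logging path runs.
def is_cloud_env() -> bool:
    return os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"

if __name__ == "__main__":
    label = "knowledgeBaseUpdate" if is_cloud_env() else "addMemory"
    print(f"Events for async memory additions would be labeled: {label}")
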

File tree

2 files changed (+122, -31 lines)


src/memos/mem_scheduler/general_scheduler.py

Lines changed: 121 additions & 1 deletion
@@ -504,6 +504,8 @@ def process_message(message: ScheduleMessageItem):
                 text_mem=text_mem,
                 user_name=user_name,
                 custom_tags=info.get("custom_tags", None),
+                task_id=message.task_id,
+                info=info,
             )

             logger.info(
@@ -529,6 +531,8 @@ def _process_memories_with_reader(
         text_mem: TreeTextMemory,
         user_name: str,
         custom_tags: list[str] | None = None,
+        task_id: str | None = None,
+        info: dict | None = None,
     ) -> None:
         """
         Process memories using mem_reader for enhanced memory processing.
@@ -540,6 +544,7 @@ def _process_memories_with_reader(
             text_mem: Text memory instance
             custom_tags: Optional list of custom tags for memory processing
         """
+        kb_log_content: list[dict] = []
         try:
             # Get the mem_reader from the parent MOSCore
             if not hasattr(self, "mem_reader") or self.mem_reader is None:
@@ -602,6 +607,86 @@ def _process_memories_with_reader(
                     logger.info(
                         f"Added {len(enhanced_mem_ids)} enhanced memories: {enhanced_mem_ids}"
                     )
+
+                    # LOGGING BLOCK START
+                    # This block is replicated from _add_message_consumer to ensure consistent logging
+                    is_cloud_env = (
+                        os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
+                    )
+                    if is_cloud_env:
+                        # New: Knowledge Base Logging (Cloud Service)
+                        kb_log_content = []
+                        for item in flattened_memories:
+                            kb_log_content.append(
+                                {
+                                    "log_source": "KNOWLEDGE_BASE_LOG",
+                                    "trigger_source": info.get("trigger_source", "Messages")
+                                    if info
+                                    else "Messages",
+                                    "operation": "ADD",
+                                    "memory_id": item.id,
+                                    "content": item.memory,
+                                    "original_content": None,
+                                    "source_doc_id": getattr(item.metadata, "source_doc_id", None),
+                                }
+                            )
+                        if kb_log_content:
+                            event = self.create_event_log(
+                                label="knowledgeBaseUpdate",
+                                log_content=f"Knowledge Base Memory Update: {len(kb_log_content)} changes.",
+                                user_id=user_id,
+                                mem_cube_id=mem_cube_id,
+                                mem_cube=self.current_mem_cube,
+                                memcube_log_content=kb_log_content,
+                                metadata=None,
+                                memory_len=len(kb_log_content),
+                                memcube_name=self._map_memcube_name(mem_cube_id),
+                            )
+                            event.task_id = task_id
+                            self._submit_web_logs([event])
+                    else:
+                        # Existing: Playground/Default Logging
+                        add_content_legacy: list[dict] = []
+                        add_meta_legacy: list[dict] = []
+                        for item_id, item in zip(
+                            enhanced_mem_ids, flattened_memories, strict=False
+                        ):
+                            key = getattr(item.metadata, "key", None) or transform_name_to_key(
+                                name=item.memory
+                            )
+                            add_content_legacy.append(
+                                {"content": f"{key}: {item.memory}", "ref_id": item_id}
+                            )
+                            add_meta_legacy.append(
+                                {
+                                    "ref_id": item_id,
+                                    "id": item_id,
+                                    "key": item.metadata.key,
+                                    "memory": item.memory,
+                                    "memory_type": item.metadata.memory_type,
+                                    "status": item.metadata.status,
+                                    "confidence": item.metadata.confidence,
+                                    "tags": item.metadata.tags,
+                                    "updated_at": getattr(item.metadata, "updated_at", None)
+                                    or getattr(item.metadata, "update_at", None),
+                                }
+                            )
+                        if add_content_legacy:
+                            event = self.create_event_log(
+                                label="addMemory",
+                                from_memory_type=USER_INPUT_TYPE,
+                                to_memory_type=LONG_TERM_MEMORY_TYPE,
+                                user_id=user_id,
+                                mem_cube_id=mem_cube_id,
+                                mem_cube=self.current_mem_cube,
+                                memcube_log_content=add_content_legacy,
+                                metadata=add_meta_legacy,
+                                memory_len=len(add_content_legacy),
+                                memcube_name=self._map_memcube_name(mem_cube_id),
+                            )
+                            event.task_id = task_id
+                            self._submit_web_logs([event])
+                    # LOGGING BLOCK END
                 else:
                     logger.info("No enhanced memories generated by mem_reader")
             else:
@@ -630,10 +715,45 @@ def _process_memories_with_reader(
                 logger.info("Remove and Refresh Memories")
             logger.debug(f"Finished add {user_id} memory: {mem_ids}")

-        except Exception:
+        except Exception as exc:
             logger.error(
                 f"Error in _process_memories_with_reader: {traceback.format_exc()}", exc_info=True
             )
+            with contextlib.suppress(Exception):
+                is_cloud_env = (
+                    os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
+                )
+                if is_cloud_env:
+                    if not kb_log_content:
+                        trigger_source = (
+                            info.get("trigger_source", "Messages") if info else "Messages"
+                        )
+                        kb_log_content = [
+                            {
+                                "log_source": "KNOWLEDGE_BASE_LOG",
+                                "trigger_source": trigger_source,
+                                "operation": "ADD",
+                                "memory_id": mem_id,
+                                "content": None,
+                                "original_content": None,
+                                "source_doc_id": None,
+                            }
+                            for mem_id in mem_ids
+                        ]
+                    event = self.create_event_log(
+                        label="knowledgeBaseUpdate",
+                        log_content=f"Knowledge Base Memory Update failed: {exc!s}",
+                        user_id=user_id,
+                        mem_cube_id=mem_cube_id,
+                        mem_cube=self.current_mem_cube,
+                        memcube_log_content=kb_log_content,
+                        metadata=None,
+                        memory_len=len(kb_log_content),
+                        memcube_name=self._map_memcube_name(mem_cube_id),
+                    )
+                    event.task_id = task_id
+                    event.status = "failed"
+                    self._submit_web_logs([event])

     def _mem_reorganize_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
         logger.info(f"Messages {messages} assigned to {MEM_ORGANIZE_LABEL} handler.")

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 1 addition & 30 deletions
@@ -1,5 +1,4 @@
 import concurrent
-import os
 import threading
 import time

@@ -15,7 +14,7 @@
 from memos.mem_scheduler.schemas.general_schemas import (
     DEFAULT_STOP_WAIT,
 )
-from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem, ScheduleMessageItem
+from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
 from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem
 from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube
 from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker
@@ -159,20 +158,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
                     )
                     self.metrics.task_completed(user_id=m.user_id, task_type=m.label)

-                    is_cloud_env = (
-                        os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
-                    )
-                    if self.submit_web_logs and is_cloud_env:
-                        status_log = ScheduleLogForWebItem(
-                            user_id=task_item.user_id,
-                            mem_cube_id=task_item.mem_cube_id,
-                            item_id=task_item.item_id,
-                            label=m.label,
-                            log_content=f"Task {task_item.item_id} completed successfully for user {task_item.user_id}.",
-                            status="completed",
-                        )
-                        self.submit_web_logs([status_log])
-
                 # acknowledge redis messages
                 if self.use_redis_queue and self.memos_message_queue is not None:
                     for msg in messages:
@@ -211,20 +196,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
                         self._completed_tasks.pop(0)
                 logger.error(f"Task failed: {task_item.get_execution_info()}, Error: {e}")

-                is_cloud_env = (
-                    os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
-                )
-                if self.submit_web_logs and is_cloud_env:
-                    status_log = ScheduleLogForWebItem(
-                        user_id=task_item.user_id,
-                        mem_cube_id=task_item.mem_cube_id,
-                        item_id=task_item.item_id,
-                        label=m.label,
-                        log_content=f"Task {task_item.item_id} failed for user {task_item.user_id} with error: {e!s}.",
-                        status="failed",
-                        exception=str(e),
-                    )
-                    self.submit_web_logs([status_log])
                 raise

         return wrapped_handler
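
The dispatcher side of the change is purely a removal: per-task "completed"/"failed" ScheduleLogForWebItem entries are no longer emitted here, because the scheduler now produces memory-aware events with the task id attached, including the failure path shown above. A rough sketch of the slimmer wrapper that remains, with made-up names where the diff context does not show them:

# Rough sketch only; the real wrapped_handler also tracks running tasks and
# acknowledges Redis messages, which is omitted here.
def make_wrapped_handler(handler, metrics):
    def wrapped_handler(messages):
        try:
            handler(messages)
            for m in messages:
                metrics.task_completed(user_id=m.user_id, task_type=m.label)
        except Exception:
            # Web-facing failure status is now reported by the scheduler's
            # event logging, so the dispatcher just re-raises.
            raise
    return wrapped_handler
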
