Skip to content

Commit 159c47e

Browse files
authored
Hotfix/cloud log handler (#598)
* Add cloud add-log handler fallback for schedulers * Implement cloud add log handler for optimized scheduler * Refine cloud add log handler output * Format general_scheduler with ruff * Add stack_info to scheduler logging and format --------- Co-authored-by: [email protected] <>
1 parent 4dd7f76 commit 159c47e

File tree

1 file changed

+117
-15
lines changed

1 file changed

+117
-15
lines changed

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 117 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,63 @@ def send_add_log_messages_to_local_env(
464464
if events:
465465
self._submit_web_logs(events, additional_log_info="send_add_log_messages_to_cloud_env")
466466

467+
def send_add_log_messages_to_cloud_env(
468+
self, msg: ScheduleMessageItem, prepared_add_items, prepared_update_items_with_original
469+
):
470+
"""
471+
Cloud logging path for add/update events.
472+
"""
473+
kb_log_content: list[dict] = []
474+
info = msg.info or {}
475+
# Process added items
476+
for item in prepared_add_items:
477+
kb_log_content.append(
478+
{
479+
"log_source": "KNOWLEDGE_BASE_LOG",
480+
"trigger_source": info.get("trigger_source", "Messages"),
481+
"operation": "ADD",
482+
"memory_id": item.id,
483+
"content": item.memory,
484+
"original_content": None,
485+
"source_doc_id": getattr(item.metadata, "source_doc_id", None),
486+
}
487+
)
488+
489+
# Process updated items
490+
for item_data in prepared_update_items_with_original:
491+
item = item_data["new_item"]
492+
kb_log_content.append(
493+
{
494+
"log_source": "KNOWLEDGE_BASE_LOG",
495+
"trigger_source": info.get("trigger_source", "Messages"),
496+
"operation": "UPDATE",
497+
"memory_id": item.id,
498+
"content": item.memory,
499+
"original_content": item_data.get("original_content"),
500+
"source_doc_id": getattr(item.metadata, "source_doc_id", None),
501+
}
502+
)
503+
504+
if kb_log_content:
505+
logger.info(
506+
f"[DIAGNOSTIC] general_scheduler.send_add_log_messages_to_cloud_env: Creating event log for KB update. Label: knowledgeBaseUpdate, user_id: {msg.user_id}, mem_cube_id: {msg.mem_cube_id}, task_id: {msg.task_id}. KB content: {json.dumps(kb_log_content, indent=2)}"
507+
)
508+
event = self.create_event_log(
509+
label="knowledgeBaseUpdate",
510+
from_memory_type=USER_INPUT_TYPE,
511+
to_memory_type=LONG_TERM_MEMORY_TYPE,
512+
user_id=msg.user_id,
513+
mem_cube_id=msg.mem_cube_id,
514+
mem_cube=self.current_mem_cube,
515+
memcube_log_content=kb_log_content,
516+
metadata=None,
517+
memory_len=len(kb_log_content),
518+
memcube_name=self._map_memcube_name(msg.mem_cube_id),
519+
)
520+
event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
521+
event.task_id = msg.task_id
522+
self._submit_web_logs([event])
523+
467524
def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
468525
logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.")
469526
# Process the query in a session turn
@@ -502,28 +559,40 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
502559

503560
def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
504561
try:
562+
if not messages:
563+
return
505564
message = messages[0]
506565
mem_cube = self.current_mem_cube
507566

508567
user_id = message.user_id
509568
mem_cube_id = message.mem_cube_id
510569
content = message.content
511570

512-
feedback_data = json.loads(content)
571+
try:
572+
feedback_data = json.loads(content) if isinstance(content, str) else content
573+
if not isinstance(feedback_data, dict):
574+
logger.error(
575+
f"Failed to decode feedback_data or it is not a dict: {feedback_data}"
576+
)
577+
return
578+
except json.JSONDecodeError:
579+
logger.error(f"Invalid JSON content for feedback message: {content}", exc_info=True)
580+
return
513581

582+
task_id = feedback_data.get("task_id") or message.task_id
514583
feedback_result = self.feedback_server.process_feedback(
515584
user_id=user_id,
516585
user_name=mem_cube_id,
517-
session_id=feedback_data["session_id"],
518-
chat_history=feedback_data["history"],
519-
retrieved_memory_ids=feedback_data["retrieved_memory_ids"],
520-
feedback_content=feedback_data["feedback_content"],
521-
feedback_time=feedback_data["feedback_time"],
522-
task_id=feedback_data["task_id"],
586+
session_id=feedback_data.get("session_id"),
587+
chat_history=feedback_data.get("history", []),
588+
retrieved_memory_ids=feedback_data.get("retrieved_memory_ids", []),
589+
feedback_content=feedback_data.get("feedback_content"),
590+
feedback_time=feedback_data.get("feedback_time"),
591+
task_id=task_id,
523592
)
524593

525594
logger.info(
526-
f"Successfully feedback memories for user_id={user_id}, mem_cube_id={mem_cube_id}"
595+
f"Successfully processed feedback for user_id={user_id}, mem_cube_id={mem_cube_id}"
527596
)
528597

529598
should_send_log = (
@@ -533,13 +602,46 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
533602
)
534603
if feedback_result and should_send_log:
535604
feedback_content = []
536-
for _i, mem_item in enumerate(feedback_result):
537-
feedback_content.append(
538-
{
539-
"content": mem_item.memory,
540-
"id": mem_item["id"],
541-
}
605+
for mem_item in feedback_result:
606+
# Safely access attributes, assuming mem_item could be dict or object
607+
mem_id = (
608+
getattr(mem_item, "id", None) or mem_item.get("id")
609+
if isinstance(mem_item, dict)
610+
else None
611+
)
612+
mem_memory = (
613+
getattr(mem_item, "memory", None) or mem_item.get("memory")
614+
if isinstance(mem_item, dict)
615+
else None
542616
)
617+
618+
if mem_id and mem_memory:
619+
feedback_content.append(
620+
{
621+
"content": mem_memory,
622+
"id": mem_id,
623+
}
624+
)
625+
else:
626+
logger.warning(
627+
"Skipping malformed mem_item in feedback_result. user_id=%s mem_cube_id=%s task_id=%s item=%s",
628+
user_id,
629+
mem_cube_id,
630+
task_id,
631+
mem_item,
632+
stack_info=True,
633+
)
634+
635+
if not feedback_content:
636+
logger.warning(
637+
"No valid feedback content generated from feedback_result. user_id=%s mem_cube_id=%s task_id=%s",
638+
user_id,
639+
mem_cube_id,
640+
task_id,
641+
stack_info=True,
642+
)
643+
return
644+
543645
event = self.create_event_log(
544646
label="feedbackMemory",
545647
from_memory_type=USER_INPUT_TYPE,
@@ -552,7 +654,7 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
552654
memory_len=len(feedback_content),
553655
memcube_name=self._map_memcube_name(mem_cube_id),
554656
)
555-
event.task_id = message.task_id
657+
event.task_id = task_id
556658
self._submit_web_logs([event])
557659

558660
except Exception as e:

0 commit comments

Comments
 (0)