Skip to content

Commit e6009db

Browse files
Implement cloud add log handler for optimized scheduler
1 parent 647b4c0 commit e6009db

File tree

1 file changed

+50
-12
lines changed

1 file changed

+50
-12
lines changed

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -469,19 +469,57 @@ def send_add_log_messages_to_cloud_env(
469469
):
470470
"""
471471
Cloud logging path for add/update events.
472-
473-
Currently reuses local env logging to avoid missing method errors in subclasses.
474472
"""
475-
logger.info(
476-
"send_add_log_messages_to_cloud_env fallback to local handler. user_id=%s mem_cube_id=%s task_id=%s item_id=%s",
477-
msg.user_id,
478-
msg.mem_cube_id,
479-
msg.task_id,
480-
msg.item_id,
481-
)
482-
return self.send_add_log_messages_to_local_env(
483-
msg, prepared_add_items, prepared_update_items_with_original
484-
)
473+
kb_log_content: list[dict] = []
474+
info = msg.info or {}
475+
# Process added items
476+
for item in prepared_add_items:
477+
kb_log_content.append(
478+
{
479+
"log_source": "KNOWLEDGE_BASE_LOG",
480+
"trigger_source": info.get("trigger_source", "Messages"),
481+
"operation": "ADD",
482+
"memory_id": item.id,
483+
"content": item.memory,
484+
"original_content": None,
485+
"source_doc_id": getattr(item.metadata, "source_doc_id", None),
486+
}
487+
)
488+
489+
# Process updated items
490+
for item_data in prepared_update_items_with_original:
491+
item = item_data["new_item"]
492+
kb_log_content.append(
493+
{
494+
"log_source": "KNOWLEDGE_BASE_LOG",
495+
"trigger_source": info.get("trigger_source", "Messages"),
496+
"operation": "UPDATE",
497+
"memory_id": item.id,
498+
"content": item.memory,
499+
"original_content": item_data.get("original_content"),
500+
"source_doc_id": getattr(item.metadata, "source_doc_id", None),
501+
}
502+
)
503+
504+
if kb_log_content:
505+
logger.info(
506+
f"[DIAGNOSTIC] general_scheduler.send_add_log_messages_to_cloud_env: Creating event log for KB update. Label: knowledgeBaseUpdate, user_id: {msg.user_id}, mem_cube_id: {msg.mem_cube_id}, task_id: {msg.task_id}. KB content: {json.dumps(kb_log_content, indent=2)}"
507+
)
508+
event = self.create_event_log(
509+
label="knowledgeBaseUpdate",
510+
from_memory_type=USER_INPUT_TYPE,
511+
to_memory_type=LONG_TERM_MEMORY_TYPE,
512+
user_id=msg.user_id,
513+
mem_cube_id=msg.mem_cube_id,
514+
mem_cube=self.current_mem_cube,
515+
memcube_log_content=kb_log_content,
516+
metadata=None,
517+
memory_len=len(kb_log_content),
518+
memcube_name=self._map_memcube_name(msg.mem_cube_id),
519+
)
520+
event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
521+
event.task_id = msg.task_id
522+
self._submit_web_logs([event])
485523

486524
def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
487525
logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.")

0 commit comments

Comments
 (0)