@@ -600,62 +600,130 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
600600 and hasattr (self .rabbitmq_config , "exchange_type" )
601601 and self .rabbitmq_config .exchange_type == "direct"
602602 )
603- if feedback_result and should_send_log :
604- feedback_content = []
605- for mem_item in feedback_result :
606- # Safely access attributes, assuming mem_item could be dict or object
603+ if should_send_log :
604+ record = feedback_result .get ("record" ) if isinstance (feedback_result , dict ) else {}
605+ add_records = record .get ("add" ) if isinstance (record , dict ) else []
606+ update_records = record .get ("update" ) if isinstance (record , dict ) else []
607+
608+ def _extract_fields (mem_item ):
607609 mem_id = (
608- getattr (mem_item , "id" , None ) or mem_item . get ( "id" )
609- if isinstance (mem_item , dict )
610- else None
610+ getattr (mem_item , "id" , None )
611+ if not isinstance (mem_item , dict )
612+ else mem_item . get ( "id" )
611613 )
612614 mem_memory = (
613- getattr (mem_item , "memory" , None ) or mem_item .get ("memory" )
614- if isinstance (mem_item , dict )
615- else None
615+ getattr (mem_item , "memory" , None )
616+ if not isinstance (mem_item , dict )
617+ else mem_item .get ("memory" ) or mem_item .get ("text" )
618+ )
619+ if mem_memory is None and isinstance (mem_item , dict ):
620+ mem_memory = mem_item .get ("text" )
621+ original_content = (
622+ getattr (mem_item , "origin_memory" , None )
623+ if not isinstance (mem_item , dict )
624+ else mem_item .get ("origin_memory" )
625+ or mem_item .get ("old_memory" )
626+ or mem_item .get ("original_content" )
616627 )
628+ source_doc_id = None
629+ if isinstance (mem_item , dict ):
630+ source_doc_id = (
631+ mem_item .get ("source_doc_id" )
632+ or mem_item .get ("doc_id" )
633+ or (mem_item .get ("metadata" ) or {}).get ("source_doc_id" )
634+ )
635+ else :
636+ metadata = getattr (mem_item , "metadata" , None )
637+ if metadata :
638+ source_doc_id = getattr (metadata , "source_doc_id" , None ) or getattr (
639+ metadata , "doc_id" , None
640+ )
641+
642+ return mem_id , mem_memory , original_content , source_doc_id
643+
644+ kb_log_content : list [dict ] = []
617645
646+ for mem_item in add_records or []:
647+ mem_id , mem_memory , _ , source_doc_id = _extract_fields (mem_item )
618648 if mem_id and mem_memory :
619- feedback_content .append (
649+ kb_log_content .append (
620650 {
651+ "log_source" : "KNOWLEDGE_BASE_LOG" ,
652+ "trigger_source" : "Feedback" ,
653+ "operation" : "ADD" ,
654+ "memory_id" : mem_id ,
655+ "content" : mem_memory ,
656+ "original_content" : None ,
657+ "source_doc_id" : source_doc_id ,
658+ }
659+ )
660+ else :
661+ logger .warning (
662+ "Skipping malformed feedback add item. user_id=%s mem_cube_id=%s task_id=%s item=%s" ,
663+ user_id ,
664+ mem_cube_id ,
665+ task_id ,
666+ mem_item ,
667+ stack_info = True ,
668+ )
669+
670+ for mem_item in update_records or []:
671+ mem_id , mem_memory , original_content , source_doc_id = _extract_fields (mem_item )
672+ if mem_id and mem_memory :
673+ kb_log_content .append (
674+ {
675+ "log_source" : "KNOWLEDGE_BASE_LOG" ,
676+ "trigger_source" : "Feedback" ,
677+ "operation" : "UPDATE" ,
678+ "memory_id" : mem_id ,
621679 "content" : mem_memory ,
622- "id" : mem_id ,
680+ "original_content" : original_content ,
681+ "source_doc_id" : source_doc_id ,
623682 }
624683 )
625684 else :
626685 logger .warning (
627- "Skipping malformed mem_item in feedback_result . user_id=%s mem_cube_id=%s task_id=%s item=%s" ,
686+ "Skipping malformed feedback update item . user_id=%s mem_cube_id=%s task_id=%s item=%s" ,
628687 user_id ,
629688 mem_cube_id ,
630689 task_id ,
631690 mem_item ,
632691 stack_info = True ,
633692 )
634693
635- if not feedback_content :
694+ if kb_log_content :
695+ logger .info (
696+ "[DIAGNOSTIC] general_scheduler._mem_feedback_message_consumer: Creating knowledgeBaseUpdate event for feedback. user_id=%s mem_cube_id=%s task_id=%s items=%s" ,
697+ user_id ,
698+ mem_cube_id ,
699+ task_id ,
700+ len (kb_log_content ),
701+ )
702+ event = self .create_event_log (
703+ label = "knowledgeBaseUpdate" ,
704+ from_memory_type = USER_INPUT_TYPE ,
705+ to_memory_type = LONG_TERM_MEMORY_TYPE ,
706+ user_id = user_id ,
707+ mem_cube_id = mem_cube_id ,
708+ mem_cube = mem_cube ,
709+ memcube_log_content = kb_log_content ,
710+ metadata = None ,
711+ memory_len = len (kb_log_content ),
712+ memcube_name = self ._map_memcube_name (mem_cube_id ),
713+ )
714+ event .log_content = (
715+ f"Knowledge Base Memory Update: { len (kb_log_content )} changes."
716+ )
717+ event .task_id = task_id
718+ self ._submit_web_logs ([event ])
719+ else :
636720 logger .warning (
637- "No valid feedback content generated from feedback_result . user_id=%s mem_cube_id=%s task_id=%s" ,
721+ "No valid feedback content generated for web log . user_id=%s mem_cube_id=%s task_id=%s" ,
638722 user_id ,
639723 mem_cube_id ,
640724 task_id ,
641725 stack_info = True ,
642726 )
643- return
644-
645- event = self .create_event_log (
646- label = "feedbackMemory" ,
647- from_memory_type = USER_INPUT_TYPE ,
648- to_memory_type = LONG_TERM_MEMORY_TYPE ,
649- user_id = user_id ,
650- mem_cube_id = mem_cube_id ,
651- mem_cube = mem_cube ,
652- memcube_log_content = feedback_content ,
653- metadata = [],
654- memory_len = len (feedback_content ),
655- memcube_name = self ._map_memcube_name (mem_cube_id ),
656- )
657- event .task_id = task_id
658- self ._submit_web_logs ([event ])
659727
660728 except Exception as e :
661729 logger .error (f"Error processing feedbackMemory message: { e } " , exc_info = True )
0 commit comments