@@ -600,62 +600,116 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
600600 and hasattr (self .rabbitmq_config , "exchange_type" )
601601 and self .rabbitmq_config .exchange_type == "direct"
602602 )
603- if feedback_result and should_send_log :
604- feedback_content = []
605- for mem_item in feedback_result :
606- # Safely access attributes, assuming mem_item could be dict or object
603+ if should_send_log :
604+ record = feedback_result .get ("record" ) if isinstance (feedback_result , dict ) else {}
605+ add_records = record .get ("add" ) if isinstance (record , dict ) else []
606+ update_records = record .get ("update" ) if isinstance (record , dict ) else []
607+
608+ def _extract_fields (mem_item ):
607609 mem_id = (
608- getattr (mem_item , "id" , None ) or mem_item . get ( "id" )
609- if isinstance (mem_item , dict )
610- else None
610+ getattr (mem_item , "id" , None )
611+ if not isinstance (mem_item , dict )
612+ else mem_item . get ( "id" )
611613 )
612614 mem_memory = (
613- getattr (mem_item , "memory" , None ) or mem_item .get ("memory" )
614- if isinstance (mem_item , dict )
615- else None
615+ getattr (mem_item , "memory" , None )
616+ if not isinstance (mem_item , dict )
617+ else mem_item .get ("memory" ) or mem_item .get ("text" )
618+ )
619+ if mem_memory is None and isinstance (mem_item , dict ):
620+ mem_memory = mem_item .get ("text" )
621+ original_content = (
622+ getattr (mem_item , "origin_memory" , None )
623+ if not isinstance (mem_item , dict )
624+ else mem_item .get ("origin_memory" )
625+ or mem_item .get ("old_memory" )
626+ or mem_item .get ("original_content" )
616627 )
628+ return mem_id , mem_memory , original_content
629+
630+ kb_log_content : list [dict ] = []
631+
632+ for mem_item in add_records or []:
633+ mem_id , mem_memory , _ = _extract_fields (mem_item )
634+ if mem_id and mem_memory :
635+ kb_log_content .append (
636+ {
637+ "log_source" : "KNOWLEDGE_BASE_LOG" ,
638+ "trigger_source" : "Feedback" ,
639+ "operation" : "ADD" ,
640+ "memory_id" : mem_id ,
641+ "content" : mem_memory ,
642+ "original_content" : None ,
643+ "source_doc_id" : None ,
644+ }
645+ )
646+ else :
647+ logger .warning (
648+ "Skipping malformed feedback add item. user_id=%s mem_cube_id=%s task_id=%s item=%s" ,
649+ user_id ,
650+ mem_cube_id ,
651+ task_id ,
652+ mem_item ,
653+ stack_info = True ,
654+ )
617655
656+ for mem_item in update_records or []:
657+ mem_id , mem_memory , original_content = _extract_fields (mem_item )
618658 if mem_id and mem_memory :
619- feedback_content .append (
659+ kb_log_content .append (
620660 {
661+ "log_source" : "KNOWLEDGE_BASE_LOG" ,
662+ "trigger_source" : "Feedback" ,
663+ "operation" : "UPDATE" ,
664+ "memory_id" : mem_id ,
621665 "content" : mem_memory ,
622- "id" : mem_id ,
666+ "original_content" : original_content ,
667+ "source_doc_id" : None ,
623668 }
624669 )
625670 else :
626671 logger .warning (
627- "Skipping malformed mem_item in feedback_result . user_id=%s mem_cube_id=%s task_id=%s item=%s" ,
672+ "Skipping malformed feedback update item . user_id=%s mem_cube_id=%s task_id=%s item=%s" ,
628673 user_id ,
629674 mem_cube_id ,
630675 task_id ,
631676 mem_item ,
632677 stack_info = True ,
633678 )
634679
635- if not feedback_content :
680+ if kb_log_content :
681+ logger .info (
682+ "[DIAGNOSTIC] general_scheduler._mem_feedback_message_consumer: Creating knowledgeBaseUpdate event for feedback. user_id=%s mem_cube_id=%s task_id=%s items=%s" ,
683+ user_id ,
684+ mem_cube_id ,
685+ task_id ,
686+ len (kb_log_content ),
687+ )
688+ event = self .create_event_log (
689+ label = "knowledgeBaseUpdate" ,
690+ from_memory_type = USER_INPUT_TYPE ,
691+ to_memory_type = LONG_TERM_MEMORY_TYPE ,
692+ user_id = user_id ,
693+ mem_cube_id = mem_cube_id ,
694+ mem_cube = mem_cube ,
695+ memcube_log_content = kb_log_content ,
696+ metadata = None ,
697+ memory_len = len (kb_log_content ),
698+ memcube_name = self ._map_memcube_name (mem_cube_id ),
699+ )
700+ event .log_content = (
701+ f"Knowledge Base Memory Update: { len (kb_log_content )} changes."
702+ )
703+ event .task_id = task_id
704+ self ._submit_web_logs ([event ])
705+ else :
636706 logger .warning (
637- "No valid feedback content generated from feedback_result . user_id=%s mem_cube_id=%s task_id=%s" ,
707+ "No valid feedback content generated for web log . user_id=%s mem_cube_id=%s task_id=%s" ,
638708 user_id ,
639709 mem_cube_id ,
640710 task_id ,
641711 stack_info = True ,
642712 )
643- return
644-
645- event = self .create_event_log (
646- label = "feedbackMemory" ,
647- from_memory_type = USER_INPUT_TYPE ,
648- to_memory_type = LONG_TERM_MEMORY_TYPE ,
649- user_id = user_id ,
650- mem_cube_id = mem_cube_id ,
651- mem_cube = mem_cube ,
652- memcube_log_content = feedback_content ,
653- metadata = [],
654- memory_len = len (feedback_content ),
655- memcube_name = self ._map_memcube_name (mem_cube_id ),
656- )
657- event .task_id = task_id
658- self ._submit_web_logs ([event ])
659713
660714 except Exception as e :
661715 logger .error (f"Error processing feedbackMemory message: { e } " , exc_info = True )
0 commit comments