33import time
44import traceback
55
6- from queue import Queue
6+ from queue import PriorityQueue
77from typing import Literal
88
99import numpy as np
@@ -46,10 +46,13 @@ def __init__(
    def __str__(self) -> str:
        """Return a compact, log-friendly summary of the message.

        Shows the op name and the *length* of each node list (or ``None``
        when the list itself is ``None``) rather than the node contents.
        """
        return f"QueueMessage(op={self.op}, before_node={self.before_node if self.before_node is None else len(self.before_node)}, after_node={self.after_node if self.after_node is None else len(self.after_node)})"
4848
49+ def __lt__ (self , other : "QueueMessage" ) -> bool :
50+ op_priority = {"add" : 2 , "remove" : 2 , "merge" : 1 }
51+ return op_priority [self .op ] < op_priority [other .op ]
4952
5053class GraphStructureReorganizer :
5154 def __init__ (self , graph_store : Neo4jGraphDB , llm : BaseLLM , embedder : OllamaEmbedder ):
52- self .queue = Queue ()
55+ self .queue = PriorityQueue () # Min-heap
5356 self .graph_store = graph_store
5457 self .llm = llm
5558 self .embedder = embedder
@@ -129,7 +132,7 @@ def handle_message(self, message: QueueMessage):
129132 handle_map = {
130133 "add" : self .handle_add ,
131134 "remove" : self .handle_remove ,
132- "update " : self .handle_update ,
135+ "merge " : self .handle_merge ,
133136 }
134137 handle_map [message .op ](message )
135138 logger .debug (f"message queue size: { self .queue .qsize ()} " )
@@ -151,16 +154,24 @@ def handle_add(self, message: QueueMessage):
151154 logger .info (f"Resolved conflict between { added_node .id } and { existing_node .id } ." )
152155
    def handle_remove(self, message: QueueMessage):
        """
        Handle removing a memory item from the graph.

        NOTE(review): currently this only logs a truncated preview of the
        message — no graph mutation happens here yet.
        """
        logger.debug(f"Handling remove operation: {str(message)[:50]}")
159- def handle_update (self , message : QueueMessage ):
160- """
161- Handle updating a memory item in the graph.
162- """
163- logger .debug (f"Handling update operation: { str (message )[:50 ]} " )
159+ def handle_merge (self , message : QueueMessage ):
160+ after_node = message .after_node [0 ]
161+ logger .debug (f"Handling merge operation: <{ after_node .memory } >" )
162+ prompt = [
163+ {
164+ "role" : "user" ,
165+ "content" : MERGE_PROMPT .format (merged_text = after_node .memory ),
166+ },
167+ ]
168+ response = self .llm .generate (prompt )
169+ after_node .memory = response .strip ()
170+ self .graph_store .update_node (
171+ after_node .id ,
172+ {"memory" : after_node .memory , ** after_node .metadata .model_dump (exclude_none = True )},
173+ )
174+ logger .debug (f"Merged memory: { after_node .memory } " )
164175
165176 def optimize_structure (self , scope : str = "LongTermMemory" , local_tree_threshold : int = 10 ):
166177 """
@@ -473,3 +484,5 @@ def _convert_id_to_node(self, message: QueueMessage) -> QueueMessage:
473484 else :
474485 message .after_node [i ] = GraphDBNode (** raw_node )
475486 return message
487+
# Prompt used by handle_merge: the two source memories arrive pre-joined by
# the ⟵MERGED⟶ marker in {merged_text}; the LLM must return a single
# fact-only consolidation with no commentary.
MERGE_PROMPT = """You are given two pieces of text joined by the marker `⟵MERGED⟶`. Please carefully read both sides of the merged text. Your task is to summarize and consolidate all the factual details from both sides into a single, coherent text, without omitting any information. You must include every distinct detail mentioned in either text. Do not provide any explanation or analysis — only return the merged summary. Don't use pronouns or subjective language, just the facts as they are presented.\n {merged_text}"""