3131class QueueMessage :
3232 def __init__ (
3333 self ,
34- op : Literal ["add" , "remove" , "merge" , "update" ],
34+ op : Literal ["add" , "remove" , "merge" , "update" , "end" ],
3535 # `str` for node and edge IDs, `GraphDBNode` and `GraphDBEdge` for actual objects
3636 before_node : list [str ] | list [GraphDBNode ] | None = None ,
3737 before_edge : list [str ] | list [GraphDBEdge ] | None = None ,
@@ -48,7 +48,7 @@ def __str__(self) -> str:
4848 return f"QueueMessage(op={ self .op } , before_node={ self .before_node if self .before_node is None else len (self .before_node )} , after_node={ self .after_node if self .after_node is None else len (self .after_node )} )"
4949
5050 def __lt__ (self , other : "QueueMessage" ) -> bool :
51- op_priority = {"add" : 2 , "remove" : 2 , "merge" : 1 }
51+ op_priority = {"add" : 2 , "remove" : 2 , "merge" : 1 , "end" : 0 }
5252 return op_priority [self .op ] < op_priority [other .op ]
5353
5454
@@ -103,7 +103,7 @@ def wait_until_current_task_done(self):
103103 def _run_message_consumer_loop (self ):
104104 while True :
105105 message = self .queue .get ()
106- if message is None :
106+ if message . op == "end" :
107107 break
108108
109109 try :
@@ -140,7 +140,7 @@ def stop(self):
140140 if not self .is_reorganize :
141141 return
142142
143- self .add_message (None )
143+ self .add_message (QueueMessage ( op = "end" ) )
144144 self .thread .join ()
145145 logger .info ("Reorganize thread stopped." )
146146 self ._stop_scheduler = True
@@ -158,9 +158,6 @@ def handle_message(self, message: QueueMessage):
158158
159159 def handle_add (self , message : QueueMessage ):
160160 logger .debug (f"Handling add operation: { str (message )[:500 ]} " )
161- assert message .before_node is None and message .before_edge is None , (
162- "Before node and edge should be None for `add` operation."
163- )
164161 # ———————— 1. check for conflicts ————————
165162 added_node = message .after_node [0 ]
166163 conflicts = self .conflict .detect (added_node , scope = added_node .metadata .memory_type )
@@ -586,7 +583,7 @@ def _link_cluster_nodes(self, parent_node: GraphDBNode, child_nodes: list[GraphD
586583
587584 def _preprocess_message (self , message : QueueMessage ) -> bool :
588585 message = self ._convert_id_to_node (message )
589- if None in message .after_node :
586+ if message . after_node is None or None in message .after_node :
590587 logger .debug (
591588 f"Found non-existent node in after_node in message: { message } , skip this message."
592589 )
0 commit comments