@@ -77,7 +77,6 @@ async def _handle_complete_event(self, event: FastAPIEvent) -> None:
7777 queue_item = self .get_queue_item (item_id )
7878 if queue_item .status not in ["completed" , "failed" , "canceled" ]:
7979 queue_item = self ._set_queue_item_status (item_id = queue_item .item_id , status = "completed" )
80- self .__invoker .services .events .emit_queue_item_status_changed (queue_item )
8180 except SessionQueueItemNotFoundError :
8281 return
8382
@@ -86,17 +85,17 @@ async def _handle_error_event(self, event: FastAPIEvent) -> None:
8685 item_id = event [1 ]["data" ]["queue_item_id" ]
8786 error = event [1 ]["data" ]["error" ]
8887 queue_item = self .get_queue_item (item_id )
88+ # always set to failed if have an error, even if previously the item was marked completed or canceled
8989 queue_item = self ._set_queue_item_status (item_id = queue_item .item_id , status = "failed" , error = error )
90- self .__invoker .services .events .emit_queue_item_status_changed (queue_item )
9190 except SessionQueueItemNotFoundError :
9291 return
9392
9493 async def _handle_cancel_event (self , event : FastAPIEvent ) -> None :
9594 try :
9695 item_id = event [1 ]["data" ]["queue_item_id" ]
9796 queue_item = self .get_queue_item (item_id )
98- queue_item = self . _set_queue_item_status ( item_id = queue_item .item_id , status = " canceled")
99- self .__invoker . services . events . emit_queue_item_status_changed ( queue_item )
97+ if queue_item .status not in [ "completed" , "failed" , " canceled"]:
98+ queue_item = self ._set_queue_item_status ( item_id = queue_item . item_id , status = "canceled" )
10099 except SessionQueueItemNotFoundError :
101100 return
102101
@@ -354,7 +353,6 @@ def dequeue(self) -> Optional[SessionQueueItem]:
354353 return None
355354 queue_item = SessionQueueItem .from_dict (dict (result ))
356355 queue_item = self ._set_queue_item_status (item_id = queue_item .item_id , status = "in_progress" )
357- self .__invoker .services .events .emit_queue_item_status_changed (queue_item )
358356 return queue_item
359357
360358 def get_next (self , queue_id : str ) -> Optional [SessionQueueItem ]:
@@ -427,7 +425,9 @@ def _set_queue_item_status(
427425 raise
428426 finally :
429427 self .__lock .release ()
430- return self .get_queue_item (item_id )
428+ queue_item = self .get_queue_item (item_id )
429+ self .__invoker .services .events .emit_queue_item_status_changed (queue_item )
430+ return queue_item
431431
432432 def is_empty (self , queue_id : str ) -> IsEmptyResult :
433433 try :
@@ -565,7 +565,6 @@ def cancel_queue_item(self, item_id: int) -> SessionQueueItem:
565565 queue_batch_id = queue_item .batch_id ,
566566 graph_execution_state_id = queue_item .session_id ,
567567 )
568- self .__invoker .services .events .emit_queue_item_status_changed (queue_item )
569568 return queue_item
570569
571570 def cancel_by_batch_ids (self , queue_id : str , batch_ids : list [str ]) -> CancelByBatchIDsResult :
0 commit comments