@@ -290,9 +290,10 @@ async def _receive_from_engine_worker_helper(self):
290290 # Check if this engine is the first to finish processing the latest
291291 # input. If so, it should get the next input from the queue.
292292 if (
293- not producer_info .token_returned
293+ producer_info .pending_token_return
294294 and latest_input .metadata == engine_worker_metadata
295295 ):
296+ producer_info .pending_token_return = False
296297 # Send response to client
297298 logger .debug (
298299 f"Sending result from engine { engine_worker .get_engine_id ()} "
@@ -305,10 +306,9 @@ async def _receive_from_engine_worker_helper(self):
305306 result ,
306307 return_token = True ,
307308 )
308- producer_info .token_returned = True
309309
310310 # Send the next input to the engine from the queue
311- await engine_worker .send_next_input (from_queue = True )
311+ await engine_worker .send_next_input ()
312312 return
313313
314314 if engine_worker .get_all_responses_required ():
@@ -533,7 +533,7 @@ async def send_payload(self, metadata_payload):
533533 self ._latest_input_processed [metadata .producer_id ] = metadata
534534 await self ._send_helper (metadata_payload .payload , heartbeat = False )
535535
536- async def send_next_input (self , from_queue = False ):
536+ async def send_next_input (self ):
537537 """Send next input from queue."""
538538 # for _ in range(len(self._producers)):
539539 # producer_info = self._producers.popleft()
@@ -548,16 +548,15 @@ async def send_next_input(self, from_queue=False):
548548 self ._producers .rotate (- 1 )
549549 producer = self ._producers [0 ]
550550
551- # If the token for the last input sent to an engine was already
552- # returned to the client, we can start working on the next
553- # input in the queue
554- if producer .token_returned :
551+ # If a token return is pending, that means no engine has returned
552+ # a result for the current input for this producer
553+ if not producer .pending_token_return :
555554 # Send the next input from the queue
556555 metadata_payload = await producer .get_input_from_queue (
557556 self ._engine_id
558557 )
559558 if metadata_payload is not None :
560- producer .token_returned = False
559+ producer .pending_token_return = True
561560 await self .send_payload (metadata_payload )
562561 return
563562
@@ -608,8 +607,9 @@ def __init__(self, producer_id, engine_workers, size_for_queues):
608607 self .latest_input_sent_to_engine = None
609608 self .target_engines = None
610609
611- # Whether a token was returned for the latest input
612- self .token_returned = None
610+ # Whether the token return for the last input sent to at least one
611+ # engine is pending
612+ self .pending_token_return = None
613613
614614 def get_name (self ):
615615 return self ._producer_id
@@ -707,7 +707,7 @@ async def process_input_from_client(
707707 # Latest input is only set if the input was sent to at least one
708708 # engine.
709709 self .latest_input_sent_to_engine = metadata_payload
710- self .token_returned = False
710+ self .pending_token_return = True
711711 return (StatusCode .SUCCESS , "" )
712712
713713 async def add_input_to_queue (self , metadata_payload ):
@@ -735,5 +735,5 @@ async def get_input_from_queue(self, engine_id):
735735 return None
736736 metadata_payload = self ._input_queue [0 ]
737737 self .latest_input_sent_to_engine = metadata_payload
738- self .token_returned = False
738+ self .pending_token_return = True
739739 return self ._input_queue .popleft ()
0 commit comments