@@ -293,12 +293,12 @@ async def _receive_from_engine_worker_helper(self):
293293 producer_info .pending_token_return
294294 and latest_input .metadata == engine_worker_metadata
295295 ):
296- producer_info .pending_token_return = False
297296 # Send response to client
298297 logger .debug (
299298 f"Sending result from engine { engine_worker .get_engine_id ()} "
300299 f" to client { engine_worker_metadata .client_address } "
301300 )
301+ producer_info .pending_token_return = False
302302 await self .server .send_result (
303303 engine_worker_metadata .client_address ,
304304 producer_info .get_name (),
@@ -534,34 +534,25 @@ async def send_payload(self, metadata_payload):
534534 await self ._send_helper (metadata_payload .payload , heartbeat = False )
535535
536536 async def send_next_input (self ):
537- """Send next input from queue."""
538- # for _ in range(len(self._producers)):
539- # producer_info = self._producers.popleft()
540- # self._producers.append(producer_info)
541- # metadata_payload = await producer_info.get_input_from_queue(
542- # self._engine_id
543- # )
544- # if metadata_payload is not None:
545- # await self.send_payload(metadata_payload)
546- # return
537+ """Send next input."""
547538 for _ in range (len (self ._producers )):
548539 self ._producers .rotate (- 1 )
549540 producer = self ._producers [0 ]
550541
551542 # If a token return is pending, that means no engine has returned
552- # a result for the current input for this producer
543+ # a result for the current input for this producer. So we cannot
544+ # get a new item from the queue yet.
553545 if not producer .pending_token_return :
554546 # Send the next input from the queue
555547 metadata_payload = await producer .get_input_from_queue (
556548 self ._engine_id
557549 )
558550 if metadata_payload is not None :
559- producer .pending_token_return = True
560551 await self .send_payload (metadata_payload )
561552 return
562553
563554 # Send the latest available frame from this producer if we haven't
564- # processed it yet
555+ # processed it yet.
565556 metadata_payload = producer .latest_input_sent_to_engine
566557 if metadata_payload is None :
567558 continue
@@ -570,13 +561,14 @@ async def send_next_input(self):
570561 producer_id , None
571562 )
572563 if (
573- latest_processed_frame is None
574- or metadata_payload .metadata .frame_id
564+ latest_processed_frame is not None
565+ and metadata_payload .metadata .frame_id
575566 > latest_processed_frame .frame_id
576567 ):
577568 await self .send_payload (metadata_payload )
578569 return
579570
571+ # No input available
580572 self .clear_current_input_metadata ()
581573
582574 async def add_producer (self , producer_info ):
0 commit comments