2424# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 
2525# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
2626
27+ import  queue 
28+ import  threading 
2729from  typing  import  Dict , List , Union 
2830
2931import  triton_python_backend_utils  as  pb_utils 
@@ -170,11 +172,18 @@ def __init__(self, labels: List[str], max_model_len: int):
170172class  VllmStatLogger (VllmStatLoggerBase ):
171173    """StatLogger is used as an adapter between vLLM stats collector and Triton metrics provider.""" 
172174
173-     # local_interval not used here. It's for vLLM logs to stdout. 
def __init__(self, labels: Dict, max_model_len: int, log_logger) -> None:
    """Create the Triton metrics adapter and start its reporting thread.

    Args:
        labels: Labels forwarded to TritonMetrics.
        max_model_len: Maximum model length forwarded to TritonMetrics.
        log_logger: Object whose log_error() is called by logger_loop
            when it meets an unknown command.
    """
    # local_interval is only meaningful for vLLM's own stdout logging,
    # which this adapter does not use.
    super().__init__(local_interval=0)
    self.metrics = TritonMetrics(labels, max_model_len)
    self.log_logger = log_logger

    # Metric updates are handed to a dedicated thread through a queue so
    # that vLLM can keep making progress while they are reported to the
    # Triton metrics service.
    self._logger_queue = queue.Queue()
    worker = threading.Thread(target=self.logger_loop)
    self._logger_thread = worker
    worker.start()
178187
def info(self, type: str, obj: SupportsMetricsInfo) -> None:
    """Accept an info record and deliberately do nothing with it."""
    return None
@@ -190,7 +199,7 @@ def _log_counter(self, counter, data: Union[int, float]) -> None:
190199            None 
191200        """ 
192201        if  data  !=  0 :
193-             counter . increment ( data )
202+             self . _logger_queue . put_nowait (( counter ,  "increment" ,  data ) )
194203
195204    def  _log_histogram (self , histogram , data : Union [List [int ], List [float ]]) ->  None :
196205        """Convenience function for logging list to histogram. 
@@ -203,7 +212,7 @@ def _log_histogram(self, histogram, data: Union[List[int], List[float]]) -> None
203212            None 
204213        """ 
205214        for  datum  in  data :
206-             histogram . observe ( datum )
215+             self . _logger_queue . put_nowait (( histogram ,  "observe" ,  datum ) )
207216
208217    def  log (self , stats : VllmStats ) ->  None :
209218        """Report stats to Triton metrics server. 
@@ -246,3 +255,24 @@ def log(self, stats: VllmStats) -> None:
246255            self ._log_counter (metric , data )
247256        for  metric , data  in  histogram_metrics :
248257            self ._log_histogram (metric , data )
258+ 
def logger_loop(self):
    """Drain the metrics queue and apply each queued update.

    Runs as the target of the logger thread. Each queue item is either
    None, which signals shutdown and ends the loop, or a
    (metric, command, data) tuple where command is "increment" or
    "observe" and selects the method called on metric with data.

    A failure while applying one item is reported through
    self.log_logger.log_error and the loop moves on to the next item.
    Without this, a single exception would end the thread and leave every
    later update sitting in the queue unprocessed.
    """
    while True:
        item = self._logger_queue.get()
        # To signal shutdown a None item will be added to the queue.
        if item is None:
            break
        try:
            metric, command, data = item
            if command == "increment":
                metric.increment(data)
            elif command == "observe":
                metric.observe(data)
            else:
                self.log_logger.log_error(f"Undefined command name: { command }  ")
        except Exception as exc:
            # Thread top-level boundary: report the failure and keep
            # draining so one bad update cannot stop metrics reporting.
            self.log_logger.log_error(f"Failed to report metric update: {exc}")
272+ 
def finalize(self):
    """Stop the logger thread and wait for it to exit.

    Puts the None shutdown sentinel on the queue, joins the thread, and
    clears self._logger_thread. Calling it again afterwards is a no-op:
    the sentinel is only enqueued while a thread is still registered, so
    repeated calls do not leave stray items on the queue.
    """
    if self._logger_thread is None:
        # Already finalized; nothing is left to consume a sentinel.
        return
    # Shutdown the logger thread.
    self._logger_queue.put(None)
    self._logger_thread.join()
    self._logger_thread = None
0 commit comments