@@ -238,12 +238,19 @@ async def _run_llm_engine(self):
         # Counter to keep track of ongoing request counts.
         self._ongoing_request_count = 0
 
+        # Check if metrics are enabled. The ZMQ process cannot be used when metrics are
+        # enabled.
+        self._enable_metrics = (
+            self._get_bool_config_param("REPORT_CUSTOM_METRICS")
+            and not self._aync_engine_args.disable_log_stats
+        )
+
         try:
             # Start the vLLM engine. The engine lives for the scope of this with
             # statement.
             async with build_async_engine_client_from_engine_args(
                 engine_args=self._aync_engine_args,
-                disable_frontend_multiprocessing=False,
+                disable_frontend_multiprocessing=self._enable_metrics,
             ) as engine:
                 # Capture the engine event loop and make it visible to other threads.
                 self._event_loop = asyncio.get_running_loop()
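The flipped flag above is the crux of the change: `disable_frontend_multiprocessing=True` keeps the engine in-process (an `AsyncLLMEngine`, whose stat-logger hooks the metrics path needs), while `False` lets vLLM spawn the ZMQ-backed engine process (`MQLLMEngineClient`). A minimal sketch of that selection, assuming the vLLM v0.6.3.post1 API pinned later in this diff and an illustrative model name:

    import asyncio

    from vllm.engine.arg_utils import AsyncEngineArgs
    from vllm.entrypoints.openai.api_server import (
        build_async_engine_client_from_engine_args,
    )

    async def main(enable_metrics: bool) -> None:
        engine_args = AsyncEngineArgs(model="facebook/opt-125m")  # illustrative model
        async with build_async_engine_client_from_engine_args(
            engine_args=engine_args,
            # True => in-process AsyncLLMEngine (reachable by stat loggers);
            # False => ZMQ-backed MQLLMEngineClient in a separate process.
            disable_frontend_multiprocessing=enable_metrics,
        ) as engine:
            print(type(engine).__name__)

    asyncio.run(main(enable_metrics=True))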
@@ -334,20 +341,20 @@ def _setup_lora(self):
         )
 
     def _setup_metrics(self):
-        # Create vLLM custom metrics
         self._vllm_metrics = None
-        if (
-            self._get_bool_config_param("REPORT_CUSTOM_METRICS")
-            and not self._aync_engine_args.disable_log_stats
-        ):
+        # TODO: Do not read metrics directly from the vLLM engine, read from prometheus
+        # client to allow the use of ZMQ process when metrics are enabled. See
+        # https://github.com/vllm-project/vllm/blob/v0.6.3.post1/vllm/entrypoints/openai/api_server.py#L222-L245
+        if self._enable_metrics:
             try:
                 labels = {
                     "model": self.args["model_name"],
                     "version": self.args["model_version"],
                 }
                 # Add vLLM custom metrics
+                engine_config = self._llm_engine.engine.model_config
                 self._vllm_metrics = VllmStatLogger(
-                    labels, self._llm_engine.model_config.max_model_len, self.logger
+                    labels, engine_config.max_model_len, self.logger
                 )
                 self._llm_engine.add_logger("triton", self._vllm_metrics)
             except pb_utils.TritonModelException as e:
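For context on what `add_logger` expects: the `triton` logger registered above implements vLLM's stat-logger interface. A minimal sketch of that shape, assuming the `StatLoggerBase`/`Stats` types are importable from `vllm.engine.metrics` in v0.6.3.post1; the class name and printed fields here are illustrative, not the backend's actual `VllmStatLogger`:

    from vllm.engine.metrics import StatLoggerBase, Stats, SupportsMetricsInfo

    class PrintingStatLogger(StatLoggerBase):
        """Illustrative logger printing a few per-iteration engine stats."""

        def log(self, stats: Stats) -> None:
            # Called by the engine each iteration when log stats are enabled,
            # hence the `not disable_log_stats` guard on _enable_metrics.
            print(
                f"running={stats.num_running_sys} "
                f"waiting={stats.num_waiting_sys} "
                f"gpu_cache_usage={stats.gpu_cache_usage_sys:.2%}"
            )

        def info(self, type: str, obj: SupportsMetricsInfo) -> None:
            # Receives static config metadata; nothing recorded in this sketch.
            pass

    # Registration mirrors the call above; `engine` must be the in-process
    # AsyncLLMEngine, which is why enabling metrics forces
    # disable_frontend_multiprocessing=True.
    # engine.add_logger("printer", PrintingStatLogger(local_interval=5.0))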
@@ -786,6 +793,12 @@ def _check_health(self, requests):
 
     def finalize(self):
         self.logger.log_info("[vllm] Issuing finalize to vllm backend")
+        self._llm_engine_shutdown_event.set()
+
+        # Shutdown the event thread.
+        if self._event_thread is not None:
+            self._event_thread.join()
+            self._event_thread = None
 
         # Shutdown the response thread.
         self._response_queue.put(None)
@@ -797,12 +810,6 @@ def finalize(self):
         if self._vllm_metrics is not None:
             self._vllm_metrics.finalize()
 
-        # Shutdown the event thread and engine.
-        self._llm_engine_shutdown_event.set()
-        if self._event_thread is not None:
-            self._event_thread.join()
-            self._event_thread = None
-
         # When using parallel tensors, the stub process may not shutdown due to
         # unreleased references, so manually run the garbage collector once.
         self.logger.log_info("[vllm] Running Garbage Collector on finalize...")
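Taken together, the two finalize hunks above reorder shutdown: the engine shutdown event is set and the event thread joined before the response thread is drained, so the engine can no longer enqueue work after the `None` sentinel. A minimal, self-contained sketch of that ordering using plain threading (all names illustrative):

    import queue
    import threading

    shutdown_event = threading.Event()
    response_queue: "queue.Queue[object]" = queue.Queue()

    def event_worker() -> None:
        # Stands in for the thread driving the engine's event loop.
        shutdown_event.wait()

    def response_worker() -> None:
        # Drains responses until the None sentinel arrives.
        while response_queue.get() is not None:
            pass

    event_thread = threading.Thread(target=event_worker)
    response_thread = threading.Thread(target=response_worker)
    event_thread.start()
    response_thread.start()

    # finalize(): stop the engine/event thread first so nothing can produce
    # responses after the sentinel, then shut down the response thread.
    shutdown_event.set()
    event_thread.join()
    response_queue.put(None)
    response_thread.join()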