From 0e6f5ef938c7556238b4d62de4b08befc82a2e5a Mon Sep 17 00:00:00 2001 From: oandreeva-nv Date: Thu, 19 Dec 2024 18:00:42 -0800 Subject: [PATCH 1/2] replasing asyncio event with threading event --- ci/L0_backend_vllm/metrics_test/test.sh | 4 ++-- src/model.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/ci/L0_backend_vllm/metrics_test/test.sh b/ci/L0_backend_vllm/metrics_test/test.sh index 884be624..a9a4db90 100755 --- a/ci/L0_backend_vllm/metrics_test/test.sh +++ b/ci/L0_backend_vllm/metrics_test/test.sh @@ -75,11 +75,11 @@ run_test() { fi fi + set -e + # TODO: Non-graceful shutdown when metrics are enabled. kill $SERVER_PID wait $SERVER_PID - - set -e } RET=0 diff --git a/src/model.py b/src/model.py index 46c35a2f..ad2a5c88 100644 --- a/src/model.py +++ b/src/model.py @@ -228,7 +228,7 @@ def _init_engine(self): # Run the engine in a separate thread running the AsyncIO event loop. self._llm_engine = None self._llm_engine_start_cv = threading.Condition() - self._llm_engine_shutdown_event = asyncio.Event() + self._llm_engine_shutdown_event = threading.Event() self._event_thread = threading.Thread( target=asyncio.run, args=(self._run_llm_engine(),) ) @@ -268,7 +268,8 @@ async def _run_llm_engine(self): self._llm_engine_start_cv.notify_all() # Wait for the engine shutdown signal. - await self._llm_engine_shutdown_event.wait() + while not self._llm_engine_shutdown_event.is_set(): + await asyncio.sleep(0.1) # Prevent busy-waiting # Wait for the ongoing requests to complete. while self._ongoing_request_count > 0: From 8c3020870c68d2f0ed3d288ee80e1fc9e8798560 Mon Sep 17 00:00:00 2001 From: oandreeva-nv Date: Fri, 20 Dec 2024 12:07:20 -0800 Subject: [PATCH 2/2] Using self._event_loop.call_soon_threadsafe --- src/model.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/model.py b/src/model.py index ad2a5c88..4c351f14 100644 --- a/src/model.py +++ b/src/model.py @@ -228,7 +228,7 @@ def _init_engine(self): # Run the engine in a separate thread running the AsyncIO event loop. self._llm_engine = None self._llm_engine_start_cv = threading.Condition() - self._llm_engine_shutdown_event = threading.Event() + self._llm_engine_shutdown_event = asyncio.Event() self._event_thread = threading.Thread( target=asyncio.run, args=(self._run_llm_engine(),) ) @@ -268,8 +268,7 @@ async def _run_llm_engine(self): self._llm_engine_start_cv.notify_all() # Wait for the engine shutdown signal. - while not self._llm_engine_shutdown_event.is_set(): - await asyncio.sleep(0.1) # Prevent busy-waiting + await self._llm_engine_shutdown_event.wait() # Wait for the ongoing requests to complete. while self._ongoing_request_count > 0: @@ -801,7 +800,7 @@ def _check_health(self, requests): def finalize(self): self.logger.log_info("[vllm] Issuing finalize to vllm backend") - self._llm_engine_shutdown_event.set() + self._event_loop.call_soon_threadsafe(self._llm_engine_shutdown_event.set) # Shutdown the event thread. if self._event_thread is not None: