diff --git a/libs/infinity_emb/infinity_emb/engine.py b/libs/infinity_emb/infinity_emb/engine.py index 153e15ba..71ba130c 100644 --- a/libs/infinity_emb/infinity_emb/engine.py +++ b/libs/infinity_emb/infinity_emb/engine.py @@ -2,7 +2,7 @@ # Copyright (c) 2023-now michaelfeilfeil from __future__ import annotations -from asyncio import Semaphore +from asyncio import Lock from typing import Iterable, Iterator, Optional, Union from infinity_emb.args import EngineArgs @@ -52,7 +52,8 @@ def __init__( self._engine_args = EngineArgs(**kwargs) self.running = False - self._running_sepamore: Optional[Semaphore] = None + self._running_lock: Optional[Lock] = None + self._running_counter: int = 0 self._model_replicas, self._min_inference_t, self._max_inference_t = select_model( self._engine_args ) @@ -81,9 +82,11 @@ def __str__(self) -> str: async def astart(self): """startup engine""" - if self._running_sepamore is None: - self._running_sepamore = Semaphore(1) - async with self._running_sepamore: + if self._running_lock is None: + self._running_lock = Lock() + async with self._running_lock: + # Counting the number of launches (when using multiple context managers asynchronously) + self._running_counter += 1 if not self.running: self.running = True self._batch_handler = BatchHandler( @@ -96,12 +99,16 @@ async def astart(self): ) await self._batch_handler.spawn() - async def astop(self): + async def astop(self, *, force: bool = False): """stop engine""" - if self._running_sepamore is None: + if self._running_lock is None: return - async with self._running_sepamore: - if self.running: + async with self._running_lock: + if force: + self._running_counter = 0 + if self._running_counter > 0: + self._running_counter -= 1 + if self.running and self._running_counter == 0: self.running = False await self._batch_handler.shutdown()