Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions libs/infinity_emb/infinity_emb/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2023-now michaelfeilfeil
from __future__ import annotations

from asyncio import Semaphore
from asyncio import Lock
from typing import Iterable, Iterator, Optional, Union

from infinity_emb.args import EngineArgs
Expand Down Expand Up @@ -52,7 +52,8 @@ def __init__(
self._engine_args = EngineArgs(**kwargs)

self.running = False
self._running_sepamore: Optional[Semaphore] = None
self._running_lock: Optional[Lock] = None
self._running_counter: int = 0
self._model_replicas, self._min_inference_t, self._max_inference_t = select_model(
self._engine_args
)
Expand Down Expand Up @@ -81,9 +82,11 @@ def __str__(self) -> str:

async def astart(self):
"""startup engine"""
if self._running_sepamore is None:
self._running_sepamore = Semaphore(1)
async with self._running_sepamore:
if self._running_lock is None:
self._running_lock = Lock()
async with self._running_lock:
# Counting the number of launches (when using multiple context managers asynchronously)
self._running_counter += 1
if not self.running:
self.running = True
self._batch_handler = BatchHandler(
Expand All @@ -96,12 +99,16 @@ async def astart(self):
)
await self._batch_handler.spawn()

async def astop(self):
async def astop(self, *, force: bool = False):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is force=True needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be useful when the engine is used without a context manager, to ensure the process is terminated — for example, when astop is called from the on_shutdown event of some framework.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The solution is generally debatable; I've only provided one possible solution.

In my project, I encountered a deadlock when using the context manager in asynchronously launched tasks. Ultimately, I settled on a one-time call to the start method.

"""stop engine"""
if self._running_sepamore is None:
if self._running_lock is None:
return
async with self._running_sepamore:
if self.running:
async with self._running_lock:
if force:
self._running_counter = 0
if self._running_counter > 0:
self._running_counter -= 1
if self.running and self._running_counter == 0:
self.running = False
await self._batch_handler.shutdown()

Expand Down