Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/celery-library/src/celery_library/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from celery import Celery # type: ignore[import-untyped]
from celery.worker.worker import WorkController # type: ignore[import-untyped]
from servicelib.celery.app_server import STARTUP_TIMEOUT, BaseAppServer
from servicelib.celery.app_server import BaseAppServer
from servicelib.logging_utils import log_context
from settings_library.celery import CelerySettings

Expand Down Expand Up @@ -55,7 +55,7 @@ async def _setup_task_manager():
)
thread.start()

startup_complete_event.wait(STARTUP_TIMEOUT * 1.1)
startup_complete_event.wait()


def on_worker_shutdown(sender, **_kwargs) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import asyncio
import datetime
import threading
from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop
from typing import Final, Generic, TypeVar
from typing import Generic, TypeVar

from servicelib.celery.task_manager import TaskManager

STARTUP_TIMEOUT: Final[float] = datetime.timedelta(minutes=1).total_seconds()

T = TypeVar("T")


Expand Down Expand Up @@ -42,11 +39,11 @@ async def on_startup(self) -> None:
raise NotImplementedError

async def startup(
self, completed_event: threading.Event, shutdown_event: asyncio.Event
self, startup_completed_event: threading.Event, shutdown_event: asyncio.Event
) -> None:
self._shutdown_event = shutdown_event
completed_event.set()
await self.on_startup()
startup_completed_event.set()
await self._shutdown_event.wait()

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
from datetime import timedelta
from typing import Final

from asgi_lifespan import LifespanManager
from fastapi import FastAPI

from ...celery.app_server import BaseAppServer

_SHUTDOWN_TIMEOUT: Final[float] = timedelta(seconds=10).total_seconds()
_STARTUP_TIMEOUT: Final[float] = timedelta(minutes=1).total_seconds()


class FastAPIAppServer(BaseAppServer[FastAPI]):
def __init__(self, app: FastAPI):
Expand All @@ -18,8 +12,8 @@ def __init__(self, app: FastAPI):
async def on_startup(self) -> None:
self._lifespan_manager = LifespanManager(
self.app,
startup_timeout=_STARTUP_TIMEOUT,
shutdown_timeout=_SHUTDOWN_TIMEOUT,
startup_timeout=None,
shutdown_timeout=None,
)
await self._lifespan_manager.__aenter__()

Expand Down
Loading