Skip to content

Commit 1afec31

Browse files
committed
fix(app): add coroutine retry
1 parent 8733a1c commit 1afec31

File tree

2 files changed

+60
-18
lines changed
  • src/tgdb

2 files changed

+60
-18
lines changed

src/tgdb/main/server/di.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
RelationSchemasFromInMemoryDbAsRelationViews,
1515
)
1616
from tgdb.presentation.fastapi.common.app import (
17-
FastAPIAppCoroutines,
17+
FastAPIAppBackground,
1818
FastAPIAppRouters,
1919
FastAPIAppVersion,
2020
)
@@ -46,10 +46,10 @@ def provide_fast_api_app_coroutines(
4646
self,
4747
output_commits_to_tuples: OutputCommitsToTuples,
4848
output_commits: OutputCommits,
49-
) -> FastAPIAppCoroutines:
50-
return FastAPIAppCoroutines((
51-
output_commits(),
52-
output_commits_to_tuples(),
49+
) -> FastAPIAppBackground:
50+
return FastAPIAppBackground((
51+
output_commits,
52+
output_commits_to_tuples,
5353
))
5454

5555
@provide(scope=Scope.APP)

src/tgdb/presentation/fastapi/common/app.py

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import asyncio
2-
from collections.abc import AsyncIterator, Coroutine
3-
from contextlib import asynccontextmanager, suppress
4-
from typing import Any, NewType, cast
2+
from collections.abc import AsyncIterator, Callable, Coroutine
3+
from contextlib import asynccontextmanager
4+
from dataclasses import dataclass, field
5+
from types import TracebackType
6+
from typing import Any, NewType, Self, cast
57

68
from dishka import AsyncContainer
79
from dishka.integrations.fastapi import setup_dishka
@@ -11,24 +13,64 @@
1113
from tgdb.presentation.fastapi.common.tags import tags_metadata
1214

1315

14-
FastAPIAppCoroutines = NewType(
15-
"FastAPIAppCoroutines", tuple[Coroutine[Any, Any, Any], ...]
16+
FastAPIAppBackground = NewType(
17+
"FastAPIAppBackground", tuple[Callable[[], Coroutine[Any, Any, Any]], ...]
1618
)
1719
FastAPIAppRouters = NewType("FastAPIAppRouters", tuple[APIRouter, ...])
1820
FastAPIAppVersion = NewType("FastAPIAppVersion", str)
1921

2022

23+
@dataclass(frozen=True, unsafe_hash=False)
24+
class LefespanBackground:
25+
_loop: asyncio.AbstractEventLoop = field(
26+
default_factory=asyncio.get_running_loop
27+
)
28+
_tasks: set[asyncio.Task[Any]] = field(init=False, default_factory=set)
29+
30+
async def __aenter__(self) -> Self:
31+
return self
32+
33+
async def __aexit__(
34+
self,
35+
error_type: type[BaseException] | None,
36+
error: BaseException | None,
37+
traceback: TracebackType | None,
38+
) -> None:
39+
for task in self._tasks:
40+
task.cancel()
41+
42+
def add(
43+
self, func: Callable[[], Coroutine[Any, Any, Any]],
44+
) -> None:
45+
decorated_func = self._decorator(func)
46+
self._create_task(decorated_func())
47+
48+
def _decorator(
49+
self, func: Callable[[], Coroutine[Any, Any, Any]]
50+
) -> Callable[[], Coroutine[Any, Any, Any]]:
51+
async def decorated_func() -> None:
52+
try:
53+
await func()
54+
except Exception as error:
55+
self._create_task(decorated_func())
56+
raise error from error
57+
58+
return decorated_func
59+
60+
def _create_task(self, coro: Coroutine[Any, Any, Any]) -> None:
61+
task = self._loop.create_task(coro)
62+
self._tasks.add(task)
63+
64+
2165
@asynccontextmanager
2266
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
23-
with suppress(asyncio.CancelledError):
24-
async with asyncio.TaskGroup() as tasks:
25-
for coroutine in cast(FastAPIAppCoroutines, app.state.coroutines):
26-
tasks.create_task(coroutine)
67+
async with LefespanBackground() as background:
68+
for func in cast(FastAPIAppBackground, app.state.background):
69+
background.add(func)
2770

28-
yield
71+
yield
2972

30-
await app.state.dishka_container.close()
31-
raise asyncio.CancelledError
73+
await app.state.dishka_container.close()
3274

3375

3476
async def app_from(container: AsyncContainer) -> FastAPI:
@@ -51,7 +93,7 @@ async def app_from(container: AsyncContainer) -> FastAPI:
5193
docs_url="/",
5294
)
5395

54-
app.state.coroutines = await container.get(FastAPIAppCoroutines)
96+
app.state.background = await container.get(FastAPIAppBackground)
5597
routers = await container.get(FastAPIAppRouters)
5698

5799
for router in routers:

0 commit comments

Comments
 (0)