Skip to content

Commit 2ae799d

Browse files
author
Andrei Neagu
committed
refactor long running manager
1 parent 67f44e3 commit 2ae799d

File tree

11 files changed

+125
-300
lines changed

11 files changed

+125
-300
lines changed

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_manager.py

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,12 @@
1-
import datetime
2-
31
from aiohttp import web
4-
from settings_library.rabbit import RabbitSettings
5-
from settings_library.redis import RedisSettings
62

7-
from ...long_running_tasks import lrt_api
83
from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
9-
from ...long_running_tasks.models import RabbitNamespace, TaskContext
10-
from ...long_running_tasks.task import RedisNamespace, TasksManager
11-
from ...rabbitmq._client_rpc import RabbitMQRPCClient
4+
from ...long_running_tasks.models import TaskContext
125
from ._constants import APP_LONG_RUNNING_MANAGER_KEY
136
from ._request import get_task_context
147

158

169
class AiohttpLongRunningManager(BaseLongRunningManager):
17-
def __init__(
18-
self,
19-
app: web.Application,
20-
stale_task_check_interval: datetime.timedelta,
21-
stale_task_detect_timeout: datetime.timedelta,
22-
redis_settings: RedisSettings,
23-
rabbit_settings: RabbitSettings,
24-
redis_namespace: RedisNamespace,
25-
rabbit_namespace: RabbitNamespace,
26-
):
27-
self._app = app
28-
self._tasks_manager = TasksManager(
29-
stale_task_check_interval=stale_task_check_interval,
30-
stale_task_detect_timeout=stale_task_detect_timeout,
31-
redis_settings=redis_settings,
32-
redis_namespace=redis_namespace,
33-
)
34-
self._rabbit_namespace = rabbit_namespace
35-
self.rabbit_settings = rabbit_settings
36-
self._rpc_server: RabbitMQRPCClient | None = None
37-
self._rpc_client: RabbitMQRPCClient | None = None
38-
39-
@property
40-
def tasks_manager(self) -> TasksManager:
41-
return self._tasks_manager
42-
43-
@property
44-
def rpc_server(self) -> RabbitMQRPCClient:
45-
assert self._rpc_server is not None # nosec
46-
return self._rpc_server
47-
48-
@property
49-
def rpc_client(self) -> RabbitMQRPCClient:
50-
assert self._rpc_client is not None # nosec
51-
return self._rpc_client
52-
53-
@property
54-
def rabbit_namespace(self) -> RabbitNamespace:
55-
return self._rabbit_namespace
56-
57-
async def setup(self) -> None:
58-
await self._tasks_manager.setup()
59-
self._rpc_server = await RabbitMQRPCClient.create(
60-
client_name=f"lrt-server-{self.rabbit_namespace}",
61-
settings=self.rabbit_settings,
62-
)
63-
self._rpc_client = await RabbitMQRPCClient.create(
64-
client_name=f"lrt-client-{self.rabbit_namespace}",
65-
settings=self.rabbit_settings,
66-
)
67-
await lrt_api.register_rabbit_routes(self)
68-
69-
async def teardown(self) -> None:
70-
await self._tasks_manager.teardown()
71-
72-
if self._rpc_server is not None:
73-
await self._rpc_server.close()
74-
self._rpc_server = None
75-
76-
if self._rpc_client is not None:
77-
await self._rpc_client.close()
78-
self._rpc_client = None
7910

8011
@staticmethod
8112
def get_task_context(request: web.Request) -> TaskContext:

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
175175
# add components to state
176176
app[APP_LONG_RUNNING_MANAGER_KEY] = long_running_manager = (
177177
AiohttpLongRunningManager(
178-
app=app,
179178
stale_task_check_interval=stale_task_check_interval,
180179
stale_task_detect_timeout=stale_task_detect_timeout,
181180
redis_settings=redis_settings,
Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,5 @@
1-
import datetime
2-
3-
from fastapi import FastAPI
4-
from settings_library.rabbit import RabbitSettings
5-
from settings_library.redis import RedisSettings
6-
7-
from ...long_running_tasks import lrt_api
81
from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
9-
from ...long_running_tasks.models import RabbitNamespace
10-
from ...long_running_tasks.task import RedisNamespace, TasksManager
11-
from ...rabbitmq._client_rpc import RabbitMQRPCClient
122

133

144
class FastAPILongRunningManager(BaseLongRunningManager):
15-
def __init__(
16-
self,
17-
app: FastAPI,
18-
stale_task_check_interval: datetime.timedelta,
19-
stale_task_detect_timeout: datetime.timedelta,
20-
redis_settings: RedisSettings,
21-
rabbit_settings: RabbitSettings,
22-
redis_namespace: RedisNamespace,
23-
rabbit_namespace: RabbitNamespace,
24-
):
25-
self._app = app
26-
self._tasks_manager = TasksManager(
27-
stale_task_check_interval=stale_task_check_interval,
28-
stale_task_detect_timeout=stale_task_detect_timeout,
29-
redis_settings=redis_settings,
30-
redis_namespace=redis_namespace,
31-
)
32-
self._rabbit_namespace = rabbit_namespace
33-
self.rabbit_settings = rabbit_settings
34-
self._rpc_server: RabbitMQRPCClient | None = None
35-
self._rpc_client: RabbitMQRPCClient | None = None
36-
37-
@property
38-
def tasks_manager(self) -> TasksManager:
39-
return self._tasks_manager
40-
41-
@property
42-
def rpc_server(self) -> RabbitMQRPCClient:
43-
assert self._rpc_server is not None # nosec
44-
return self._rpc_server
45-
46-
@property
47-
def rpc_client(self) -> RabbitMQRPCClient:
48-
assert self._rpc_client is not None # nosec
49-
return self._rpc_client
50-
51-
@property
52-
def rabbit_namespace(self) -> str:
53-
return self._rabbit_namespace
54-
55-
async def setup(self) -> None:
56-
await self._tasks_manager.setup()
57-
self._rpc_server = await RabbitMQRPCClient.create(
58-
client_name=f"lrt-server-{self.rabbit_namespace}",
59-
settings=self.rabbit_settings,
60-
)
61-
self._rpc_client = await RabbitMQRPCClient.create(
62-
client_name=f"lrt-client-{self.rabbit_namespace}",
63-
settings=self.rabbit_settings,
64-
)
65-
await lrt_api.register_rabbit_routes(self)
66-
67-
async def teardown(self) -> None:
68-
await self._tasks_manager.teardown()
69-
70-
if self._rpc_server is not None:
71-
await self._rpc_server.close()
72-
self._rpc_server = None
73-
74-
if self._rpc_client is not None:
75-
await self._rpc_client.close()
76-
self._rpc_client = None
5+
pass

packages/service-library/src/servicelib/fastapi/long_running_tasks/_server.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ async def on_startup() -> None:
4646
# add components to state
4747
app.state.long_running_manager = long_running_manager = (
4848
FastAPILongRunningManager(
49-
app=app,
5049
stale_task_check_interval=stale_task_check_interval,
5150
stale_task_detect_timeout=stale_task_detect_timeout,
5251
redis_settings=redis_settings,

packages/service-library/src/servicelib/long_running_tasks/_rabbit/lrt_server.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import logging
21
import traceback
32
from typing import Any
43

@@ -10,8 +9,6 @@
109
from ..task import RegisteredTaskName
1110
from ._models import RPCErrorResponse
1211

13-
_logger = logging.getLogger(__name__)
14-
1512
router = RPCRouter()
1613

1714

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,83 @@
1-
from abc import ABC, abstractmethod
1+
import datetime
2+
3+
from settings_library.rabbit import RabbitSettings
4+
from settings_library.redis import RedisSettings
25

36
from ..rabbitmq._client_rpc import RabbitMQRPCClient
7+
from ._rabbit.namespace import get_namespace
48
from .models import RabbitNamespace
5-
from .task import TasksManager
9+
from .task import RedisNamespace, TasksManager
610

711

8-
class BaseLongRunningManager(ABC):
12+
class BaseLongRunningManager:
913
"""
1014
Provides a commond inteface for aiohttp and fastapi services
1115
"""
1216

17+
def __init__(
18+
self,
19+
stale_task_check_interval: datetime.timedelta,
20+
stale_task_detect_timeout: datetime.timedelta,
21+
redis_settings: RedisSettings,
22+
rabbit_settings: RabbitSettings,
23+
redis_namespace: RedisNamespace,
24+
rabbit_namespace: RabbitNamespace,
25+
):
26+
self._tasks_manager = TasksManager(
27+
stale_task_check_interval=stale_task_check_interval,
28+
stale_task_detect_timeout=stale_task_detect_timeout,
29+
redis_settings=redis_settings,
30+
redis_namespace=redis_namespace,
31+
)
32+
self._rabbit_namespace = rabbit_namespace
33+
self.rabbit_settings = rabbit_settings
34+
self._rpc_server: RabbitMQRPCClient | None = None
35+
self._rpc_client: RabbitMQRPCClient | None = None
36+
1337
@property
14-
@abstractmethod
1538
def tasks_manager(self) -> TasksManager:
16-
pass
39+
return self._tasks_manager
1740

1841
@property
19-
@abstractmethod
2042
def rpc_server(self) -> RabbitMQRPCClient:
21-
pass
43+
assert self._rpc_server is not None # nosec
44+
return self._rpc_server
45+
46+
@property
47+
def rpc_client(self) -> RabbitMQRPCClient:
48+
assert self._rpc_client is not None # nosec
49+
return self._rpc_client
2250

2351
@property
24-
@abstractmethod
2552
def rabbit_namespace(self) -> RabbitNamespace:
26-
pass
53+
return self._rabbit_namespace
2754

28-
@abstractmethod
2955
async def setup(self) -> None:
30-
pass
56+
await self._tasks_manager.setup()
57+
self._rpc_server = await RabbitMQRPCClient.create(
58+
client_name=f"lrt-server-{self.rabbit_namespace}",
59+
settings=self.rabbit_settings,
60+
)
61+
self._rpc_client = await RabbitMQRPCClient.create(
62+
client_name=f"lrt-client-{self.rabbit_namespace}",
63+
settings=self.rabbit_settings,
64+
)
65+
66+
from ._rabbit.lrt_server import router
67+
68+
await self.rpc_server.register_router(
69+
router,
70+
get_namespace(self.rabbit_namespace),
71+
self,
72+
)
3173

32-
@abstractmethod
3374
async def teardown(self) -> None:
34-
pass
75+
await self._tasks_manager.teardown()
76+
77+
if self._rpc_server is not None:
78+
await self._rpc_server.close()
79+
self._rpc_server = None
80+
81+
if self._rpc_client is not None:
82+
await self._rpc_client.close()
83+
self._rpc_client = None

packages/service-library/src/servicelib/long_running_tasks/lrt_api.py

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
1-
import logging
21
from typing import Any
32

43
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
54

6-
from ._rabbit import lrt_client, lrt_server
7-
from ._rabbit.namespace import get_namespace
5+
from ._rabbit import lrt_client
86
from .base_long_running_manager import BaseLongRunningManager
97
from .models import TaskBase, TaskContext, TaskId, TaskStatus
108
from .task import RegisteredTaskName
119

12-
_logger = logging.getLogger(__name__)
13-
1410

1511
async def start_task(
1612
rabbitmq_rpc_client: RabbitMQRPCClient,
@@ -115,12 +111,3 @@ async def remove_task(
115111
task_id=task_id,
116112
task_context=task_context,
117113
)
118-
119-
120-
async def register_rabbit_routes(long_running_manager: BaseLongRunningManager) -> None:
121-
rpc_server = long_running_manager.rpc_server
122-
await rpc_server.register_router(
123-
lrt_server.router,
124-
get_namespace(long_running_manager.rabbit_namespace),
125-
long_running_manager,
126-
)

0 commit comments

Comments
 (0)