Skip to content

Commit dd9df67

Browse files
author
Andrei Neagu
committed
added RPC endpoint
1 parent e29a397 commit dd9df67

File tree

5 files changed

+126
-6
lines changed

5 files changed

+126
-6
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import logging
2+
3+
from models_library.projects_nodes_io import NodeID
4+
from models_library.rabbitmq_basic_types import RPCMethodName
5+
from pydantic import TypeAdapter
6+
7+
from ....logging_utils import log_decorator
8+
from ... import RabbitMQRPCClient
9+
from ._utils import get_rpc_namespace
10+
11+
_logger = logging.getLogger(__name__)
12+
13+
14+
@log_decorator(_logger, level=logging.DEBUG)
15+
async def cleanup_local_long_running_tasks(
16+
rabbitmq_rpc_client: RabbitMQRPCClient,
17+
*,
18+
node_id: NodeID,
19+
) -> None:
20+
rpc_namespace = get_rpc_namespace(node_id)
21+
result = await rabbitmq_rpc_client.request(
22+
rpc_namespace,
23+
TypeAdapter(RPCMethodName).validate_python("cleanup_local_long_running_tasks"),
24+
)
25+
assert result is None # nosec
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from fastapi import FastAPI
2+
from servicelib.rabbitmq import RPCRouter
3+
4+
from ...services import long_running_tasks
5+
6+
router = RPCRouter()
7+
8+
9+
@router.expose()
10+
async def cleanup_local_long_running_tasks(app: FastAPI) -> None:
11+
await long_running_tasks.cleanup_long_running_tasks(app)

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/routes.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44

55
from ...core.rabbitmq import get_rabbitmq_rpc_server
66
from ...core.settings import ApplicationSettings
7-
from . import _disk, _disk_usage, _volumes
7+
from . import _disk, _disk_usage, _long_running_tasks, _volumes
88

99
ROUTERS: list[RPCRouter] = [
1010
_disk_usage.router,
1111
_disk.router,
12+
_long_running_tasks.router,
1213
_volumes.router,
1314
]
1415

services/dynamic-sidecar/tests/unit/test_api_rest_long_running_tasks.py renamed to services/dynamic-sidecar/tests/unit/api/rest/test_long_running_tasks.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,12 @@
2525
]
2626

2727

28-
async def sleeping_forever(progress: TaskProgress) -> None:
28+
async def sleeping_very_long(progress: TaskProgress) -> None:
2929
_ = progress
30-
while True: # noqa: ASYNC110
31-
await asyncio.sleep(1)
30+
await asyncio.sleep(10_000)
3231

3332

34-
TaskRegistry.register(sleeping_forever)
33+
TaskRegistry.register(sleeping_very_long)
3534

3635

3736
@pytest.fixture
@@ -68,7 +67,7 @@ async def test_cleanup_long_running_tasks(test_client: TestClient) -> None:
6867
await lrt_api.start_task(
6968
long_running_manager.rpc_client,
7069
long_running_manager.lrt_namespace,
71-
sleeping_forever.__name__,
70+
sleeping_very_long.__name__,
7271
)
7372

7473
_assert_long_running_tasks_count(long_running_manager, count=1)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# pylint: disable=protected-access
2+
# pylint: disable=redefined-outer-name
3+
# pylint: disable=unused-argument
4+
5+
import asyncio
6+
import json
7+
8+
import pytest
9+
from common_library.serialization import model_dump_with_secrets
10+
from fastapi import FastAPI
11+
from models_library.api_schemas_long_running_tasks.base import TaskProgress
12+
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
13+
from servicelib.fastapi.long_running_tasks._manager import FastAPILongRunningManager
14+
from servicelib.fastapi.long_running_tasks.server import (
15+
get_long_running_manager_from_app,
16+
)
17+
from servicelib.long_running_tasks import lrt_api
18+
from servicelib.long_running_tasks.task import TaskRegistry
19+
from servicelib.rabbitmq import RabbitMQRPCClient
20+
from servicelib.rabbitmq.rpc_interfaces.dynamic_sidecar import long_running_tasks
21+
from settings_library.rabbit import RabbitSettings
22+
from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings
23+
24+
pytest_simcore_core_services_selection = [
25+
"rabbit",
26+
]
27+
28+
29+
async def sleeping_very_long(progress: TaskProgress) -> None:
30+
_ = progress
31+
await asyncio.sleep(10_000)
32+
33+
34+
TaskRegistry.register(sleeping_very_long)
35+
36+
37+
@pytest.fixture
38+
def mock_environment(
39+
monkeypatch: pytest.MonkeyPatch,
40+
rabbit_service: RabbitSettings,
41+
mock_environment: EnvVarsDict,
42+
) -> EnvVarsDict:
43+
return setenvs_from_dict(
44+
monkeypatch,
45+
{
46+
**mock_environment,
47+
"RABBIT_SETTINGS": json.dumps(
48+
model_dump_with_secrets(rabbit_service, show_secrets=True)
49+
),
50+
},
51+
)
52+
53+
54+
def _assert_long_running_tasks_count(
55+
long_running_manager: FastAPILongRunningManager, *, count: int
56+
) -> None:
57+
assert (
58+
len(long_running_manager.tasks_manager._created_tasks) == count # noqa: SLF001
59+
)
60+
61+
62+
async def test_cleanup_long_running_tasks(
63+
app: FastAPI, rpc_client: RabbitMQRPCClient
64+
) -> None:
65+
settings: ApplicationSettings = app.state.settings
66+
long_running_manager = get_long_running_manager_from_app(app)
67+
68+
_assert_long_running_tasks_count(long_running_manager, count=0)
69+
70+
await lrt_api.start_task(
71+
long_running_manager.rpc_client,
72+
long_running_manager.lrt_namespace,
73+
sleeping_very_long.__name__,
74+
)
75+
76+
_assert_long_running_tasks_count(long_running_manager, count=1)
77+
78+
result = await long_running_tasks.cleanup_local_long_running_tasks(
79+
rpc_client,
80+
node_id=settings.DY_SIDECAR_NODE_ID,
81+
)
82+
assert result is None
83+
84+
_assert_long_running_tasks_count(long_running_manager, count=0)

0 commit comments

Comments
 (0)