Skip to content

Commit 65fd72a

Browse files
author
Andrei Neagu
committed
added rpc lrt interface to dynamic-sidecar
1 parent 5687558 commit 65fd72a

File tree

13 files changed

+1053
-87
lines changed

13 files changed

+1053
-87
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_sidecar/containers_long_running_tasks.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
@log_decorator(_logger, level=logging.DEBUG)
17-
async def pull_user_services_docker_images(
17+
async def pull_user_services_docker_images_task(
1818
rabbitmq_rpc_client: RabbitMQRPCClient,
1919
*,
2020
node_id: NodeID,
@@ -23,7 +23,9 @@ async def pull_user_services_docker_images(
2323
rpc_namespace = get_rpc_namespace(node_id)
2424
result = await rabbitmq_rpc_client.request(
2525
rpc_namespace,
26-
TypeAdapter(RPCMethodName).validate_python("pull_user_services_docker_images"),
26+
TypeAdapter(RPCMethodName).validate_python(
27+
"pull_user_services_docker_images_task"
28+
),
2729
lrt_namespace=lrt_namespace,
2830
)
2931
return TaskId(result)

services/director-v2/src/simcore_service_director_v2/cli/_close_and_save_service.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
)
1616
from servicelib.fastapi.http_client_thin import UnexpectedStatusError
1717
from servicelib.fastapi.long_running_tasks.client import (
18-
Client,
18+
HttpClient,
1919
periodic_task_result,
2020
setup,
2121
)
@@ -47,7 +47,7 @@ async def _minimal_app() -> AsyncIterator[FastAPI]:
4747

4848

4949
async def _track_and_display(
50-
client: Client,
50+
client: HttpClient,
5151
task_id: TaskId,
5252
update_interval: PositiveFloat,
5353
task_timeout: PositiveFloat,
@@ -105,7 +105,7 @@ async def async_close_and_save_service(
105105
f"{node_id}", is_disabled=True
106106
)
107107

108-
client = Client(
108+
client = HttpClient(
109109
app=app,
110110
async_client=thin_dv2_localhost_client.client,
111111
base_url=f"{TypeAdapter(AnyHttpUrl).validate_python(thin_dv2_localhost_client.BASE_ADDRESS)}",

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
BaseHttpClientError,
2323
UnexpectedStatusError,
2424
)
25-
from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
25+
from servicelib.fastapi.long_running_tasks.client import (
26+
HttpClient,
27+
periodic_task_result,
28+
)
2629
from servicelib.logging_utils import log_context, log_decorator
2730
from servicelib.long_running_tasks.models import (
2831
ProgressCallback,
@@ -288,8 +291,8 @@ async def submit_docker_compose_spec(
288291
dynamic_sidecar_endpoint, compose_spec=compose_spec
289292
)
290293

291-
def _get_client(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> Client:
292-
return Client(
294+
def _get_client(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> HttpClient:
295+
return HttpClient(
293296
app=self._app,
294297
async_client=self._async_client,
295298
base_url=f"{dynamic_sidecar_endpoint}",

services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@
5050
from pytest_simcore.helpers.host import get_localhost_ip
5151
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
5252
from pytest_simcore.helpers.typing_env import EnvVarsDict
53-
from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
53+
from servicelib.fastapi.long_running_tasks.client import (
54+
HttpClient,
55+
periodic_task_result,
56+
)
5457
from servicelib.long_running_tasks.models import (
5558
ProgressMessage,
5659
ProgressPercent,
@@ -863,7 +866,7 @@ async def _debug_progress_callback(
863866
logger.debug("%s: %.2f %s", task_id, percent, message)
864867

865868
async with periodic_task_result(
866-
Client(
869+
HttpClient(
867870
app=initialized_app,
868871
async_client=director_v2_client,
869872
base_url=TypeAdapter(AnyHttpUrl).validate_python(
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from fastapi import FastAPI
2+
from models_library.api_schemas_directorv2.dynamic_services import ContainersComposeSpec
3+
from servicelib.rabbitmq import RPCRouter
4+
5+
from ...core.validation import InvalidComposeSpecError
6+
from ...services import containers
7+
8+
router = RPCRouter()
9+
10+
11+
@router.expose(reraise_if_error_type=(InvalidComposeSpecError,))
12+
async def store_compose_spec(
13+
app: FastAPI,
14+
*,
15+
containers_compose_spec: ContainersComposeSpec,
16+
) -> None:
17+
await containers.store_compose_spec(app, containers_compose_spec)
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from fastapi import FastAPI
2+
from models_library.api_schemas_directorv2.dynamic_services import ContainersCreate
3+
from servicelib.long_running_tasks.models import LRTNamespace, TaskId
4+
from servicelib.rabbitmq import RPCRouter
5+
6+
from ...core.rabbitmq import get_rabbitmq_rpc_client
7+
from ...services import containers_long_running_tasks
8+
9+
router = RPCRouter()
10+
11+
12+
@router.expose()
13+
async def pull_user_services_docker_images_task(
14+
app: FastAPI, *, lrt_namespace: LRTNamespace
15+
) -> TaskId:
16+
rpc_client = get_rabbitmq_rpc_client(app)
17+
return await containers_long_running_tasks.pull_user_services_docker_images(
18+
rpc_client, lrt_namespace
19+
)
20+
21+
22+
@router.expose()
23+
async def create_service_containers_task(
24+
app: FastAPI, *, lrt_namespace: LRTNamespace, containers_create: ContainersCreate
25+
) -> TaskId:
26+
rpc_client = get_rabbitmq_rpc_client(app)
27+
return await containers_long_running_tasks.create_service_containers_task(
28+
rpc_client, lrt_namespace, containers_create
29+
)
30+
31+
32+
@router.expose()
33+
async def runs_docker_compose_down_task(
34+
app: FastAPI, *, lrt_namespace: LRTNamespace
35+
) -> TaskId:
36+
rpc_client = get_rabbitmq_rpc_client(app)
37+
return await containers_long_running_tasks.runs_docker_compose_down_task(
38+
rpc_client, lrt_namespace
39+
)
40+
41+
42+
@router.expose()
43+
async def state_restore_task(app: FastAPI, *, lrt_namespace: LRTNamespace) -> TaskId:
44+
rpc_client = get_rabbitmq_rpc_client(app)
45+
return await containers_long_running_tasks.state_restore_task(
46+
rpc_client, lrt_namespace
47+
)
48+
49+
50+
@router.expose()
51+
async def state_save_task(app: FastAPI, *, lrt_namespace: LRTNamespace) -> TaskId:
52+
rpc_client = get_rabbitmq_rpc_client(app)
53+
return await containers_long_running_tasks.state_save_task(
54+
rpc_client, lrt_namespace
55+
)
56+
57+
58+
@router.expose()
59+
async def ports_inputs_pull_task(
60+
app: FastAPI,
61+
*,
62+
lrt_namespace: LRTNamespace,
63+
port_keys: list[str] | None,
64+
) -> TaskId:
65+
rpc_client = get_rabbitmq_rpc_client(app)
66+
return await containers_long_running_tasks.ports_inputs_pull_task(
67+
rpc_client, lrt_namespace, port_keys
68+
)
69+
70+
71+
@router.expose()
72+
async def ports_outputs_pull_task(
73+
app: FastAPI,
74+
*,
75+
lrt_namespace: LRTNamespace,
76+
port_keys: list[str] | None,
77+
) -> TaskId:
78+
rpc_client = get_rabbitmq_rpc_client(app)
79+
return await containers_long_running_tasks.ports_outputs_pull_task(
80+
rpc_client, lrt_namespace, port_keys
81+
)
82+
83+
84+
@router.expose()
85+
async def ports_outputs_push_task(
86+
app: FastAPI, *, lrt_namespace: LRTNamespace
87+
) -> TaskId:
88+
rpc_client = get_rabbitmq_rpc_client(app)
89+
return await containers_long_running_tasks.ports_outputs_push_task(
90+
rpc_client, lrt_namespace
91+
)
92+
93+
94+
@router.expose()
95+
async def containers_restart_task(
96+
app: FastAPI, *, lrt_namespace: LRTNamespace
97+
) -> TaskId:
98+
rpc_client = get_rabbitmq_rpc_client(app)
99+
return await containers_long_running_tasks.containers_restart_task(
100+
rpc_client, lrt_namespace
101+
)

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/rpc/routes.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
from ...core.rabbitmq import get_rabbitmq_rpc_server
66
from ...core.settings import ApplicationSettings
7-
from . import _disk, _disk_usage, _volumes
7+
from . import _containers, _containers_long_running_tasks, _disk, _disk_usage, _volumes
88

99
ROUTERS: list[RPCRouter] = [
10+
_containers_long_running_tasks.router,
11+
_containers.router,
1012
_disk_usage.router,
1113
_disk.router,
1214
_volumes.router,

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient:
121121
return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server)
122122

123123

124+
def get_rabbitmq_rpc_client(app: FastAPI) -> RabbitMQRPCClient:
125+
_raise_if_not_initialized(app, "rabbitmq_rpc_client")
126+
return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server)
127+
128+
124129
def setup_rabbitmq(app: FastAPI) -> None:
125130
async def on_startup() -> None:
126131
app_settings: ApplicationSettings = app.state.settings
@@ -134,7 +139,12 @@ async def on_startup() -> None:
134139
)
135140
with log_context(_logger, logging.INFO, msg="Create RabbitMQRPCClient"):
136141
app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create(
137-
client_name=f"dynamic-sidecar_rpc_{app_settings.DY_SIDECAR_NODE_ID}",
142+
client_name=f"dynamic-sidecar_rpc_server_{app_settings.DY_SIDECAR_NODE_ID}",
143+
settings=settings,
144+
)
145+
with log_context(_logger, logging.INFO, msg="Create RabbitMQRPCClient"):
146+
app.state.rabbitmq_rpc_client = await RabbitMQRPCClient.create(
147+
client_name=f"dynamic-sidecar_rpc_client_{app_settings.DY_SIDECAR_NODE_ID}",
138148
settings=settings,
139149
)
140150

@@ -143,6 +153,8 @@ async def on_shutdown() -> None:
143153
await app.state.rabbitmq_client.close()
144154
if app.state.rabbitmq_rpc_server:
145155
await app.state.rabbitmq_rpc_server.close()
156+
if app.state.rabbitmq_rpc_client:
157+
await app.state.rabbitmq_rpc_client.close()
146158

147159
app.add_event_handler("startup", on_startup)
148160
app.add_event_handler("shutdown", on_shutdown)

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -651,51 +651,51 @@ async def on_startup() -> None:
651651

652652
task_context.update(
653653
{
654-
task_pull_user_servcices_docker_images: dict(
655-
shared_store=shared_store,
656-
app=app,
657-
),
658-
task_create_service_containers: dict(
659-
app=app,
660-
settings=settings,
661-
shared_store=shared_store,
662-
application_health=application_health,
663-
),
664-
task_runs_docker_compose_down: dict(
665-
app=app,
666-
settings=settings,
667-
shared_store=shared_store,
668-
mounted_volumes=mounted_volumes,
669-
),
670-
task_restore_state: dict(
671-
app=app,
672-
settings=settings,
673-
mounted_volumes=mounted_volumes,
674-
),
675-
task_save_state: dict(
676-
app=app,
677-
settings=settings,
678-
mounted_volumes=mounted_volumes,
679-
),
680-
task_ports_inputs_pull: dict(
681-
app=app,
682-
settings=settings,
683-
mounted_volumes=mounted_volumes,
684-
inputs_state=inputs_state,
685-
),
686-
task_ports_outputs_pull: dict(
687-
app=app,
688-
mounted_volumes=mounted_volumes,
689-
),
690-
task_ports_outputs_push: dict(
691-
app=app,
692-
outputs_manager=outputs_manager,
693-
),
694-
task_containers_restart: dict(
695-
app=app,
696-
settings=settings,
697-
shared_store=shared_store,
698-
),
654+
task_pull_user_servcices_docker_images: {
655+
"shared_store": shared_store,
656+
"app": app,
657+
},
658+
task_create_service_containers: {
659+
"app": app,
660+
"settings": settings,
661+
"shared_store": shared_store,
662+
"application_health": application_health,
663+
},
664+
task_runs_docker_compose_down: {
665+
"app": app,
666+
"settings": settings,
667+
"shared_store": shared_store,
668+
"mounted_volumes": mounted_volumes,
669+
},
670+
task_restore_state: {
671+
"app": app,
672+
"settings": settings,
673+
"mounted_volumes": mounted_volumes,
674+
},
675+
task_save_state: {
676+
"app": app,
677+
"settings": settings,
678+
"mounted_volumes": mounted_volumes,
679+
},
680+
task_ports_inputs_pull: {
681+
"app": app,
682+
"settings": settings,
683+
"mounted_volumes": mounted_volumes,
684+
"inputs_state": inputs_state,
685+
},
686+
task_ports_outputs_pull: {
687+
"app": app,
688+
"mounted_volumes": mounted_volumes,
689+
},
690+
task_ports_outputs_push: {
691+
"app": app,
692+
"outputs_manager": outputs_manager,
693+
},
694+
task_containers_restart: {
695+
"app": app,
696+
"settings": settings,
697+
"shared_store": shared_store,
698+
},
699699
}
700700
)
701701

0 commit comments

Comments
 (0)