Skip to content

Commit 1f37607

Browse files
GitHKAndrei Neagu
andauthored
✨ Add long running rpc interface to dynamic-sidecar (#8255)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent 4c530c7 commit 1f37607

File tree

40 files changed

+1738
-427
lines changed

40 files changed

+1738
-427
lines changed

.github/workflows/ci-testing-deploy.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1057,7 +1057,7 @@ jobs:
10571057
unit-test-dynamic-sidecar:
10581058
needs: changes
10591059
if: ${{ needs.changes.outputs.dynamic-sidecar == 'true' || github.event_name == 'push' || github.event.inputs.force_all_builds == 'true' }}
1060-
timeout-minutes: 18 # if this timeout gets too small, then split the tests
1060+
timeout-minutes: 19 # if this timeout gets too small, then split the tests
10611061
name: "[unit] dynamic-sidecar"
10621062
runs-on: ${{ matrix.os }}
10631063
strategy:

packages/models-library/src/models_library/api_schemas_directorv2/dynamic_services.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from ..resource_tracker import HardwareInfo, PricingInfo
77
from ..services import ServicePortKey
8+
from ..services_creation import CreateServiceMetricsAdditionalParams
89
from ..services_resources import ServiceResourcesDict, ServiceResourcesDictHelpers
910
from ..wallets import WalletInfo
1011
from .dynamic_services_service import RunningDynamicServiceDetails, ServiceDetails
@@ -104,3 +105,11 @@ class GetProjectInactivityResponse(BaseModel):
104105
is_inactive: bool
105106

106107
model_config = ConfigDict(json_schema_extra={"example": {"is_inactive": "false"}})
108+
109+
110+
class ContainersComposeSpec(BaseModel):
111+
docker_compose_yaml: str
112+
113+
114+
class ContainersCreate(BaseModel):
115+
metrics_params: CreateServiceMetricsAdditionalParams

packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def retry_on_http_errors(
8888
assert asyncio.iscoroutinefunction(request_func)
8989

9090
@functools.wraps(request_func)
91-
async def request_wrapper(zelf: "Client", *args, **kwargs) -> Any:
91+
async def request_wrapper(zelf: "HttpClient", *args, **kwargs) -> Any:
9292
async for attempt in AsyncRetrying(
9393
stop=stop_after_attempt(max_attempt_number=3),
9494
wait=wait_exponential(min=1),
@@ -106,7 +106,7 @@ async def request_wrapper(zelf: "Client", *args, **kwargs) -> Any:
106106
return request_wrapper
107107

108108

109-
class Client:
109+
class HttpClient:
110110
"""
111111
This is a client that aims to simplify the requests to get the
112112
status, result and/or cancel of a long running task.

packages/service-library/src/servicelib/fastapi/long_running_tasks/_context_manager.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import warnings
34
from collections.abc import AsyncIterator
45
from contextlib import asynccontextmanager
56
from typing import Any, Final
@@ -15,7 +16,7 @@
1516
TaskId,
1617
TaskStatus,
1718
)
18-
from ._client import Client
19+
from ._client import HttpClient
1920

2021
_logger = logging.getLogger(__name__)
2122

@@ -69,7 +70,7 @@ async def update(
6970

7071
@asynccontextmanager
7172
async def periodic_task_result(
72-
client: Client,
73+
client: HttpClient,
7374
task_id: TaskId,
7475
*,
7576
task_timeout: PositiveFloat,
@@ -95,6 +96,13 @@ async def periodic_task_result(
9596
raises: `asyncio.TimeoutError` NOTE: the remote task will also be removed
9697
"""
9798

99+
warnings.warn(
100+
"This context manager is deprecated and will be removed in future releases. "
101+
"Please use the `servicelib.long_running_tasks.lrt_api` instead.",
102+
DeprecationWarning,
103+
stacklevel=2,
104+
)
105+
98106
progress_manager = _ProgressManager(progress_callback)
99107

100108
async def _status_update() -> TaskStatus:

packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
Provides a convenient way to return the result given a TaskId.
33
"""
44

5-
from ._client import Client, setup
5+
from ._client import HttpClient, setup
66
from ._context_manager import periodic_task_result # attach to the same object!
77

88
__all__: tuple[str, ...] = (
9-
"Client",
9+
"HttpClient",
1010
"periodic_task_result",
1111
"setup",
1212
)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import logging
2+
3+
from models_library.api_schemas_directorv2.dynamic_services import ContainersComposeSpec
4+
from models_library.projects_nodes_io import NodeID
5+
from models_library.rabbitmq_basic_types import RPCMethodName
6+
from pydantic import TypeAdapter
7+
8+
from ....logging_utils import log_decorator
9+
from ... import RabbitMQRPCClient
10+
from ._utils import get_rpc_namespace
11+
12+
_logger = logging.getLogger(__name__)
13+
14+
15+
@log_decorator(_logger, level=logging.DEBUG)
16+
async def create_compose_spec(
17+
rabbitmq_rpc_client: RabbitMQRPCClient,
18+
*,
19+
node_id: NodeID,
20+
containers_compose_spec: ContainersComposeSpec,
21+
) -> None:
22+
rpc_namespace = get_rpc_namespace(node_id)
23+
result = await rabbitmq_rpc_client.request(
24+
rpc_namespace,
25+
TypeAdapter(RPCMethodName).validate_python("create_compose_spec"),
26+
containers_compose_spec=containers_compose_spec,
27+
)
28+
assert result is None # nosec
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import logging
2+
3+
from models_library.api_schemas_directorv2.dynamic_services import ContainersCreate
4+
from models_library.projects_nodes_io import NodeID
5+
from models_library.rabbitmq_basic_types import RPCMethodName
6+
from pydantic import TypeAdapter
7+
from servicelib.long_running_tasks.models import LRTNamespace, TaskId
8+
9+
from ....logging_utils import log_decorator
10+
from ... import RabbitMQRPCClient
11+
from ._utils import get_rpc_namespace
12+
13+
_logger = logging.getLogger(__name__)
14+
15+
16+
@log_decorator(_logger, level=logging.DEBUG)
17+
async def pull_user_services_images(
18+
rabbitmq_rpc_client: RabbitMQRPCClient,
19+
*,
20+
node_id: NodeID,
21+
lrt_namespace: LRTNamespace,
22+
) -> TaskId:
23+
rpc_namespace = get_rpc_namespace(node_id)
24+
result = await rabbitmq_rpc_client.request(
25+
rpc_namespace,
26+
TypeAdapter(RPCMethodName).validate_python("pull_user_services_images"),
27+
lrt_namespace=lrt_namespace,
28+
)
29+
assert isinstance(result, TaskId) # nosec
30+
return result
31+
32+
33+
@log_decorator(_logger, level=logging.DEBUG)
34+
async def create_user_services(
35+
rabbitmq_rpc_client: RabbitMQRPCClient,
36+
*,
37+
node_id: NodeID,
38+
lrt_namespace: LRTNamespace,
39+
containers_create: ContainersCreate,
40+
) -> TaskId:
41+
rpc_namespace = get_rpc_namespace(node_id)
42+
result = await rabbitmq_rpc_client.request(
43+
rpc_namespace,
44+
TypeAdapter(RPCMethodName).validate_python("create_user_services"),
45+
lrt_namespace=lrt_namespace,
46+
containers_create=containers_create,
47+
)
48+
assert isinstance(result, TaskId) # nosec
49+
return result
50+
51+
52+
@log_decorator(_logger, level=logging.DEBUG)
53+
async def remove_user_services(
54+
rabbitmq_rpc_client: RabbitMQRPCClient,
55+
*,
56+
node_id: NodeID,
57+
lrt_namespace: LRTNamespace,
58+
) -> TaskId:
59+
rpc_namespace = get_rpc_namespace(node_id)
60+
result = await rabbitmq_rpc_client.request(
61+
rpc_namespace,
62+
TypeAdapter(RPCMethodName).validate_python("remove_user_services"),
63+
lrt_namespace=lrt_namespace,
64+
)
65+
assert isinstance(result, TaskId) # nosec
66+
return result
67+
68+
69+
@log_decorator(_logger, level=logging.DEBUG)
70+
async def restore_user_services_state_paths(
71+
rabbitmq_rpc_client: RabbitMQRPCClient,
72+
*,
73+
node_id: NodeID,
74+
lrt_namespace: LRTNamespace,
75+
) -> TaskId:
76+
rpc_namespace = get_rpc_namespace(node_id)
77+
result = await rabbitmq_rpc_client.request(
78+
rpc_namespace,
79+
TypeAdapter(RPCMethodName).validate_python("restore_user_services_state_paths"),
80+
lrt_namespace=lrt_namespace,
81+
)
82+
assert isinstance(result, TaskId) # nosec
83+
return result
84+
85+
86+
@log_decorator(_logger, level=logging.DEBUG)
87+
async def save_user_services_state_paths(
88+
rabbitmq_rpc_client: RabbitMQRPCClient,
89+
*,
90+
node_id: NodeID,
91+
lrt_namespace: LRTNamespace,
92+
) -> TaskId:
93+
rpc_namespace = get_rpc_namespace(node_id)
94+
result = await rabbitmq_rpc_client.request(
95+
rpc_namespace,
96+
TypeAdapter(RPCMethodName).validate_python("save_user_services_state_paths"),
97+
lrt_namespace=lrt_namespace,
98+
)
99+
assert isinstance(result, TaskId) # nosec
100+
return result
101+
102+
103+
@log_decorator(_logger, level=logging.DEBUG)
104+
async def pull_user_services_input_ports(
105+
rabbitmq_rpc_client: RabbitMQRPCClient,
106+
*,
107+
node_id: NodeID,
108+
lrt_namespace: LRTNamespace,
109+
port_keys: list[str] | None,
110+
) -> TaskId:
111+
rpc_namespace = get_rpc_namespace(node_id)
112+
result = await rabbitmq_rpc_client.request(
113+
rpc_namespace,
114+
TypeAdapter(RPCMethodName).validate_python("pull_user_services_input_ports"),
115+
lrt_namespace=lrt_namespace,
116+
port_keys=port_keys,
117+
)
118+
assert isinstance(result, TaskId) # nosec
119+
return result
120+
121+
122+
@log_decorator(_logger, level=logging.DEBUG)
123+
async def pull_user_services_output_ports(
124+
rabbitmq_rpc_client: RabbitMQRPCClient,
125+
*,
126+
node_id: NodeID,
127+
lrt_namespace: LRTNamespace,
128+
port_keys: list[str] | None,
129+
) -> TaskId:
130+
rpc_namespace = get_rpc_namespace(node_id)
131+
result = await rabbitmq_rpc_client.request(
132+
rpc_namespace,
133+
TypeAdapter(RPCMethodName).validate_python("pull_user_services_output_ports"),
134+
lrt_namespace=lrt_namespace,
135+
port_keys=port_keys,
136+
)
137+
assert isinstance(result, TaskId) # nosec
138+
return result
139+
140+
141+
@log_decorator(_logger, level=logging.DEBUG)
142+
async def push_user_services_output_ports(
143+
rabbitmq_rpc_client: RabbitMQRPCClient,
144+
*,
145+
node_id: NodeID,
146+
lrt_namespace: LRTNamespace,
147+
) -> TaskId:
148+
rpc_namespace = get_rpc_namespace(node_id)
149+
result = await rabbitmq_rpc_client.request(
150+
rpc_namespace,
151+
TypeAdapter(RPCMethodName).validate_python("push_user_services_output_ports"),
152+
lrt_namespace=lrt_namespace,
153+
)
154+
assert isinstance(result, TaskId) # nosec
155+
return result
156+
157+
158+
@log_decorator(_logger, level=logging.DEBUG)
159+
async def restart_user_services(
160+
rabbitmq_rpc_client: RabbitMQRPCClient,
161+
*,
162+
node_id: NodeID,
163+
lrt_namespace: LRTNamespace,
164+
) -> TaskId:
165+
rpc_namespace = get_rpc_namespace(node_id)
166+
result = await rabbitmq_rpc_client.request(
167+
rpc_namespace,
168+
TypeAdapter(RPCMethodName).validate_python("restart_user_services"),
169+
lrt_namespace=lrt_namespace,
170+
)
171+
assert isinstance(result, TaskId) # nosec
172+
return result

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_sidecar/volumes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414

1515
@log_decorator(_logger, level=logging.DEBUG)
16-
async def save_volume_state(
16+
async def update_volume_status(
1717
rabbitmq_rpc_client: RabbitMQRPCClient,
1818
*,
1919
node_id: NodeID,
@@ -23,7 +23,7 @@ async def save_volume_state(
2323
rpc_namespace = get_rpc_namespace(node_id)
2424
result = await rabbitmq_rpc_client.request(
2525
rpc_namespace,
26-
TypeAdapter(RPCMethodName).validate_python("save_volume_state"),
26+
TypeAdapter(RPCMethodName).validate_python("update_volume_status"),
2727
status=status,
2828
category=category,
2929
)

0 commit comments

Comments
 (0)