Skip to content

Commit 21cd8ae

Browse files
committed
Merge branch 'master' into 1973-add-celery-worker-to-api-server
2 parents 95ae30d + 3e8d9ed commit 21cd8ae

File tree

76 files changed

+2198
-609
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+2198
-609
lines changed

.github/workflows/ci-testing-deploy.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1057,7 +1057,7 @@ jobs:
10571057
unit-test-dynamic-sidecar:
10581058
needs: changes
10591059
if: ${{ needs.changes.outputs.dynamic-sidecar == 'true' || github.event_name == 'push' || github.event.inputs.force_all_builds == 'true' }}
1060-
timeout-minutes: 18 # if this timeout gets too small, then split the tests
1060+
timeout-minutes: 19 # if this timeout gets too small, then split the tests
10611061
name: "[unit] dynamic-sidecar"
10621062
runs-on: ${{ matrix.os }}
10631063
strategy:

packages/models-library/src/models_library/api_schemas_directorv2/dynamic_services.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from ..resource_tracker import HardwareInfo, PricingInfo
77
from ..services import ServicePortKey
8+
from ..services_creation import CreateServiceMetricsAdditionalParams
89
from ..services_resources import ServiceResourcesDict, ServiceResourcesDictHelpers
910
from ..wallets import WalletInfo
1011
from .dynamic_services_service import RunningDynamicServiceDetails, ServiceDetails
@@ -104,3 +105,11 @@ class GetProjectInactivityResponse(BaseModel):
104105
is_inactive: bool
105106

106107
model_config = ConfigDict(json_schema_extra={"example": {"is_inactive": "false"}})
108+
109+
110+
class ContainersComposeSpec(BaseModel):
111+
docker_compose_yaml: str
112+
113+
114+
class ContainersCreate(BaseModel):
115+
metrics_params: CreateServiceMetricsAdditionalParams

packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def retry_on_http_errors(
8888
assert asyncio.iscoroutinefunction(request_func)
8989

9090
@functools.wraps(request_func)
91-
async def request_wrapper(zelf: "Client", *args, **kwargs) -> Any:
91+
async def request_wrapper(zelf: "HttpClient", *args, **kwargs) -> Any:
9292
async for attempt in AsyncRetrying(
9393
stop=stop_after_attempt(max_attempt_number=3),
9494
wait=wait_exponential(min=1),
@@ -106,7 +106,7 @@ async def request_wrapper(zelf: "Client", *args, **kwargs) -> Any:
106106
return request_wrapper
107107

108108

109-
class Client:
109+
class HttpClient:
110110
"""
111111
This is a client that aims to simplify the requests to get the
112112
status, result and/or cancel of a long running task.

packages/service-library/src/servicelib/fastapi/long_running_tasks/_context_manager.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import warnings
34
from collections.abc import AsyncIterator
45
from contextlib import asynccontextmanager
56
from typing import Any, Final
@@ -15,7 +16,7 @@
1516
TaskId,
1617
TaskStatus,
1718
)
18-
from ._client import Client
19+
from ._client import HttpClient
1920

2021
_logger = logging.getLogger(__name__)
2122

@@ -69,7 +70,7 @@ async def update(
6970

7071
@asynccontextmanager
7172
async def periodic_task_result(
72-
client: Client,
73+
client: HttpClient,
7374
task_id: TaskId,
7475
*,
7576
task_timeout: PositiveFloat,
@@ -95,6 +96,13 @@ async def periodic_task_result(
9596
raises: `asyncio.TimeoutError` NOTE: the remote task will also be removed
9697
"""
9798

99+
warnings.warn(
100+
"This context manager is deprecated and will be removed in future releases. "
101+
"Please use the `servicelib.long_running_tasks.lrt_api` instead.",
102+
DeprecationWarning,
103+
stacklevel=2,
104+
)
105+
98106
progress_manager = _ProgressManager(progress_callback)
99107

100108
async def _status_update() -> TaskStatus:

packages/service-library/src/servicelib/fastapi/long_running_tasks/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
Provides a convenient way to return the result given a TaskId.
33
"""
44

5-
from ._client import Client, setup
5+
from ._client import HttpClient, setup
66
from ._context_manager import periodic_task_result # attach to the same object!
77

88
__all__: tuple[str, ...] = (
9-
"Client",
9+
"HttpClient",
1010
"periodic_task_result",
1111
"setup",
1212
)

packages/service-library/src/servicelib/long_running_tasks/_serialization.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,6 @@ def loads(obj_str: str) -> Any:
7878
msg = f"Could not reconstruct object from data: {data}"
7979
raise ValueError(msg) from e
8080

81+
if isinstance(data, Exception):
82+
raise data
8183
return data

packages/service-library/src/servicelib/long_running_tasks/errors.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ class TaskExceptionError(BaseLongRunningError):
3434
)
3535

3636

37+
class TaskRaisedUnserializableError(BaseLongRunningError):
38+
msg_template: str = (
39+
"Task {task_id} raised an exception that could not be serialized.\n"
40+
"Original exception: '{original_exception_str}'\n"
41+
"As a consequence, the following error was raised: '{exception}'"
42+
)
43+
44+
3745
class TaskClientTimeoutError(BaseLongRunningError):
3846
msg_template: str = (
3947
"Timed out after {timeout} seconds while awaiting '{task_id}' to complete"

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
TaskNotCompletedError,
3333
TaskNotFoundError,
3434
TaskNotRegisteredError,
35+
TaskRaisedUnserializableError,
3536
)
3637
from .models import (
3738
LRTNamespace,
@@ -353,7 +354,30 @@ async def _tasks_monitor(self) -> None:
353354
},
354355
),
355356
)
356-
result_field = ResultField(str_error=dumps(e))
357+
try:
358+
result_field = ResultField(str_error=dumps(e))
359+
except (
360+
Exception # pylint:disable=broad-except
361+
) as serialization_error:
362+
_logger.exception(
363+
**create_troubleshootting_log_kwargs(
364+
(
365+
f"Execution of {task_id=} finished with an error "
366+
f"which could not be serialized"
367+
),
368+
error=serialization_error,
369+
tip="Check the error above for more details",
370+
),
371+
)
372+
result_field = ResultField(
373+
str_error=dumps(
374+
TaskRaisedUnserializableError(
375+
task_id=task_id,
376+
exception=serialization_error,
377+
original_exception_str=f"{e}",
378+
)
379+
)
380+
)
357381

358382
# update and store in Redis
359383
updates = {"is_done": is_done, "result_field": task_data.result_field}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import logging
2+
3+
from models_library.api_schemas_directorv2.dynamic_services import ContainersComposeSpec
4+
from models_library.projects_nodes_io import NodeID
5+
from models_library.rabbitmq_basic_types import RPCMethodName
6+
from pydantic import TypeAdapter
7+
8+
from ....logging_utils import log_decorator
9+
from ... import RabbitMQRPCClient
10+
from ._utils import get_rpc_namespace
11+
12+
_logger = logging.getLogger(__name__)
13+
14+
15+
@log_decorator(_logger, level=logging.DEBUG)
16+
async def create_compose_spec(
17+
rabbitmq_rpc_client: RabbitMQRPCClient,
18+
*,
19+
node_id: NodeID,
20+
containers_compose_spec: ContainersComposeSpec,
21+
) -> None:
22+
rpc_namespace = get_rpc_namespace(node_id)
23+
result = await rabbitmq_rpc_client.request(
24+
rpc_namespace,
25+
TypeAdapter(RPCMethodName).validate_python("create_compose_spec"),
26+
containers_compose_spec=containers_compose_spec,
27+
)
28+
assert result is None # nosec
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import logging
2+
3+
from models_library.api_schemas_directorv2.dynamic_services import ContainersCreate
4+
from models_library.projects_nodes_io import NodeID
5+
from models_library.rabbitmq_basic_types import RPCMethodName
6+
from pydantic import TypeAdapter
7+
from servicelib.long_running_tasks.models import LRTNamespace, TaskId
8+
9+
from ....logging_utils import log_decorator
10+
from ... import RabbitMQRPCClient
11+
from ._utils import get_rpc_namespace
12+
13+
_logger = logging.getLogger(__name__)
14+
15+
16+
@log_decorator(_logger, level=logging.DEBUG)
17+
async def pull_user_services_images(
18+
rabbitmq_rpc_client: RabbitMQRPCClient,
19+
*,
20+
node_id: NodeID,
21+
lrt_namespace: LRTNamespace,
22+
) -> TaskId:
23+
rpc_namespace = get_rpc_namespace(node_id)
24+
result = await rabbitmq_rpc_client.request(
25+
rpc_namespace,
26+
TypeAdapter(RPCMethodName).validate_python("pull_user_services_images"),
27+
lrt_namespace=lrt_namespace,
28+
)
29+
assert isinstance(result, TaskId) # nosec
30+
return result
31+
32+
33+
@log_decorator(_logger, level=logging.DEBUG)
34+
async def create_user_services(
35+
rabbitmq_rpc_client: RabbitMQRPCClient,
36+
*,
37+
node_id: NodeID,
38+
lrt_namespace: LRTNamespace,
39+
containers_create: ContainersCreate,
40+
) -> TaskId:
41+
rpc_namespace = get_rpc_namespace(node_id)
42+
result = await rabbitmq_rpc_client.request(
43+
rpc_namespace,
44+
TypeAdapter(RPCMethodName).validate_python("create_user_services"),
45+
lrt_namespace=lrt_namespace,
46+
containers_create=containers_create,
47+
)
48+
assert isinstance(result, TaskId) # nosec
49+
return result
50+
51+
52+
@log_decorator(_logger, level=logging.DEBUG)
53+
async def remove_user_services(
54+
rabbitmq_rpc_client: RabbitMQRPCClient,
55+
*,
56+
node_id: NodeID,
57+
lrt_namespace: LRTNamespace,
58+
) -> TaskId:
59+
rpc_namespace = get_rpc_namespace(node_id)
60+
result = await rabbitmq_rpc_client.request(
61+
rpc_namespace,
62+
TypeAdapter(RPCMethodName).validate_python("remove_user_services"),
63+
lrt_namespace=lrt_namespace,
64+
)
65+
assert isinstance(result, TaskId) # nosec
66+
return result
67+
68+
69+
@log_decorator(_logger, level=logging.DEBUG)
70+
async def restore_user_services_state_paths(
71+
rabbitmq_rpc_client: RabbitMQRPCClient,
72+
*,
73+
node_id: NodeID,
74+
lrt_namespace: LRTNamespace,
75+
) -> TaskId:
76+
rpc_namespace = get_rpc_namespace(node_id)
77+
result = await rabbitmq_rpc_client.request(
78+
rpc_namespace,
79+
TypeAdapter(RPCMethodName).validate_python("restore_user_services_state_paths"),
80+
lrt_namespace=lrt_namespace,
81+
)
82+
assert isinstance(result, TaskId) # nosec
83+
return result
84+
85+
86+
@log_decorator(_logger, level=logging.DEBUG)
87+
async def save_user_services_state_paths(
88+
rabbitmq_rpc_client: RabbitMQRPCClient,
89+
*,
90+
node_id: NodeID,
91+
lrt_namespace: LRTNamespace,
92+
) -> TaskId:
93+
rpc_namespace = get_rpc_namespace(node_id)
94+
result = await rabbitmq_rpc_client.request(
95+
rpc_namespace,
96+
TypeAdapter(RPCMethodName).validate_python("save_user_services_state_paths"),
97+
lrt_namespace=lrt_namespace,
98+
)
99+
assert isinstance(result, TaskId) # nosec
100+
return result
101+
102+
103+
@log_decorator(_logger, level=logging.DEBUG)
104+
async def pull_user_services_input_ports(
105+
rabbitmq_rpc_client: RabbitMQRPCClient,
106+
*,
107+
node_id: NodeID,
108+
lrt_namespace: LRTNamespace,
109+
port_keys: list[str] | None,
110+
) -> TaskId:
111+
rpc_namespace = get_rpc_namespace(node_id)
112+
result = await rabbitmq_rpc_client.request(
113+
rpc_namespace,
114+
TypeAdapter(RPCMethodName).validate_python("pull_user_services_input_ports"),
115+
lrt_namespace=lrt_namespace,
116+
port_keys=port_keys,
117+
)
118+
assert isinstance(result, TaskId) # nosec
119+
return result
120+
121+
122+
@log_decorator(_logger, level=logging.DEBUG)
123+
async def pull_user_services_output_ports(
124+
rabbitmq_rpc_client: RabbitMQRPCClient,
125+
*,
126+
node_id: NodeID,
127+
lrt_namespace: LRTNamespace,
128+
port_keys: list[str] | None,
129+
) -> TaskId:
130+
rpc_namespace = get_rpc_namespace(node_id)
131+
result = await rabbitmq_rpc_client.request(
132+
rpc_namespace,
133+
TypeAdapter(RPCMethodName).validate_python("pull_user_services_output_ports"),
134+
lrt_namespace=lrt_namespace,
135+
port_keys=port_keys,
136+
)
137+
assert isinstance(result, TaskId) # nosec
138+
return result
139+
140+
141+
@log_decorator(_logger, level=logging.DEBUG)
142+
async def push_user_services_output_ports(
143+
rabbitmq_rpc_client: RabbitMQRPCClient,
144+
*,
145+
node_id: NodeID,
146+
lrt_namespace: LRTNamespace,
147+
) -> TaskId:
148+
rpc_namespace = get_rpc_namespace(node_id)
149+
result = await rabbitmq_rpc_client.request(
150+
rpc_namespace,
151+
TypeAdapter(RPCMethodName).validate_python("push_user_services_output_ports"),
152+
lrt_namespace=lrt_namespace,
153+
)
154+
assert isinstance(result, TaskId) # nosec
155+
return result
156+
157+
158+
@log_decorator(_logger, level=logging.DEBUG)
159+
async def restart_user_services(
160+
rabbitmq_rpc_client: RabbitMQRPCClient,
161+
*,
162+
node_id: NodeID,
163+
lrt_namespace: LRTNamespace,
164+
) -> TaskId:
165+
rpc_namespace = get_rpc_namespace(node_id)
166+
result = await rabbitmq_rpc_client.request(
167+
rpc_namespace,
168+
TypeAdapter(RPCMethodName).validate_python("restart_user_services"),
169+
lrt_namespace=lrt_namespace,
170+
)
171+
assert isinstance(result, TaskId) # nosec
172+
return result

0 commit comments

Comments
 (0)