Skip to content

Commit f6a4e68

Browse files
GitHKAndrei Neagu
andauthored
🐛 Fixed issue with accumulating tracked services (#6631)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent e994f5d commit f6a4e68

File tree

17 files changed

+251
-65
lines changed

17 files changed

+251
-65
lines changed

services/autoscaling/requirements/_base.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ markupsafe==2.1.5
240240
# jinja2
241241
mdurl==0.1.2
242242
# via markdown-it-py
243-
msgpack==1.0.8
243+
msgpack==1.1.0
244244
# via
245245
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
246246
# distributed

services/catalog/requirements/_base.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ markupsafe==2.1.5
174174
# mako
175175
mdurl==0.1.2
176176
# via markdown-it-py
177-
msgpack==1.0.8
177+
msgpack==1.1.0
178178
# via aiocache
179179
multidict==6.0.5
180180
# via

services/clusters-keeper/requirements/_base.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ markupsafe==2.1.5
238238
# jinja2
239239
mdurl==0.1.2
240240
# via markdown-it-py
241-
msgpack==1.0.8
241+
msgpack==1.1.0
242242
# via
243243
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
244244
# distributed

services/dask-sidecar/requirements/_base.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ markupsafe==2.1.5
170170
# via jinja2
171171
mdurl==0.1.2
172172
# via markdown-it-py
173-
msgpack==1.0.8
173+
msgpack==1.1.0
174174
# via distributed
175175
multidict==6.0.5
176176
# via

services/dask-sidecar/requirements/_dask-distributed.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ markupsafe==2.1.5
4646
# via
4747
# -c requirements/./_base.txt
4848
# jinja2
49-
msgpack==1.0.8
49+
msgpack==1.1.0
5050
# via
5151
# -c requirements/./_base.txt
5252
# distributed

services/director-v2/requirements/_base.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ markupsafe==2.1.5
314314
# mako
315315
mdurl==0.1.2
316316
# via markdown-it-py
317-
msgpack==1.0.8
317+
msgpack==1.1.0
318318
# via
319319
# -r requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
320320
# aiocache

services/director-v2/requirements/_test.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ markupsafe==2.1.5
171171
# -c requirements/_base.txt
172172
# jinja2
173173
# mako
174-
msgpack==1.0.8
174+
msgpack==1.1.0
175175
# via
176176
# -c requirements/_base.txt
177177
# distributed

services/dynamic-scheduler/requirements/_base.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ httpx
2020
packaging
2121
python-socketio
2222
typer[all]
23+
u-msgpack-python
2324
uvicorn[standard]

services/dynamic-scheduler/requirements/_base.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,8 @@ typing-extensions==4.10.0
368368
# opentelemetry-sdk
369369
# pydantic
370370
# typer
371+
u-msgpack-python==2.8.0
372+
# via -r requirements/_base.in
371373
urllib3==2.2.2
372374
# via
373375
# -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt
Lines changed: 62 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,39 @@
1-
import pickle
2-
from dataclasses import dataclass, field
31
from datetime import timedelta
2+
from decimal import Decimal
43
from enum import auto
4+
from typing import Any, Callable, Final
5+
from uuid import UUID
56

67
import arrow
8+
import umsgpack # type: ignore[import-untyped]
79
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
810
DynamicServiceStart,
911
)
1012
from models_library.projects import ProjectID
1113
from models_library.users import UserID
1214
from models_library.utils.enums import StrAutoEnum
15+
from pydantic import BaseModel, Field
1316
from servicelib.deferred_tasks import TaskUID
1417

18+
# `umsgpack.Ext`` extension types are part of the msgpack specification
19+
# allows to define serialization and deserialization rules for custom types
20+
# see https://github.com/msgpack/msgpack/blob/master/spec.md#extension-types
21+
22+
_UUID_TYPE: Final[int] = 0x00
23+
_DECIMAL_TYPE: Final[int] = 0x01
24+
25+
_PACKB_EXTENSION_TYPES: Final[dict[type[Any], Callable[[Any], umsgpack.Ext]]] = {
26+
# helpers to serialize an object to bytes
27+
UUID: lambda obj: umsgpack.Ext(_UUID_TYPE, obj.bytes),
28+
Decimal: lambda obj: umsgpack.Ext(_DECIMAL_TYPE, f"{obj}".encode()),
29+
}
30+
31+
_UNPACKB_EXTENSION_TYPES: Final[dict[int, Callable[[umsgpack.Ext], Any]]] = {
32+
# helpers to deserialize an object from bytes
33+
_UUID_TYPE: lambda ext: UUID(bytes=ext.data),
34+
_DECIMAL_TYPE: lambda ext: Decimal(ext.data.decode()),
35+
}
36+
1537

1638
class UserRequestedState(StrAutoEnum):
1739
RUNNING = auto()
@@ -35,74 +57,67 @@ class SchedulerServiceState(StrAutoEnum):
3557
UNKNOWN = auto()
3658

3759

38-
@dataclass
39-
class TrackedServiceModel: # pylint:disable=too-many-instance-attributes
60+
class TrackedServiceModel(BaseModel): # pylint:disable=too-many-instance-attributes
4061

41-
dynamic_service_start: DynamicServiceStart | None = field(
42-
metadata={
43-
"description": (
44-
"used to create the service in any given moment if the requested_state is RUNNING"
45-
"can be set to None only when stopping the service"
46-
)
47-
}
62+
dynamic_service_start: DynamicServiceStart | None = Field(
63+
description=(
64+
"used to create the service in any given moment if the requested_state is RUNNING"
65+
"can be set to None only when stopping the service"
66+
)
4867
)
4968

50-
user_id: UserID | None = field(
51-
metadata={
52-
"description": "required for propagating status changes to the frontend"
53-
}
69+
user_id: UserID | None = Field(
70+
description="required for propagating status changes to the frontend"
5471
)
55-
project_id: ProjectID | None = field(
56-
metadata={
57-
"description": "required for propagating status changes to the frontend"
58-
}
72+
project_id: ProjectID | None = Field(
73+
description="required for propagating status changes to the frontend"
5974
)
6075

61-
requested_state: UserRequestedState = field(
62-
metadata={
63-
"description": (
64-
"status of the service desidered by the user RUNNING or STOPPED"
65-
)
66-
}
76+
requested_state: UserRequestedState = Field(
77+
description=("status of the service desidered by the user RUNNING or STOPPED")
6778
)
6879

69-
current_state: SchedulerServiceState = field(
80+
current_state: SchedulerServiceState = Field(
7081
default=SchedulerServiceState.UNKNOWN,
71-
metadata={
72-
"description": "to set after parsing the incoming state via the API calls"
73-
},
82+
description="to set after parsing the incoming state via the API calls",
83+
)
84+
85+
def __setattr__(self, name, value):
86+
if name == "current_state" and value != self.current_state:
87+
self.last_state_change = arrow.utcnow().timestamp()
88+
super().__setattr__(name, value)
89+
90+
last_state_change: float = Field(
91+
default_factory=lambda: arrow.utcnow().timestamp(),
92+
metadata={"description": "keeps track when the current_state was last updated"},
7493
)
7594

7695
#############################
7796
### SERVICE STATUS UPDATE ###
7897
#############################
7998

80-
scheduled_to_run: bool = field(
99+
scheduled_to_run: bool = Field(
81100
default=False,
82-
metadata={"description": "set when a job will be immediately scheduled"},
101+
description="set when a job will be immediately scheduled",
83102
)
84103

85-
service_status: str = field(
104+
service_status: str = Field(
86105
default="",
87-
metadata={
88-
"description": "stored for debug mainly this is used to compute ``current_state``"
89-
},
106+
description="stored for debug mainly this is used to compute ``current_state``",
90107
)
91-
service_status_task_uid: TaskUID | None = field(
108+
service_status_task_uid: TaskUID | None = Field(
92109
default=None,
93-
metadata={"description": "uid of the job currently fetching the status"},
110+
description="uid of the job currently fetching the status",
94111
)
95112

96-
check_status_after: float = field(
113+
check_status_after: float = Field(
97114
default_factory=lambda: arrow.utcnow().timestamp(),
98-
metadata={"description": "used to determine when to poll the status again"},
115+
description="used to determine when to poll the status again",
99116
)
100117

101-
last_status_notification: float = field(
118+
last_status_notification: float = Field(
102119
default=0,
103-
metadata={
104-
"description": "used to determine when was the last time the status was notified"
105-
},
120+
description="used to determine when was the last time the status was notified",
106121
)
107122

108123
def set_check_status_after_to(self, delay_from_now: timedelta) -> None:
@@ -116,8 +131,10 @@ def set_last_status_notification_to_now(self) -> None:
116131
#####################
117132

118133
def to_bytes(self) -> bytes:
119-
return pickle.dumps(self)
134+
result: bytes = umsgpack.packb(self.dict(), ext_handlers=_PACKB_EXTENSION_TYPES)
135+
return result
120136

121137
@classmethod
122138
def from_bytes(cls, data: bytes) -> "TrackedServiceModel":
123-
return pickle.loads(data) # type: ignore # noqa: S301
139+
unpacked_data = umsgpack.unpackb(data, ext_handlers=_UNPACKB_EXTENSION_TYPES)
140+
return cls(**unpacked_data)

0 commit comments

Comments
 (0)