Skip to content

Commit 25ce7b5

Browse files
authored
♻️✨🐛Dask-Sidecar: add RabbitMQ dependency and remove usage of deprecated Pub/Sub for logs 🚨🚨🚨 (#7621)
1 parent 6b53689 commit 25ce7b5

File tree

46 files changed

+805
-468
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+805
-468
lines changed

packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import logging
21
from abc import ABC, abstractmethod
3-
from typing import TypeAlias
42

53
import dask.typing
64
from distributed.worker import get_worker
@@ -85,48 +83,3 @@ def ensure_between_0_1(cls, v):
8583
if 0 <= v <= 1:
8684
return v
8785
return min(max(0, v), 1)
88-
89-
90-
LogMessageStr: TypeAlias = str
91-
LogLevelInt: TypeAlias = int
92-
93-
94-
class TaskLogEvent(BaseTaskEvent):
95-
log: LogMessageStr
96-
log_level: LogLevelInt
97-
98-
@staticmethod
99-
def topic_name() -> str:
100-
return "task_logs"
101-
102-
@classmethod
103-
def from_dask_worker(
104-
cls, log: str, log_level: LogLevelInt, *, task_owner: TaskOwner
105-
) -> "TaskLogEvent":
106-
worker = get_worker()
107-
job_id = worker.get_current_task()
108-
return cls(
109-
job_id=_dask_key_to_dask_task_id(job_id),
110-
log=log,
111-
log_level=log_level,
112-
task_owner=task_owner,
113-
)
114-
115-
model_config = ConfigDict(
116-
json_schema_extra={
117-
"examples": [
118-
{
119-
"job_id": "simcore/services/comp/sleeper:1.1.0:projectid_ec7e595a-63ee-46a1-a04a-901b11b649f8:nodeid_39467d89-b659-4914-9359-c40b1b6d1d6d:uuid_5ee5c655-450d-4711-a3ec-32ffe16bc580",
120-
"log": "some logs",
121-
"log_level": logging.INFO,
122-
"task_owner": {
123-
"user_id": 32,
124-
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
125-
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
126-
"parent_project_id": None,
127-
"parent_node_id": None,
128-
},
129-
},
130-
]
131-
}
132-
)

packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py

Lines changed: 114 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -17,46 +17,59 @@
1717
StrictInt,
1818
StrictStr,
1919
)
20+
from pydantic.config import JsonDict
2021

2122
TaskCancelEventName = "cancel_event_{}"
2223

2324

2425
class PortSchema(BaseModel):
2526
required: bool
2627

28+
@staticmethod
29+
def _update_json_schema_extra(schema: JsonDict) -> None:
30+
schema.update(
31+
{
32+
"examples": [
33+
{
34+
"required": True,
35+
},
36+
{
37+
"required": False,
38+
},
39+
]
40+
}
41+
)
42+
2743
model_config = ConfigDict(
2844
extra="forbid",
29-
json_schema_extra={
30-
"examples": [
31-
{
32-
"required": True,
33-
},
34-
{
35-
"required": False,
36-
},
37-
]
38-
},
45+
json_schema_extra=_update_json_schema_extra,
3946
)
4047

4148

4249
class FilePortSchema(PortSchema):
4350
mapping: str | None = None
4451
url: AnyUrl
4552

53+
@staticmethod
54+
def _update_json_schema_extra(schema: JsonDict) -> None:
55+
schema.update(
56+
{
57+
"examples": [
58+
{
59+
"mapping": "some_filename.txt",
60+
"url": "sftp://some_file_url",
61+
"required": True,
62+
},
63+
{
64+
"required": False,
65+
"url": "s3://another_file_url",
66+
},
67+
]
68+
}
69+
)
70+
4671
model_config = ConfigDict(
47-
json_schema_extra={
48-
"examples": [
49-
{
50-
"mapping": "some_filename.txt",
51-
"url": "sftp://some_file_url",
52-
"required": True,
53-
},
54-
{
55-
"required": False,
56-
"url": "s3://another_file_url",
57-
},
58-
]
59-
}
72+
json_schema_extra=_update_json_schema_extra,
6073
)
6174

6275

@@ -70,18 +83,27 @@ class FileUrl(BaseModel):
7083
default=None, description="the file MIME type", pattern=MIME_TYPE_RE
7184
)
7285

86+
@staticmethod
87+
def _update_json_schema_extra(schema: JsonDict) -> None:
88+
schema.update(
89+
{
90+
"examples": [
91+
{
92+
"url": "https://some_file_url",
93+
"file_mime_type": "application/json",
94+
},
95+
{
96+
"url": "https://some_file_url",
97+
"file_mapping": "some_file_name.txt",
98+
"file_mime_type": "application/json",
99+
},
100+
]
101+
}
102+
)
103+
73104
model_config = ConfigDict(
74105
extra="forbid",
75-
json_schema_extra={
76-
"examples": [
77-
{"url": "https://some_file_url", "file_mime_type": "application/json"},
78-
{
79-
"url": "https://some_file_url",
80-
"file_mapping": "some_file_name.txt",
81-
"file_mime_type": "application/json",
82-
},
83-
]
84-
},
106+
json_schema_extra=_update_json_schema_extra,
85107
)
86108

87109

@@ -99,18 +121,24 @@ class FileUrl(BaseModel):
99121

100122

101123
class TaskInputData(DictModel[ServicePortKey, PortValue]):
124+
@staticmethod
125+
def _update_json_schema_extra(schema: JsonDict) -> None:
126+
schema.update(
127+
{
128+
"examples": [
129+
{
130+
"boolean_input": False,
131+
"int_input": -45,
132+
"float_input": 4564.45,
133+
"string_input": "nobody thinks like a string",
134+
"file_input": {"url": "s3://thatis_file_url"},
135+
},
136+
]
137+
}
138+
)
139+
102140
model_config = ConfigDict(
103-
json_schema_extra={
104-
"examples": [
105-
{
106-
"boolean_input": False,
107-
"int_input": -45,
108-
"float_input": 4564.45,
109-
"string_input": "nobody thinks like a string",
110-
"file_input": {"url": "s3://thatis_file_url"},
111-
},
112-
]
113-
}
141+
json_schema_extra=_update_json_schema_extra,
114142
)
115143

116144

@@ -126,26 +154,32 @@ class TaskOutputDataSchema(DictModel[ServicePortKey, PortSchemaValue]):
126154
# does not work well in that case. For that reason, the schema is
127155
# sent as a json-schema instead of with a dynamically-created model class
128156
#
129-
model_config = ConfigDict(
130-
json_schema_extra={
131-
"examples": [
132-
{
133-
"boolean_output": {"required": False},
134-
"int_output": {"required": True},
135-
"float_output": {"required": True},
136-
"string_output": {"required": False},
137-
"file_output": {
138-
"required": True,
139-
"url": "https://some_file_url",
140-
"mapping": "the_output_filename",
141-
},
142-
"optional_file_output": {
143-
"required": False,
144-
"url": "s3://one_file_url",
157+
@staticmethod
158+
def _update_json_schema_extra(schema: JsonDict) -> None:
159+
schema.update(
160+
{
161+
"examples": [
162+
{
163+
"boolean_output": {"required": False},
164+
"int_output": {"required": True},
165+
"float_output": {"required": True},
166+
"string_output": {"required": False},
167+
"file_output": {
168+
"required": True,
169+
"url": "https://some_file_url",
170+
"mapping": "the_output_filename",
171+
},
172+
"optional_file_output": {
173+
"required": False,
174+
"url": "s3://one_file_url",
175+
},
145176
},
146-
},
147-
]
148-
}
177+
]
178+
}
179+
)
180+
181+
model_config = ConfigDict(
182+
json_schema_extra=_update_json_schema_extra,
149183
)
150184

151185

@@ -181,16 +215,20 @@ def from_task_output(
181215

182216
return cls.model_validate(data)
183217

184-
model_config = ConfigDict(
185-
json_schema_extra={
186-
"examples": [
187-
{
188-
"boolean_output": False,
189-
"int_output": -45,
190-
"float_output": 4564.45,
191-
"string_output": "nobody thinks like a string",
192-
"file_output": {"url": "s3://yet_another_file_url"},
193-
},
194-
]
195-
}
196-
)
218+
@staticmethod
219+
def _update_json_schema_extra(schema: JsonDict) -> None:
220+
schema.update(
221+
{
222+
"examples": [
223+
{
224+
"boolean_output": False,
225+
"int_output": -45,
226+
"float_output": 4564.45,
227+
"string_output": "nobody thinks like a string",
228+
"file_output": {"url": "s3://yet_another_file_url"},
229+
},
230+
]
231+
}
232+
)
233+
234+
model_config = ConfigDict(json_schema_extra=_update_json_schema_extra)

packages/dask-task-models-library/src/dask_task_models_library/container_tasks/protocol.py

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from models_library.services_resources import BootMode
88
from models_library.users import UserID
99
from pydantic import AnyUrl, BaseModel, ConfigDict, model_validator
10+
from pydantic.config import JsonDict
1011
from settings_library.s3 import S3Settings
1112

1213
from .docker import DockerBasicAuth
@@ -44,25 +45,31 @@ def check_parent_valid(cls, values: dict[str, Any]) -> dict[str, Any]:
4445
raise ValueError(msg)
4546
return values
4647

48+
@staticmethod
49+
def _update_json_schema_extra(schema: JsonDict) -> None:
50+
schema.update(
51+
{
52+
"examples": [
53+
{
54+
"user_id": 32,
55+
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
56+
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
57+
"parent_project_id": None,
58+
"parent_node_id": None,
59+
},
60+
{
61+
"user_id": 32,
62+
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
63+
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
64+
"parent_project_id": "887e595a-63ee-46a1-a04a-901b11b649f8",
65+
"parent_node_id": "aa467d89-b659-4914-9359-c40b1b6d1d6d",
66+
},
67+
]
68+
}
69+
)
70+
4771
model_config = ConfigDict(
48-
json_schema_extra={
49-
"examples": [
50-
{
51-
"user_id": 32,
52-
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
53-
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
54-
"parent_project_id": None,
55-
"parent_node_id": None,
56-
},
57-
{
58-
"user_id": 32,
59-
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
60-
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
61-
"parent_project_id": "887e595a-63ee-46a1-a04a-901b11b649f8",
62-
"parent_node_id": "aa467d89-b659-4914-9359-c40b1b6d1d6d",
63-
},
64-
]
65-
}
72+
json_schema_extra=_update_json_schema_extra,
6673
)
6774

6875

@@ -83,13 +90,15 @@ class ContainerTaskParameters(BaseModel):
8390
{
8491
"image": "ubuntu",
8592
"tag": "latest",
86-
"input_data": TaskInputData.model_config["json_schema_extra"]["examples"][0], # type: ignore[index]
87-
"output_data_keys": TaskOutputDataSchema.model_config["json_schema_extra"]["examples"][0], # type: ignore[index]
93+
"input_data": TaskInputData.model_json_schema()["examples"][0],
94+
"output_data_keys": TaskOutputDataSchema.model_json_schema()[
95+
"examples"
96+
][0],
8897
"command": ["sleep 10", "echo hello"],
8998
"envs": {"MYENV": "is an env"},
9099
"labels": {"io.simcore.thelabel": "is amazing"},
91100
"boot_mode": BootMode.CPU.value,
92-
"task_owner": TaskOwner.model_config["json_schema_extra"]["examples"][0], # type: ignore[index]
101+
"task_owner": TaskOwner.model_json_schema()["examples"][0],
93102
},
94103
]
95104
}
@@ -104,5 +113,4 @@ def __call__(
104113
docker_auth: DockerBasicAuth,
105114
log_file_url: LogFileUploadURL,
106115
s3_settings: S3Settings | None,
107-
) -> TaskOutputData:
108-
...
116+
) -> TaskOutputData: ...

0 commit comments

Comments
 (0)