Merged
Changes from all commits
Commits
52 commits
61e7bb6
fix warning polution
sanderegg May 1, 2025
a53db9f
ensure comp run is marked as started
sanderegg May 1, 2025
87872c4
ensure comp_runs is set correctly
sanderegg May 1, 2025
0c94f09
refactor
sanderegg May 1, 2025
c174aea
clean
sanderegg May 1, 2025
f61b4e0
added RABBIT_MQ
sanderegg May 1, 2025
4a44811
refactor
sanderegg May 1, 2025
63d78ab
added rabbitmq client
sanderegg May 1, 2025
ecfca31
creating dask plugin
sanderegg May 1, 2025
b7c4097
clean
sanderegg May 1, 2025
756943f
renaming and testing
sanderegg May 1, 2025
f506ac3
rename files
sanderegg May 5, 2025
238ffde
refactor
sanderegg May 5, 2025
634fbb5
refactor
sanderegg May 5, 2025
aebe904
create message
sanderegg May 5, 2025
2a9ea35
simplify
sanderegg May 5, 2025
a3d577b
simplify
sanderegg May 5, 2025
1b0a967
done
sanderegg May 5, 2025
33a2e57
bootmode
sanderegg May 6, 2025
7fb3592
cleanup
sanderegg May 6, 2025
a380885
use construct
sanderegg May 6, 2025
9c617ad
removing TaskLogEvent
sanderegg May 6, 2025
d7d95aa
ruff
sanderegg May 6, 2025
732cbeb
not in main thread only
sanderegg May 6, 2025
2439aa7
use correct ENV
sanderegg May 6, 2025
b6b0331
tests are almost good to go
sanderegg May 6, 2025
919b236
ongoing
sanderegg May 6, 2025
52c7849
tests are passing
sanderegg May 6, 2025
2b6e739
moved utils to folder
sanderegg May 6, 2025
2e6463c
cleanup
sanderegg May 7, 2025
d144bf1
removed logs sub
sanderegg May 7, 2025
8257b7a
pylint
sanderegg May 7, 2025
8180e5e
sonar
sanderegg May 7, 2025
2c221c1
if available_space is not filled set to 0
sanderegg May 7, 2025
300d08d
rabbit mq must be in the network of the sidecar now
sanderegg May 7, 2025
9c0f019
ensure errors do not let the sidecar start
sanderegg May 7, 2025
4303d78
allow queueing of messages from any thread
sanderegg May 7, 2025
f278199
cleanup
sanderegg May 7, 2025
90af8e7
ongoing
sanderegg May 7, 2025
d0b1c4f
ensure we close the worker if the plugin cannot start
sanderegg May 8, 2025
c3b8bba
add some checks
sanderegg May 8, 2025
713c155
fixed tests
sanderegg May 8, 2025
f9e0896
mypy
sanderegg May 8, 2025
c8a3901
pylint
sanderegg May 8, 2025
82659c3
fixed test
sanderegg May 8, 2025
c720845
use tenacity instead of sleeps
sanderegg May 8, 2025
5d57751
move things around
sanderegg May 8, 2025
b261d5f
add rabbitmq for private clusters
sanderegg May 8, 2025
e4f2793
use single env
sanderegg May 8, 2025
7805c17
@copilot review: fix
sanderegg May 8, 2025
b72f719
pylint
sanderegg May 8, 2025
6bec045
pylint
sanderegg May 8, 2025
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import logging
from abc import ABC, abstractmethod
from typing import TypeAlias

import dask.typing
from distributed.worker import get_worker
@@ -85,48 +83,3 @@ def ensure_between_0_1(cls, v):
if 0 <= v <= 1:
return v
return min(max(0, v), 1)


LogMessageStr: TypeAlias = str
LogLevelInt: TypeAlias = int


class TaskLogEvent(BaseTaskEvent):
log: LogMessageStr
log_level: LogLevelInt

@staticmethod
def topic_name() -> str:
return "task_logs"

@classmethod
def from_dask_worker(
cls, log: str, log_level: LogLevelInt, *, task_owner: TaskOwner
) -> "TaskLogEvent":
worker = get_worker()
job_id = worker.get_current_task()
return cls(
job_id=_dask_key_to_dask_task_id(job_id),
log=log,
log_level=log_level,
task_owner=task_owner,
)

model_config = ConfigDict(
json_schema_extra={
"examples": [
{
"job_id": "simcore/services/comp/sleeper:1.1.0:projectid_ec7e595a-63ee-46a1-a04a-901b11b649f8:nodeid_39467d89-b659-4914-9359-c40b1b6d1d6d:uuid_5ee5c655-450d-4711-a3ec-32ffe16bc580",
"log": "some logs",
"log_level": logging.INFO,
"task_owner": {
"user_id": 32,
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
"parent_project_id": None,
"parent_node_id": None,
},
},
]
}
)
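For reference, the only logic kept in this file's hunk is the clamping validator shown above. A minimal sketch of how such a validator behaves in pydantic v2 (the class and field names here are illustrative stand-ins, not taken from this file):

from pydantic import BaseModel, field_validator


class ProgressEvent(BaseModel):  # hypothetical stand-in for the event model above
    progress: float

    @field_validator("progress")
    @classmethod
    def ensure_between_0_1(cls, v: float) -> float:
        # in-range values pass through unchanged, out-of-range values are clamped
        if 0 <= v <= 1:
            return v
        return min(max(0, v), 1)


assert ProgressEvent(progress=1.5).progress == 1.0
assert ProgressEvent(progress=-3).progress == 0.0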
Original file line number Diff line number Diff line change
@@ -17,46 +17,59 @@
StrictInt,
StrictStr,
)
from pydantic.config import JsonDict

TaskCancelEventName = "cancel_event_{}"


class PortSchema(BaseModel):
required: bool

@staticmethod
def _update_json_schema_extra(schema: JsonDict) -> None:
schema.update(
{
"examples": [
{
"required": True,
},
{
"required": False,
},
]
}
)

model_config = ConfigDict(
extra="forbid",
json_schema_extra={
"examples": [
{
"required": True,
},
{
"required": False,
},
]
},
json_schema_extra=_update_json_schema_extra,
)


class FilePortSchema(PortSchema):
mapping: str | None = None
url: AnyUrl

@staticmethod
def _update_json_schema_extra(schema: JsonDict) -> None:
schema.update(
{
"examples": [
{
"mapping": "some_filename.txt",
"url": "sftp://some_file_url",
"required": True,
},
{
"required": False,
"url": "s3://another_file_url",
},
]
}
)

model_config = ConfigDict(
json_schema_extra={
"examples": [
{
"mapping": "some_filename.txt",
"url": "sftp://some_file_url",
"required": True,
},
{
"required": False,
"url": "s3://another_file_url",
},
]
}
json_schema_extra=_update_json_schema_extra,
)
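The hunks above replace the inline json_schema_extra dicts with a callable: pydantic v2 accepts a callable that mutates the generated JSON schema in place, which keeps the examples out of model_config itself. A minimal standalone sketch of the pattern, reusing the PortSchema example data (the class name is only illustrative):

from pydantic import BaseModel, ConfigDict
from pydantic.config import JsonDict


class ExamplePort(BaseModel):
    required: bool

    @staticmethod
    def _update_json_schema_extra(schema: JsonDict) -> None:
        # called by pydantic while it builds the JSON schema for this model
        schema.update({"examples": [{"required": True}, {"required": False}]})

    model_config = ConfigDict(
        extra="forbid",
        json_schema_extra=_update_json_schema_extra,
    )


# the examples are now reachable through the generated schema
assert ExamplePort.model_json_schema()["examples"] == [
    {"required": True},
    {"required": False},
]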


@@ -70,18 +83,27 @@ class FileUrl(BaseModel):
default=None, description="the file MIME type", pattern=MIME_TYPE_RE
)

@staticmethod
def _update_json_schema_extra(schema: JsonDict) -> None:
schema.update(
{
"examples": [
{
"url": "https://some_file_url",
"file_mime_type": "application/json",
},
{
"url": "https://some_file_url",
"file_mapping": "some_file_name.txt",
"file_mime_type": "application/json",
},
]
}
)

model_config = ConfigDict(
extra="forbid",
json_schema_extra={
"examples": [
{"url": "https://some_file_url", "file_mime_type": "application/json"},
{
"url": "https://some_file_url",
"file_mapping": "some_file_name.txt",
"file_mime_type": "application/json",
},
]
},
json_schema_extra=_update_json_schema_extra,
)


@@ -99,18 +121,24 @@ class FileUrl(BaseModel):


class TaskInputData(DictModel[ServicePortKey, PortValue]):
@staticmethod
def _update_json_schema_extra(schema: JsonDict) -> None:
schema.update(
{
"examples": [
{
"boolean_input": False,
"int_input": -45,
"float_input": 4564.45,
"string_input": "nobody thinks like a string",
"file_input": {"url": "s3://thatis_file_url"},
},
]
}
)

model_config = ConfigDict(
json_schema_extra={
"examples": [
{
"boolean_input": False,
"int_input": -45,
"float_input": 4564.45,
"string_input": "nobody thinks like a string",
"file_input": {"url": "s3://thatis_file_url"},
},
]
}
json_schema_extra=_update_json_schema_extra,
)
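As a quick check that the refactored examples stay in sync with the model, the first example should still validate back into TaskInputData; a hedged sketch (the import path below is an assumption, it is not shown in this diff):

# assumed location of the model within dask-task-models-library
from dask_task_models_library.container_tasks.io import TaskInputData

example = TaskInputData.model_json_schema()["examples"][0]
task_inputs = TaskInputData.model_validate(example)  # raises ValidationError if the example drifts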


@@ -126,26 +154,32 @@ class TaskOutputDataSchema(DictModel[ServicePortKey, PortSchemaValue]):
# does not work well in that case. For that reason, the schema is
# sent as a json-schema instead of with a dynamically-created model class
#
model_config = ConfigDict(
json_schema_extra={
"examples": [
{
"boolean_output": {"required": False},
"int_output": {"required": True},
"float_output": {"required": True},
"string_output": {"required": False},
"file_output": {
"required": True,
"url": "https://some_file_url",
"mapping": "the_output_filename",
},
"optional_file_output": {
"required": False,
"url": "s3://one_file_url",
@staticmethod
def _update_json_schema_extra(schema: JsonDict) -> None:
schema.update(
{
"examples": [
{
"boolean_output": {"required": False},
"int_output": {"required": True},
"float_output": {"required": True},
"string_output": {"required": False},
"file_output": {
"required": True,
"url": "https://some_file_url",
"mapping": "the_output_filename",
},
"optional_file_output": {
"required": False,
"url": "s3://one_file_url",
},
},
},
]
}
]
}
)

model_config = ConfigDict(
json_schema_extra=_update_json_schema_extra,
)


@@ -181,16 +215,20 @@ def from_task_output(

return cls.model_validate(data)

model_config = ConfigDict(
json_schema_extra={
"examples": [
{
"boolean_output": False,
"int_output": -45,
"float_output": 4564.45,
"string_output": "nobody thinks like a string",
"file_output": {"url": "s3://yet_another_file_url"},
},
]
}
)
@staticmethod
def _update_json_schema_extra(schema: JsonDict) -> None:
schema.update(
{
"examples": [
{
"boolean_output": False,
"int_output": -45,
"float_output": 4564.45,
"string_output": "nobody thinks like a string",
"file_output": {"url": "s3://yet_another_file_url"},
},
]
}
)

model_config = ConfigDict(json_schema_extra=_update_json_schema_extra)
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
from models_library.services_resources import BootMode
from models_library.users import UserID
from pydantic import AnyUrl, BaseModel, ConfigDict, model_validator
from pydantic.config import JsonDict
from settings_library.s3 import S3Settings

from .docker import DockerBasicAuth
@@ -44,25 +45,31 @@ def check_parent_valid(cls, values: dict[str, Any]) -> dict[str, Any]:
raise ValueError(msg)
return values

@staticmethod
def _update_json_schema_extra(schema: JsonDict) -> None:
schema.update(
{
"examples": [
{
"user_id": 32,
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
"parent_project_id": None,
"parent_node_id": None,
},
{
"user_id": 32,
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
"parent_project_id": "887e595a-63ee-46a1-a04a-901b11b649f8",
"parent_node_id": "aa467d89-b659-4914-9359-c40b1b6d1d6d",
},
]
}
)

model_config = ConfigDict(
json_schema_extra={
"examples": [
{
"user_id": 32,
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
"parent_project_id": None,
"parent_node_id": None,
},
{
"user_id": 32,
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
"parent_project_id": "887e595a-63ee-46a1-a04a-901b11b649f8",
"parent_node_id": "aa467d89-b659-4914-9359-c40b1b6d1d6d",
},
]
}
json_schema_extra=_update_json_schema_extra,
)
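The two TaskOwner examples above mirror the check_parent_valid rule visible at the top of this hunk: parent_project_id and parent_node_id are presumably expected to be set together or left unset together. A hedged illustration (import path assumed, and the rejection depends on that both-or-neither rule):

from pydantic import ValidationError

# assumed location of the model within dask-task-models-library
from dask_task_models_library.container_tasks.protocol import TaskOwner

valid = TaskOwner.model_validate(TaskOwner.model_json_schema()["examples"][0])

try:
    TaskOwner.model_validate(
        {
            "user_id": 32,
            "project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
            "node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
            "parent_project_id": "887e595a-63ee-46a1-a04a-901b11b649f8",
            "parent_node_id": None,  # only one parent field set
        }
    )
except ValidationError:
    pass  # expected if the validator enforces the both-or-neither rule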


@@ -83,13 +90,15 @@ class ContainerTaskParameters(BaseModel):
{
"image": "ubuntu",
"tag": "latest",
"input_data": TaskInputData.model_config["json_schema_extra"]["examples"][0], # type: ignore[index]
"output_data_keys": TaskOutputDataSchema.model_config["json_schema_extra"]["examples"][0], # type: ignore[index]
"input_data": TaskInputData.model_json_schema()["examples"][0],
"output_data_keys": TaskOutputDataSchema.model_json_schema()[
"examples"
][0],
"command": ["sleep 10", "echo hello"],
"envs": {"MYENV": "is an env"},
"labels": {"io.simcore.thelabel": "is amazing"},
"boot_mode": BootMode.CPU.value,
"task_owner": TaskOwner.model_config["json_schema_extra"]["examples"][0], # type: ignore[index]
"task_owner": TaskOwner.model_json_schema()["examples"][0],
},
]
}
@@ -104,5 +113,4 @@ def __call__(
docker_auth: DockerBasicAuth,
log_file_url: LogFileUploadURL,
s3_settings: S3Settings | None,
) -> TaskOutputData:
...
) -> TaskOutputData: ...
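A note on the example-lookup change in ContainerTaskParameters above: once json_schema_extra is a callable, model_config["json_schema_extra"] no longer holds a dict that can be indexed, so the nested examples are read from the generated schema instead. Roughly:

# before: json_schema_extra was a plain dict stored in model_config
TaskInputData.model_config["json_schema_extra"]["examples"][0]  # type: ignore[index]

# after: the callable runs while the schema is built, so the merged result
# (including the examples) is taken from model_json_schema() instead
TaskInputData.model_json_schema()["examples"][0]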