Skip to content

Commit 1a721f8

Browse files
authored
🐛🎨Do not fail a pipeline when the clusters-keeper or the computational backend in general is not reachable for a short time 🚨 (#8286)
1 parent c8f61c1 commit 1a721f8

File tree

8 files changed

+397
-228
lines changed

8 files changed

+397
-228
lines changed

services/director-v2/src/simcore_service_director_v2/core/settings.py

Lines changed: 158 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from functools import cached_property
77
from typing import Annotated, cast
88

9+
from common_library.basic_types import DEFAULT_FACTORY
910
from common_library.pydantic_validators import validate_numeric_string_as_timedelta
1011
from fastapi import FastAPI
1112
from models_library.basic_types import LogLevel, PortInt
@@ -50,38 +51,53 @@
5051

5152

5253
class ComputationalBackendSettings(BaseCustomSettings):
53-
COMPUTATIONAL_BACKEND_ENABLED: bool = Field(
54-
default=True,
55-
)
56-
COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY: PositiveInt = Field(
57-
default=50,
58-
description="defines how many pipelines the application can schedule concurrently",
59-
)
60-
COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED: bool = Field(
61-
default=True,
62-
)
63-
COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL: AnyUrl = Field(
64-
...,
65-
description="This is the cluster that will be used by default"
66-
" when submitting computational services (typically "
67-
"tcp://dask-scheduler:8786, tls://dask-scheduler:8786 for the internal cluster",
68-
)
69-
COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH: ClusterAuthentication = Field(
70-
default=...,
71-
description="this is the cluster authentication that will be used by default",
72-
)
73-
COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_FILE_LINK_TYPE: FileLinkType = Field(
74-
FileLinkType.S3,
75-
description=f"Default file link type to use with the internal cluster '{list(FileLinkType)}'",
76-
)
77-
COMPUTATIONAL_BACKEND_DEFAULT_FILE_LINK_TYPE: FileLinkType = Field(
78-
FileLinkType.PRESIGNED,
79-
description=f"Default file link type to use with computational backend '{list(FileLinkType)}'",
80-
)
81-
COMPUTATIONAL_BACKEND_ON_DEMAND_CLUSTERS_FILE_LINK_TYPE: FileLinkType = Field(
82-
FileLinkType.PRESIGNED,
83-
description=f"Default file link type to use with computational backend on-demand clusters '{list(FileLinkType)}'",
84-
)
54+
COMPUTATIONAL_BACKEND_ENABLED: bool = True
55+
COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY: Annotated[
56+
PositiveInt,
57+
Field(
58+
description="defines how many pipelines the application can schedule concurrently"
59+
),
60+
] = 50
61+
COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED: bool = True
62+
COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL: Annotated[
63+
AnyUrl,
64+
Field(
65+
description="This is the cluster that will be used by default"
66+
" when submitting computational services (typically "
67+
"tcp://dask-scheduler:8786, tls://dask-scheduler:8786 for the internal cluster",
68+
),
69+
]
70+
COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH: Annotated[
71+
ClusterAuthentication,
72+
Field(
73+
description="this is the cluster authentication that will be used by default"
74+
),
75+
]
76+
COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_FILE_LINK_TYPE: Annotated[
77+
FileLinkType,
78+
Field(
79+
description=f"Default file link type to use with the internal cluster '{list(FileLinkType)}'"
80+
),
81+
] = FileLinkType.S3
82+
COMPUTATIONAL_BACKEND_DEFAULT_FILE_LINK_TYPE: Annotated[
83+
FileLinkType,
84+
Field(
85+
description=f"Default file link type to use with computational backend '{list(FileLinkType)}'"
86+
),
87+
] = FileLinkType.PRESIGNED
88+
COMPUTATIONAL_BACKEND_ON_DEMAND_CLUSTERS_FILE_LINK_TYPE: Annotated[
89+
FileLinkType,
90+
Field(
91+
description=f"Default file link type to use with computational backend on-demand clusters '{list(FileLinkType)}'"
92+
),
93+
] = FileLinkType.PRESIGNED
94+
COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_CLUSTER_TIMEOUT: Annotated[
95+
datetime.timedelta,
96+
Field(
97+
description="maximum time a pipeline can wait for a cluster to start"
98+
"(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formatting)."
99+
),
100+
] = datetime.timedelta(minutes=10)
85101

86102
@cached_property
87103
def default_cluster(self) -> BaseCluster:
@@ -111,91 +127,107 @@ class AppSettings(BaseApplicationSettings, MixinLoggingSettings):
111127
),
112128
] = LogLevel.INFO
113129

114-
DIRECTOR_V2_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field(
115-
default=False,
116-
validation_alias=AliasChoices(
117-
"DIRECTOR_V2_LOG_FORMAT_LOCAL_DEV_ENABLED",
118-
"LOG_FORMAT_LOCAL_DEV_ENABLED",
130+
DIRECTOR_V2_LOG_FORMAT_LOCAL_DEV_ENABLED: Annotated[
131+
bool,
132+
Field(
133+
validation_alias=AliasChoices(
134+
"DIRECTOR_V2_LOG_FORMAT_LOCAL_DEV_ENABLED",
135+
"LOG_FORMAT_LOCAL_DEV_ENABLED",
136+
),
137+
description="Enables local development log format. WARNING: make sure it is disabled if you want to have structured logs!",
119138
),
120-
description="Enables local development log format. WARNING: make sure it is disabled if you want to have structured logs!",
121-
)
122-
DIRECTOR_V2_LOG_FILTER_MAPPING: dict[LoggerName, list[MessageSubstring]] = Field(
123-
default_factory=dict,
124-
validation_alias=AliasChoices(
125-
"DIRECTOR_V2_LOG_FILTER_MAPPING", "LOG_FILTER_MAPPING"
139+
] = False
140+
DIRECTOR_V2_LOG_FILTER_MAPPING: Annotated[
141+
dict[LoggerName, list[MessageSubstring]],
142+
Field(
143+
default_factory=dict,
144+
validation_alias=AliasChoices(
145+
"DIRECTOR_V2_LOG_FILTER_MAPPING", "LOG_FILTER_MAPPING"
146+
),
147+
description="is a dictionary that maps specific loggers (such as 'uvicorn.access' or 'gunicorn.access') to a list of log message patterns that should be filtered out.",
126148
),
127-
description="is a dictionary that maps specific loggers (such as 'uvicorn.access' or 'gunicorn.access') to a list of log message patterns that should be filtered out.",
128-
)
149+
] = DEFAULT_FACTORY
129150
DIRECTOR_V2_DEV_FEATURES_ENABLED: bool = False
130151

131-
DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED: bool = Field(
132-
default=False,
133-
description=(
134-
"Under development feature. If enabled state "
135-
"is saved using rclone docker volumes."
152+
DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED: Annotated[
153+
bool,
154+
Field(
155+
description=(
156+
"Under development feature. If enabled state "
157+
"is saved using rclone docker volumes."
158+
)
136159
),
137-
)
160+
] = False
138161

139162
# for passing self-signed certificate to spawned services
140-
DIRECTOR_V2_SELF_SIGNED_SSL_SECRET_ID: str = Field(
141-
default="",
142-
description="ID of the docker secret containing the self-signed certificate",
143-
)
144-
DIRECTOR_V2_SELF_SIGNED_SSL_SECRET_NAME: str = Field(
145-
default="",
146-
description="Name of the docker secret containing the self-signed certificate",
147-
)
148-
DIRECTOR_V2_SELF_SIGNED_SSL_FILENAME: str = Field(
149-
default="",
150-
description="Filepath to self-signed osparc.crt file *as mounted inside the container*, empty strings disables it",
151-
)
163+
DIRECTOR_V2_SELF_SIGNED_SSL_SECRET_ID: Annotated[
164+
str,
165+
Field(
166+
description="ID of the docker secret containing the self-signed certificate"
167+
),
168+
] = ""
169+
DIRECTOR_V2_SELF_SIGNED_SSL_SECRET_NAME: Annotated[
170+
str,
171+
Field(
172+
description="Name of the docker secret containing the self-signed certificate"
173+
),
174+
] = ""
175+
DIRECTOR_V2_SELF_SIGNED_SSL_FILENAME: Annotated[
176+
str,
177+
Field(
178+
description="Filepath to self-signed osparc.crt file *as mounted inside the container*, empty strings disables it"
179+
),
180+
] = ""
152181
DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True
153182
DIRECTOR_V2_PROFILING: bool = False
154183

155-
DIRECTOR_V2_REMOTE_DEBUGGING_PORT: PortInt | None = Field(default=None)
184+
DIRECTOR_V2_REMOTE_DEBUGGING_PORT: PortInt | None = None
156185

157186
# extras
158-
SWARM_STACK_NAME: str = Field(default="undefined-please-check")
159-
SERVICE_TRACKING_HEARTBEAT: datetime.timedelta = Field(
160-
default=DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL,
161-
description="Service scheduler heartbeat (everytime a heartbeat is sent into RabbitMQ)"
162-
" (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)",
163-
)
187+
SWARM_STACK_NAME: str = "undefined-please-check"
188+
SERVICE_TRACKING_HEARTBEAT: Annotated[
189+
datetime.timedelta,
190+
Field(
191+
description="Service scheduler heartbeat (everytime a heartbeat is sent into RabbitMQ)"
192+
" (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)"
193+
),
194+
] = DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL
164195

165-
SIMCORE_SERVICES_NETWORK_NAME: str | None = Field(
166-
default=None,
167-
description="used to find the right network name",
168-
)
169-
SIMCORE_SERVICES_PREFIX: str | None = Field(
170-
"simcore/services",
171-
description="useful when developing with an alternative registry namespace",
172-
)
196+
SIMCORE_SERVICES_NETWORK_NAME: Annotated[
197+
str | None, Field(description="used to find the right network name")
198+
] = None
199+
SIMCORE_SERVICES_PREFIX: Annotated[
200+
str | None,
201+
Field(
202+
description="useful when developing with an alternative registry namespace"
203+
),
204+
] = "simcore/services"
173205

174-
DIRECTOR_V2_NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS: NonNegativeInt = Field(
175-
default=NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS_DEFAULT_VALUE,
176-
description="forwarded to sidecars which use nodeports",
177-
)
206+
DIRECTOR_V2_NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS: Annotated[
207+
NonNegativeInt, Field(description="forwarded to sidecars which use nodeports")
208+
] = NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS_DEFAULT_VALUE
178209

179210
# debug settings
180-
CLIENT_REQUEST: ClientRequestSettings = Field(
181-
json_schema_extra={"auto_default_from_env": True}
182-
)
211+
CLIENT_REQUEST: Annotated[
212+
ClientRequestSettings, Field(json_schema_extra={"auto_default_from_env": True})
213+
] = DEFAULT_FACTORY
183214

184215
# App modules settings ---------------------
185216
DIRECTOR_V2_STORAGE: Annotated[
186217
StorageSettings, Field(json_schema_extra={"auto_default_from_env": True})
187218
]
188-
DIRECTOR_V2_NODE_PORTS_STORAGE_AUTH: StorageAuthSettings | None = Field(
189-
json_schema_extra={"auto_default_from_env": True}
190-
)
219+
DIRECTOR_V2_NODE_PORTS_STORAGE_AUTH: Annotated[
220+
StorageAuthSettings | None,
221+
Field(json_schema_extra={"auto_default_from_env": True}),
222+
] = None
191223

192224
DIRECTOR_V2_CATALOG: Annotated[
193225
CatalogSettings | None, Field(json_schema_extra={"auto_default_from_env": True})
194226
]
195227

196-
DIRECTOR_V0: DirectorV0Settings = Field(
197-
json_schema_extra={"auto_default_from_env": True}
198-
)
228+
DIRECTOR_V0: Annotated[
229+
DirectorV0Settings, Field(json_schema_extra={"auto_default_from_env": True})
230+
] = DEFAULT_FACTORY
199231

200232
DYNAMIC_SERVICES: Annotated[
201233
DynamicServicesSettings,
@@ -206,35 +238,47 @@ class AppSettings(BaseApplicationSettings, MixinLoggingSettings):
206238
PostgresSettings, Field(json_schema_extra={"auto_default_from_env": True})
207239
]
208240

209-
REDIS: RedisSettings = Field(json_schema_extra={"auto_default_from_env": True})
241+
REDIS: Annotated[
242+
RedisSettings, Field(json_schema_extra={"auto_default_from_env": True})
243+
] = DEFAULT_FACTORY
210244

211-
DIRECTOR_V2_RABBITMQ: RabbitSettings = Field(
212-
json_schema_extra={"auto_default_from_env": True}
213-
)
245+
DIRECTOR_V2_RABBITMQ: Annotated[
246+
RabbitSettings, Field(json_schema_extra={"auto_default_from_env": True})
247+
] = DEFAULT_FACTORY
214248

215-
TRAEFIK_SIMCORE_ZONE: str = Field("internal_simcore_stack")
249+
TRAEFIK_SIMCORE_ZONE: str = "internal_simcore_stack"
216250

217-
DIRECTOR_V2_COMPUTATIONAL_BACKEND: ComputationalBackendSettings = Field(
218-
json_schema_extra={"auto_default_from_env": True}
219-
)
251+
DIRECTOR_V2_COMPUTATIONAL_BACKEND: Annotated[
252+
ComputationalBackendSettings,
253+
Field(json_schema_extra={"auto_default_from_env": True}),
254+
] = DEFAULT_FACTORY
220255

221-
DIRECTOR_V2_DOCKER_REGISTRY: RegistrySettings = Field(
222-
json_schema_extra={"auto_default_from_env": True},
223-
description="settings for the private registry deployed with the platform",
224-
)
225-
DIRECTOR_V2_DOCKER_HUB_REGISTRY: RegistrySettings | None = Field(
226-
default=None, description="public DockerHub registry settings"
227-
)
256+
DIRECTOR_V2_DOCKER_REGISTRY: Annotated[
257+
RegistrySettings,
258+
Field(
259+
json_schema_extra={"auto_default_from_env": True},
260+
description="settings for the private registry deployed with the platform",
261+
),
262+
] = DEFAULT_FACTORY
263+
DIRECTOR_V2_DOCKER_HUB_REGISTRY: Annotated[
264+
RegistrySettings | None, Field(description="public DockerHub registry settings")
265+
] = None
228266

229-
DIRECTOR_V2_RESOURCE_USAGE_TRACKER: ResourceUsageTrackerSettings = Field(
230-
json_schema_extra={"auto_default_from_env": True},
231-
description="resource usage tracker service client's plugin",
232-
)
267+
DIRECTOR_V2_RESOURCE_USAGE_TRACKER: Annotated[
268+
ResourceUsageTrackerSettings,
269+
Field(
270+
json_schema_extra={"auto_default_from_env": True},
271+
description="resource usage tracker service client's plugin",
272+
),
273+
] = DEFAULT_FACTORY
233274

234-
DIRECTOR_V2_TRACING: TracingSettings | None = Field(
235-
json_schema_extra={"auto_default_from_env": True},
236-
description="settings for opentelemetry tracing",
237-
)
275+
DIRECTOR_V2_TRACING: Annotated[
276+
TracingSettings | None,
277+
Field(
278+
json_schema_extra={"auto_default_from_env": True},
279+
description="settings for opentelemetry tracing",
280+
),
281+
] = None
238282

239283
@field_validator("LOG_LEVEL", mode="before")
240284
@classmethod

services/director-v2/src/simcore_service_director_v2/models/comp_tasks.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,3 +276,15 @@ class ComputationTaskForRpcDBGet(BaseModel):
276276
image: dict[str, Any]
277277
started_at: dt.datetime | None
278278
ended_at: dt.datetime | None
279+
280+
@field_validator("state", mode="before")
281+
@classmethod
282+
def _convert_from_state_type_enum_if_needed(cls, v):
283+
if isinstance(v, str):
284+
# try to convert to a StateType, if it fails the validations will continue
285+
# and pydantic will try to convert it to a RunningState later on
286+
with suppress(ValueError):
287+
v = StateType(v)
288+
if isinstance(v, StateType):
289+
return RunningState(DB_TO_RUNNING_STATE[v])
290+
return v

0 commit comments

Comments
 (0)