|
13 | 13 | from fastapi import FastAPI, HTTPException |
14 | 14 | from models_library.services_metadata_published import ServiceMetaDataPublished |
15 | 15 | from models_library.services_types import ServiceKey, ServiceVersion |
| 16 | +from pydantic import NonNegativeInt |
16 | 17 | from servicelib.fastapi.tracing import setup_httpx_client_tracing |
17 | 18 | from servicelib.logging_utils import log_context |
18 | 19 | from starlette import status |
|
26 | 27 |
|
27 | 28 | _logger = logging.getLogger(__name__) |
28 | 29 |
|
29 | | -MINUTE = 60 |
# One minute expressed in seconds; multiplied where a delay in seconds is needed
_MINUTE: Final[NonNegativeInt] = 60


# Image label whose value is JSON-decoded into the list of service settings entries
_SERVICE_RUNTIME_SETTINGS: Final[str] = "simcore.service.settings"
# Maps `org.label-schema.*` image labels to the key under which their value is
# copied into the compiled service extras
_ORG_LABELS_TO_SCHEMA_LABELS: Final[dict[str, str]] = {
    "org.label-schema.build-date": "build_date",
    "org.label-schema.vcs-ref": "vcs_ref",
    "org.label-schema.vcs-url": "vcs_url",
}

# Names of the settings entries that are inspected; kept lower-cased because the
# entry name read from the label is lower-cased before being compared
_CONTAINER_SPEC_ENTRY_NAME: Final[str] = "ContainerSpec".lower()
_RESOURCES_ENTRY_NAME: Final[str] = "Resources".lower()
| 42 | + |
| 43 | + |
| 44 | +def _validate_kind(entry_to_validate: dict[str, Any], kind_name: str): |
| 45 | + for element in ( |
| 46 | + entry_to_validate.get("value", {}) |
| 47 | + .get("Reservations", {}) |
| 48 | + .get("GenericResources", []) |
| 49 | + ): |
| 50 | + if element.get("DiscreteResourceSpec", {}).get("Kind") == kind_name: |
| 51 | + return True |
| 52 | + return False |
| 53 | + |
30 | 54 |
|
# Keyword arguments of the retry policy applied at startup.
# Services start in random order in the swarm, hence the retrying.
_director_startup_retry_policy: dict[str, Any] = {
    # a randomized wait prevents saturating other services while startup
    "wait": wait_random(2, 5),
    # stop retrying once two minutes have elapsed
    "stop": stop_after_delay(2 * _MINUTE),
    # emit a WARNING through this module's logger before each sleep
    "before_sleep": before_sleep_log(_logger, logging.WARNING),
    # propagate the original exception once retries are exhausted
    "reraise": True,
}
@@ -100,114 +124,6 @@ async def request_wrapper( |
100 | 124 | return request_wrapper |
101 | 125 |
|
102 | 126 |
|
103 | | -_SERVICE_RUNTIME_SETTINGS: Final[str] = "simcore.service.settings" |
104 | | -_ORG_LABELS_TO_SCHEMA_LABELS: Final[dict[str, str]] = { |
105 | | - "org.label-schema.build-date": "build_date", |
106 | | - "org.label-schema.vcs-ref": "vcs_ref", |
107 | | - "org.label-schema.vcs-url": "vcs_url", |
108 | | -} |
109 | | - |
110 | | -_CONTAINER_SPEC_ENTRY_NAME = "ContainerSpec".lower() |
111 | | -_RESOURCES_ENTRY_NAME = "Resources".lower() |
112 | | - |
113 | | - |
114 | | -def _validate_kind(entry_to_validate: dict[str, Any], kind_name: str): |
115 | | - for element in ( |
116 | | - entry_to_validate.get("value", {}) |
117 | | - .get("Reservations", {}) |
118 | | - .get("GenericResources", []) |
119 | | - ): |
120 | | - if element.get("DiscreteResourceSpec", {}).get("Kind") == kind_name: |
121 | | - return True |
122 | | - return False |
123 | | - |
124 | | - |
125 | | -async def _get_service_extras( |
126 | | - director_client: "DirectorApi", image_key: str, image_tag: str |
127 | | -) -> dict[str, Any]: |
128 | | - # check physical node requirements |
129 | | - # all nodes require "CPU" |
130 | | - result: dict[str, Any] = { |
131 | | - "node_requirements": { |
132 | | - "CPU": director_client.default_max_nano_cpus / 1.0e09, |
133 | | - "RAM": director_client.default_max_memory, |
134 | | - } |
135 | | - } |
136 | | - |
137 | | - labels = await director_client.get_service_labels(image_key, image_tag) |
138 | | - _logger.debug("Compiling service extras from labels %s", pformat(labels)) |
139 | | - |
140 | | - if _SERVICE_RUNTIME_SETTINGS in labels: |
141 | | - service_settings: list[dict[str, Any]] = json.loads( |
142 | | - labels[_SERVICE_RUNTIME_SETTINGS] |
143 | | - ) |
144 | | - for entry in service_settings: |
145 | | - entry_name = entry.get("name", "").lower() |
146 | | - entry_value = entry.get("value") |
147 | | - invalid_with_msg = None |
148 | | - |
149 | | - if entry_name == _RESOURCES_ENTRY_NAME: |
150 | | - if entry_value and isinstance(entry_value, dict): |
151 | | - res_limit = entry_value.get("Limits", {}) |
152 | | - res_reservation = entry_value.get("Reservations", {}) |
153 | | - # CPU |
154 | | - result["node_requirements"]["CPU"] = ( |
155 | | - float(res_limit.get("NanoCPUs", 0)) |
156 | | - or float(res_reservation.get("NanoCPUs", 0)) |
157 | | - or director_client.default_max_nano_cpus |
158 | | - ) / 1.0e09 |
159 | | - # RAM |
160 | | - result["node_requirements"]["RAM"] = ( |
161 | | - res_limit.get("MemoryBytes", 0) |
162 | | - or res_reservation.get("MemoryBytes", 0) |
163 | | - or director_client.default_max_memory |
164 | | - ) |
165 | | - else: |
166 | | - invalid_with_msg = f"invalid type for resource [{entry_value}]" |
167 | | - |
168 | | - # discrete resources (custom made ones) --- |
169 | | - # check if the service requires GPU support |
170 | | - if not invalid_with_msg and _validate_kind(entry, "VRAM"): |
171 | | - |
172 | | - result["node_requirements"]["GPU"] = 1 |
173 | | - if not invalid_with_msg and _validate_kind(entry, "MPI"): |
174 | | - result["node_requirements"]["MPI"] = 1 |
175 | | - |
176 | | - elif entry_name == _CONTAINER_SPEC_ENTRY_NAME: |
177 | | - # NOTE: some minor validation |
178 | | - # expects {'name': 'ContainerSpec', 'type': 'ContainerSpec', 'value': {'Command': [...]}} |
179 | | - if ( |
180 | | - entry_value |
181 | | - and isinstance(entry_value, dict) |
182 | | - and "Command" in entry_value |
183 | | - ): |
184 | | - result["container_spec"] = entry_value |
185 | | - else: |
186 | | - invalid_with_msg = f"invalid container_spec [{entry_value}]" |
187 | | - |
188 | | - if invalid_with_msg: |
189 | | - _logger.warning( |
190 | | - "%s entry [%s] encoded in settings labels of service image %s:%s", |
191 | | - invalid_with_msg, |
192 | | - entry, |
193 | | - image_key, |
194 | | - image_tag, |
195 | | - ) |
196 | | - |
197 | | - # get org labels |
198 | | - result.update( |
199 | | - { |
200 | | - sl: labels[dl] |
201 | | - for dl, sl in _ORG_LABELS_TO_SCHEMA_LABELS.items() |
202 | | - if dl in labels |
203 | | - } |
204 | | - ) |
205 | | - |
206 | | - _logger.debug("Following service extras were compiled: %s", pformat(result)) |
207 | | - |
208 | | - return result |
209 | | - |
210 | | - |
211 | 127 | class DirectorApi: |
212 | 128 | """ |
213 | 129 | - wrapper around thin-client to simplify director's API |
@@ -288,7 +204,87 @@ async def get_service_extras( |
288 | 204 | service_key: ServiceKey, |
289 | 205 | service_version: ServiceVersion, |
290 | 206 | ) -> dict[str, Any]: |
291 | | - return await _get_service_extras(self, service_key, service_version) |
| 207 | + # check physical node requirements |
| 208 | + # all nodes require "CPU" |
| 209 | + result: dict[str, Any] = { |
| 210 | + "node_requirements": { |
| 211 | + "CPU": self.default_max_nano_cpus / 1.0e09, |
| 212 | + "RAM": self.default_max_memory, |
| 213 | + } |
| 214 | + } |
| 215 | + |
| 216 | + labels = await self.get_service_labels(service_key, service_version) |
| 217 | + _logger.debug("Compiling service extras from labels %s", pformat(labels)) |
| 218 | + |
| 219 | + if _SERVICE_RUNTIME_SETTINGS in labels: |
| 220 | + service_settings: list[dict[str, Any]] = json.loads( |
| 221 | + labels[_SERVICE_RUNTIME_SETTINGS] |
| 222 | + ) |
| 223 | + for entry in service_settings: |
| 224 | + entry_name = entry.get("name", "").lower() |
| 225 | + entry_value = entry.get("value") |
| 226 | + invalid_with_msg = None |
| 227 | + |
| 228 | + if entry_name == _RESOURCES_ENTRY_NAME: |
| 229 | + if entry_value and isinstance(entry_value, dict): |
| 230 | + res_limit = entry_value.get("Limits", {}) |
| 231 | + res_reservation = entry_value.get("Reservations", {}) |
| 232 | + # CPU |
| 233 | + result["node_requirements"]["CPU"] = ( |
| 234 | + float(res_limit.get("NanoCPUs", 0)) |
| 235 | + or float(res_reservation.get("NanoCPUs", 0)) |
| 236 | + or self.default_max_nano_cpus |
| 237 | + ) / 1.0e09 |
| 238 | + # RAM |
| 239 | + result["node_requirements"]["RAM"] = ( |
| 240 | + res_limit.get("MemoryBytes", 0) |
| 241 | + or res_reservation.get("MemoryBytes", 0) |
| 242 | + or self.default_max_memory |
| 243 | + ) |
| 244 | + else: |
| 245 | + invalid_with_msg = f"invalid type for resource [{entry_value}]" |
| 246 | + |
| 247 | + # discrete resources (custom made ones) --- |
| 248 | + # check if the service requires GPU support |
| 249 | + if not invalid_with_msg and _validate_kind(entry, "VRAM"): |
| 250 | + |
| 251 | + result["node_requirements"]["GPU"] = 1 |
| 252 | + if not invalid_with_msg and _validate_kind(entry, "MPI"): |
| 253 | + result["node_requirements"]["MPI"] = 1 |
| 254 | + |
| 255 | + elif entry_name == _CONTAINER_SPEC_ENTRY_NAME: |
| 256 | + # NOTE: some minor validation |
| 257 | + # expects {'name': 'ContainerSpec', 'type': 'ContainerSpec', 'value': {'Command': [...]}} |
| 258 | + if ( |
| 259 | + entry_value |
| 260 | + and isinstance(entry_value, dict) |
| 261 | + and "Command" in entry_value |
| 262 | + ): |
| 263 | + result["container_spec"] = entry_value |
| 264 | + else: |
| 265 | + invalid_with_msg = f"invalid container_spec [{entry_value}]" |
| 266 | + |
| 267 | + if invalid_with_msg: |
| 268 | + _logger.warning( |
| 269 | + "%s entry [%s] encoded in settings labels of service image %s:%s", |
| 270 | + invalid_with_msg, |
| 271 | + entry, |
| 272 | + service_key, |
| 273 | + service_version, |
| 274 | + ) |
| 275 | + |
| 276 | + # get org labels |
| 277 | + result.update( |
| 278 | + { |
| 279 | + sl: labels[dl] |
| 280 | + for dl, sl in _ORG_LABELS_TO_SCHEMA_LABELS.items() |
| 281 | + if dl in labels |
| 282 | + } |
| 283 | + ) |
| 284 | + |
| 285 | + _logger.debug("Following service extras were compiled: %s", pformat(result)) |
| 286 | + |
| 287 | + return result |
292 | 288 |
|
293 | 289 |
|
294 | 290 | async def setup_director(app: FastAPI) -> None: |
|
0 commit comments