11import asyncio
22import functools
3+ import json
34import logging
45import urllib .parse
56from collections .abc import Awaitable , Callable
67from contextlib import suppress
7- from typing import Any
8+ from pprint import pformat
9+ from typing import Any , Final
810
911import httpx
1012from common_library .json_serialization import json_dumps
1315from models_library .services_types import ServiceKey , ServiceVersion
1416from servicelib .fastapi .tracing import setup_httpx_client_tracing
1517from servicelib .logging_utils import log_context
16- from settings_library .tracing import TracingSettings
1718from starlette import status
1819from tenacity .asyncio import AsyncRetrying
1920from tenacity .before_sleep import before_sleep_log
2021from tenacity .stop import stop_after_delay
2122from tenacity .wait import wait_random
2223
24+ from ..core .settings import ApplicationSettings
2325from ..exceptions .errors import DirectorUnresponsiveError
2426
2527_logger = logging .getLogger (__name__ )
@@ -98,6 +100,114 @@ async def request_wrapper(
98100 return request_wrapper
99101
100102
# Key of the service label whose value is the JSON-encoded list of runtime
# settings entries (each entry is a dict with at least "name" and "value")
_SERVICE_RUNTIME_SETTINGS: Final[str] = "simcore.service.settings"

# Maps "org.label-schema.*" image labels to the keys under which they are
# copied into the compiled service extras
_ORG_LABELS_TO_SCHEMA_LABELS: Final[dict[str, str]] = {
    "org.label-schema.build-date": "build_date",
    "org.label-schema.vcs-ref": "vcs_ref",
    "org.label-schema.vcs-url": "vcs_url",
}

# Names of the settings entries handled when compiling service extras.
# Stored lower-cased because entry names are lower-cased before comparison.
_CONTAINER_SPEC_ENTRY_NAME: Final[str] = "ContainerSpec".lower()
_RESOURCES_ENTRY_NAME: Final[str] = "Resources".lower()
112+
113+
def _validate_kind(entry_to_validate: dict[str, Any], kind_name: str) -> bool:
    """Tell whether a settings entry declares a generic resource of a given kind.

    Inspects ``entry_to_validate["value"]["Reservations"]["GenericResources"]``
    and compares the ``DiscreteResourceSpec.Kind`` of every element with
    ``kind_name`` (exact, case-sensitive match).

    Arguments:
        entry_to_validate: one entry of the service settings
        kind_name: resource kind to look for, e.g. ``"VRAM"``

    Returns:
        True when at least one generic resource has that kind. Any level that
        is missing, ``None`` (explicit JSON ``null``) or of an unexpected type
        is treated as "not declared" instead of raising.
    """
    value = entry_to_validate.get("value")
    reservations = value.get("Reservations") if isinstance(value, dict) else None
    generic_resources = (
        reservations.get("GenericResources")
        if isinstance(reservations, dict)
        else None
    )
    if not isinstance(generic_resources, (list, tuple)):
        return False

    for resource in generic_resources:
        spec = (
            resource.get("DiscreteResourceSpec")
            if isinstance(resource, dict)
            else None
        )
        if isinstance(spec, dict) and spec.get("Kind") == kind_name:
            return True
    return False
123+
124+
async def _get_service_extras(
    director_client: "DirectorApi", image_key: str, image_tag: str
) -> dict[str, Any]:
    """Compile the extras of a service image out of its labels.

    The result always contains ``node_requirements`` with ``CPU`` and ``RAM``,
    initialised from the defaults stored in ``director_client``. Entries of the
    ``simcore.service.settings`` label (a JSON-encoded list) may override them.
    Entry names are matched case-insensitively:

    - ``Resources``: ``CPU`` is ``Limits.NanoCPUs``, else
      ``Reservations.NanoCPUs``, else the default, always divided by 1e9.
      ``RAM`` follows the same precedence using ``MemoryBytes``. ``GPU`` and
      ``MPI`` are set to 1 when ``_validate_kind`` finds a ``VRAM``
      (respectively ``MPI``) generic resource in the entry.
    - ``ContainerSpec``: stored under ``container_spec`` when its value is a
      non-empty dict with a ``Command`` key.

    Entries failing these checks are reported with a warning and ignored.
    Finally the ``org.label-schema.*`` labels listed in
    ``_ORG_LABELS_TO_SCHEMA_LABELS`` are copied into the result.

    Arguments:
        director_client: provides ``get_service_labels`` and the
            ``default_max_nano_cpus`` / ``default_max_memory`` defaults
        image_key: key of the service image
        image_tag: tag (version) of the service image

    Returns:
        dict with ``node_requirements`` and, when available,
        ``container_spec``, ``build_date``, ``vcs_ref`` and ``vcs_url``.

    Raises:
        json.JSONDecodeError: the settings label is not valid JSON (not caught
            here).
    """
    default_nano_cpus = director_client.default_max_nano_cpus
    default_memory = director_client.default_max_memory

    # check physical node requirements: all nodes require CPU and RAM, so
    # start from the configured defaults
    node_requirements: dict[str, Any] = {
        "CPU": default_nano_cpus / 1.0e09,
        "RAM": default_memory,
    }
    result: dict[str, Any] = {"node_requirements": node_requirements}

    labels = await director_client.get_service_labels(image_key, image_tag)
    # pformat is not free: only format when the record is actually emitted
    if _logger.isEnabledFor(logging.DEBUG):
        _logger.debug("Compiling service extras from labels %s", pformat(labels))

    if _SERVICE_RUNTIME_SETTINGS in labels:
        service_settings: list[dict[str, Any]] = json.loads(
            labels[_SERVICE_RUNTIME_SETTINGS]
        )
        for entry in service_settings:
            # "name" may be missing or an explicit null in the label
            entry_name = str(entry.get("name") or "").lower()
            entry_value = entry.get("value")
            invalid_with_msg: str | None = None

            if entry_name == _RESOURCES_ENTRY_NAME:
                if entry_value and isinstance(entry_value, dict):
                    # an explicit null is handled like a missing key
                    limits = entry_value.get("Limits") or {}
                    reservations = entry_value.get("Reservations") or {}

                    # CPU: limit wins over reservation, which wins over default
                    node_requirements["CPU"] = (
                        float(limits.get("NanoCPUs") or 0)
                        or float(reservations.get("NanoCPUs") or 0)
                        or default_nano_cpus
                    ) / 1.0e09
                    # RAM: same precedence as CPU
                    node_requirements["RAM"] = (
                        limits.get("MemoryBytes")
                        or reservations.get("MemoryBytes")
                        or default_memory
                    )

                    # discrete resources (custom made ones):
                    # check if the service requires GPU and/or MPI support
                    if _validate_kind(entry, "VRAM"):
                        node_requirements["GPU"] = 1
                    if _validate_kind(entry, "MPI"):
                        node_requirements["MPI"] = 1
                else:
                    invalid_with_msg = f"invalid type for resource [{entry_value}]"

            elif entry_name == _CONTAINER_SPEC_ENTRY_NAME:
                # NOTE: some minor validation
                # expects {'name': 'ContainerSpec', 'type': 'ContainerSpec', 'value': {'Command': [...]}}
                if (
                    entry_value
                    and isinstance(entry_value, dict)
                    and "Command" in entry_value
                ):
                    result["container_spec"] = entry_value
                else:
                    invalid_with_msg = f"invalid container_spec [{entry_value}]"

            if invalid_with_msg:
                _logger.warning(
                    "%s entry [%s] encoded in settings labels of service image %s:%s",
                    invalid_with_msg,
                    entry,
                    image_key,
                    image_tag,
                )

    # get org labels
    result.update(
        {
            schema_label: labels[org_label]
            for org_label, schema_label in _ORG_LABELS_TO_SCHEMA_LABELS.items()
            if org_label in labels
        }
    )

    if _logger.isEnabledFor(logging.DEBUG):
        _logger.debug("Following service extras were compiled: %s", pformat(result))

    return result
209+
210+
101211class DirectorApi :
102212 """
103213 - wrapper around thin-client to simplify director's API
@@ -108,16 +218,22 @@ class DirectorApi:
108218 SEE services/catalog/src/simcore_service_catalog/api/dependencies/director.py
109219 """
110220
def __init__(self, base_url: str, app: FastAPI):
    """Create the httpx client and cache the settings this API relies on.

    Arguments:
        base_url: base URL passed to ``httpx.AsyncClient``
        app: application whose ``app.state.settings`` holds the
            ``ApplicationSettings`` read here
    """
    app_settings: ApplicationSettings = app.state.settings

    client_request_settings = app_settings.CATALOG_CLIENT_REQUEST
    assert client_request_settings  # nosec
    self.client = httpx.AsyncClient(
        base_url=base_url,
        timeout=client_request_settings.HTTP_CLIENT_REQUEST_TOTAL_TIMEOUT,
    )
    # instrument the client only when tracing settings are present
    if app_settings.CATALOG_TRACING:
        setup_httpx_client_tracing(self.client)

    director_settings = app_settings.CATALOG_DIRECTOR
    assert director_settings  # nosec
    self.vtag = director_settings.DIRECTOR_VTAG

    # fallbacks read by _get_service_extras for the CPU/RAM node requirements
    self.default_max_memory = app_settings.DIRECTOR_DEFAULT_MAX_MEMORY
    self.default_max_nano_cpus = app_settings.DIRECTOR_DEFAULT_MAX_NANO_CPUS
121237
async def close(self):
    """Close the underlying ``httpx.AsyncClient``."""
    http_client = self.client
    await http_client.aclose()
@@ -172,32 +288,18 @@ async def get_service_extras(
172288 service_key : ServiceKey ,
173289 service_version : ServiceVersion ,
174290 ) -> dict [str , Any ]:
175- response = await self .get (
176- f"/service_extras/{ urllib .parse .quote_plus (service_key )} /{ service_version } "
177- )
178- assert isinstance (response , dict ) # nosec
179- return response
291+ return await _get_service_extras (self , service_key , service_version )
180292
181293
182- async def setup_director (
183- app : FastAPI , tracing_settings : TracingSettings | None
184- ) -> None :
294+ async def setup_director (app : FastAPI ) -> None :
185295 if settings := app .state .settings .CATALOG_DIRECTOR :
186296 with log_context (
187297 _logger , logging .DEBUG , "Setup director at %s" , f"{ settings .base_url = } "
188298 ):
189299 async for attempt in AsyncRetrying (** _director_startup_retry_policy ):
190- client = DirectorApi (
191- base_url = settings .base_url ,
192- app = app ,
193- tracing_settings = tracing_settings ,
194- )
300+ client = DirectorApi (base_url = settings .base_url , app = app )
195301 with attempt :
196- client = DirectorApi (
197- base_url = settings .base_url ,
198- app = app ,
199- tracing_settings = tracing_settings ,
200- )
302+ client = DirectorApi (base_url = settings .base_url , app = app )
201303 if not await client .is_responsive ():
202304 with suppress (Exception ):
203305 await client .close ()
0 commit comments