Skip to content

Commit 8599207

Browse files
committed
add prefetching and generators
1 parent 35de004 commit 8599207

File tree

1 file changed

+65
-55
lines changed

1 file changed

+65
-55
lines changed

services/director/src/simcore_service_director/registry_proxy.py

Lines changed: 65 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
1+
import asyncio
12
import enum
23
import json
34
import logging
45
import re
56
from collections.abc import Mapping
67
from http import HTTPStatus
78
from pprint import pformat
8-
from typing import Any, cast
9+
from typing import Any, AsyncGenerator, Final, cast
910

1011
from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped]
1112
from aiohttp import BasicAuth, ClientSession, client_exceptions
1213
from aiohttp.client import ClientTimeout
1314
from fastapi import FastAPI
14-
from servicelib.utils import limited_gather
15+
from servicelib.logging_utils import log_catch, log_context
16+
from servicelib.utils import limited_as_completed
1517
from tenacity import retry
1618
from tenacity.before_sleep import before_sleep_log
1719
from tenacity.retry import retry_if_result
@@ -271,20 +273,44 @@ async def on_shutdown() -> None:
271273
app.add_event_handler("shutdown", on_shutdown)
272274

273275

274-
async def _list_repositories(app: FastAPI) -> list[str]:
275-
logger.debug("listing repositories")
276-
# if there are more repos, the Link will be available in the response headers until none available
277-
path = f"/v2/_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
278-
repos_list: list = []
279-
while True:
280-
result, headers = await registry_request(app, path)
281-
if result["repositories"]:
282-
repos_list.extend(result["repositories"])
283-
if "Link" not in headers:
284-
break
285-
path = str(headers["Link"]).split(";")[0].strip("<>")
286-
logger.debug("listed %s repositories", len(repos_list))
287-
return repos_list
276+
def _get_prefix(service_type: ServiceType) -> str:
277+
return f"{DIRECTOR_SIMCORE_SERVICES_PREFIX}/{service_type.value}/"
278+
279+
280+
_SERVICE_TYPE_FILTER_MAP: Final[dict[ServiceType, tuple[str, ...]]] = {
281+
ServiceType.DYNAMIC: (_get_prefix(ServiceType.DYNAMIC),),
282+
ServiceType.COMPUTATIONAL: (_get_prefix(ServiceType.COMPUTATIONAL),),
283+
ServiceType.ALL: (
284+
_get_prefix(ServiceType.DYNAMIC),
285+
_get_prefix(ServiceType.COMPUTATIONAL),
286+
),
287+
}
288+
289+
290+
async def _list_repositories_gen(
291+
app: FastAPI, service_type: ServiceType
292+
) -> AsyncGenerator[list[str], None]:
293+
with log_context(logger, logging.DEBUG, msg="listing repositories"):
294+
path = f"/v2/_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
295+
result, headers = await registry_request(app, path) # initial call
296+
297+
while True:
298+
if "Link" in headers:
299+
next_path = str(headers["Link"]).split(";")[0].strip("<>")
300+
prefetch_task = asyncio.create_task(registry_request(app, next_path))
301+
else:
302+
prefetch_task = None
303+
304+
yield list(
305+
filter(
306+
lambda x: str(x).startswith(_SERVICE_TYPE_FILTER_MAP[service_type]),
307+
result["repositories"],
308+
)
309+
)
310+
if prefetch_task:
311+
result, headers = await prefetch_task
312+
else:
313+
return
288314

289315

290316
async def list_image_tags(app: FastAPI, image_key: str) -> list[str]:
@@ -375,48 +401,36 @@ async def get_image_details(
375401

376402

377403
async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]:
378-
379404
image_tags = await list_image_tags(app, image_key)
380-
381-
results = await limited_gather(
382-
*[get_image_details(app, image_key, tag) for tag in image_tags],
383-
reraise=False,
384-
log=logger,
405+
repo_details = []
406+
async for image_details_future in limited_as_completed(
407+
(get_image_details(app, image_key, tag) for tag in image_tags),
385408
limit=get_application_settings(
386409
app
387410
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS,
388-
)
389-
return [result for result in results if not isinstance(result, BaseException)]
411+
):
412+
with log_catch(logger, reraise=False):
413+
if image_details := await image_details_future:
414+
repo_details.append(image_details)
415+
return repo_details
390416

391417

392418
async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]:
393-
logger.debug("getting list of services")
394-
repos = await _list_repositories(app)
395-
# get the services repos
396-
prefixes = []
397-
if service_type in [ServiceType.DYNAMIC, ServiceType.ALL]:
398-
prefixes.append(_get_prefix(ServiceType.DYNAMIC))
399-
if service_type in [ServiceType.COMPUTATIONAL, ServiceType.ALL]:
400-
prefixes.append(_get_prefix(ServiceType.COMPUTATIONAL))
401-
repos = [x for x in repos if str(x).startswith(tuple(prefixes))]
402-
logger.debug("retrieved list of repos : %s", repos)
403-
404-
# only list as service if it actually contains the necessary labels
405-
results = await limited_gather(
406-
*[get_repo_details(app, repo) for repo in repos],
407-
reraise=False,
408-
log=logger,
409-
limit=get_application_settings(
410-
app
411-
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS,
412-
)
413-
414-
return [
415-
service
416-
for repo_details in results
417-
if isinstance(repo_details, list)
418-
for service in repo_details
419-
]
419+
with log_context(logger, logging.DEBUG, msg="listing services"):
420+
services = []
421+
async for repos in _list_repositories_gen(app, service_type):
422+
# only list as service if it actually contains the necessary labels
423+
async for repo_details_future in limited_as_completed(
424+
(get_repo_details(app, repo) for repo in repos),
425+
limit=get_application_settings(
426+
app
427+
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS,
428+
):
429+
with log_catch(logger, reraise=False):
430+
if repo_details := await repo_details_future:
431+
services.extend(repo_details)
432+
433+
return services
420434

421435

422436
async def list_interactive_service_dependencies(
@@ -441,10 +455,6 @@ async def list_interactive_service_dependencies(
441455
return dependency_keys
442456

443457

444-
def _get_prefix(service_type: ServiceType) -> str:
445-
return f"{DIRECTOR_SIMCORE_SERVICES_PREFIX}/{service_type.value}/"
446-
447-
448458
def get_service_first_name(image_key: str) -> str:
449459
if str(image_key).startswith(_get_prefix(ServiceType.DYNAMIC)):
450460
service_name_suffixes = str(image_key)[len(_get_prefix(ServiceType.DYNAMIC)) :]

0 commit comments

Comments
 (0)