Skip to content

Commit 79aab7f

Browse files
committed
refactoring, setup background task
1 parent 3fe2e6f commit 79aab7f

File tree

1 file changed

+33
-13
lines changed

1 file changed

+33
-13
lines changed

services/director/src/simcore_service_director/registry_proxy.py

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import httpx
1111
from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped]
1212
from fastapi import FastAPI, status
13+
from servicelib.background_task import start_periodic_task, stop_periodic_task
1314
from servicelib.logging_utils import log_catch, log_context
1415
from servicelib.utils import limited_as_completed
1516
from tenacity import retry
@@ -189,26 +190,26 @@ async def registry_request(
189190
app: FastAPI,
190191
*,
191192
path: str,
192-
method: str = "GET",
193-
no_cache: bool = False,
193+
method: str,
194+
use_cache: bool,
194195
**session_kwargs,
195196
) -> tuple[dict, Mapping]:
196197
cache: SimpleMemoryCache = app.state.registry_cache_memory
197198
cache_key = f"{method}_{path}"
198-
if not no_cache and (cached_response := await cache.get(cache_key)):
199+
if use_cache and (cached_response := await cache.get(cache_key)):
199200
assert isinstance(cached_response, tuple) # nosec
200201
return cast(tuple[dict, Mapping], cached_response)
201202

202203
app_settings = get_application_settings(app)
203204
try:
204205
response, response_headers = await _retried_request(
205-
app, path, method, **session_kwargs
206+
app, path, method.upper(), **session_kwargs
206207
)
207208
except httpx.RequestError as exc:
208209
msg = f"Unknown error while accessing registry: {exc!s} via {exc.request}"
209210
raise DirectorRuntimeError(msg=msg) from exc
210211

211-
if not no_cache and app_settings.DIRECTOR_REGISTRY_CACHING and method == "GET":
212+
if app_settings.DIRECTOR_REGISTRY_CACHING and method.upper() == "GET":
212213
await cache.set(
213214
cache_key,
214215
(response, response_headers),
@@ -236,16 +237,29 @@ async def _wait_until_registry_responsive(app: FastAPI) -> None:
236237
await _wait_until_registry_responsive(app)
237238

238239

240+
async def _list_all_services_task(app: FastAPI) -> None:
241+
with log_context(_logger, logging.INFO, msg="Listing all services"):
242+
await list_services(app, ServiceType.ALL)
243+
244+
239245
def setup(app: FastAPI) -> None:
240246
async def on_startup() -> None:
241247
cache = Cache(Cache.MEMORY)
242248
assert isinstance(cache, SimpleMemoryCache) # nosec
243249
app.state.registry_cache_memory = cache
244250
await _setup_registry(app)
251+
app_settings = get_application_settings(app)
252+
app.state.auto_cache_task = None
253+
if app_settings.DIRECTOR_REGISTRY_CACHING:
254+
app.state.auto_cache_task = await start_periodic_task(
255+
_list_all_services_task,
256+
interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2,
257+
task_name="director-auto-cache-task",
258+
)
245259

246260
async def on_shutdown() -> None:
247-
# nothing to do here
248-
...
261+
if app.state.auto_cache_task:
262+
await stop_periodic_task(app.state.auto_cache_task)
249263

250264
app.add_event_handler("startup", on_startup)
251265
app.add_event_handler("shutdown", on_shutdown)
@@ -270,15 +284,17 @@ async def _list_repositories_gen(
270284
) -> AsyncGenerator[list[str], None]:
271285
with log_context(_logger, logging.DEBUG, msg="listing repositories"):
272286
path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
273-
result, headers = await registry_request(app, path=path) # initial call
287+
result, headers = await registry_request(
288+
app, path=path, method="GET", use_cache=True
289+
) # initial call
274290

275291
while True:
276292
if "Link" in headers:
277293
next_path = (
278294
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
279295
)
280296
prefetch_task = asyncio.create_task(
281-
registry_request(app, path=next_path)
297+
registry_request(app, path=next_path, method="GET", use_cache=True)
282298
)
283299
else:
284300
prefetch_task = None
@@ -300,14 +316,16 @@ async def list_image_tags_gen(
300316
) -> AsyncGenerator[list[str], None]:
301317
with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"):
302318
path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
303-
tags, headers = await registry_request(app, path=path) # initial call
319+
tags, headers = await registry_request(
320+
app, path=path, method="GET", use_cache=True
321+
) # initial call
304322
while True:
305323
if "Link" in headers:
306324
next_path = (
307325
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
308326
)
309327
prefetch_task = asyncio.create_task(
310-
registry_request(app, path=next_path)
328+
registry_request(app, path=next_path, method="GET", use_cache=True)
311329
)
312330
else:
313331
prefetch_task = None
@@ -342,7 +360,7 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None:
342360
SEE https://distribution.github.io/distribution/spec/api/#digest-header
343361
"""
344362
path = f"{image}/manifests/{tag}"
345-
_, headers = await registry_request(app, path=path)
363+
_, headers = await registry_request(app, path=path, method="GET", use_cache=True)
346364

347365
headers = headers or {}
348366
return headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None)
@@ -355,7 +373,9 @@ async def get_image_labels(
355373

356374
_logger.debug("getting image labels of %s:%s", image, tag)
357375
path = f"{image}/manifests/{tag}"
358-
request_result, headers = await registry_request(app, path=path)
376+
request_result, headers = await registry_request(
377+
app, path=path, method="GET", use_cache=True
378+
)
359379
v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"])
360380
container_config: dict[str, Any] = v1_compatibility_key.get(
361381
"container_config", v1_compatibility_key["config"]

0 commit comments

Comments
 (0)