Skip to content

Commit e9ec48c

Browse files
Merge branch 'master' into performance-test-improvements
2 parents 8d45899 + 0aeae77 commit e9ec48c

File tree

2 files changed

+85
-33
lines changed

2 files changed

+85
-33
lines changed

services/director/src/simcore_service_director/registry_proxy.py

Lines changed: 75 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import httpx
1111
from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped]
1212
from fastapi import FastAPI, status
13+
from servicelib.background_task import start_periodic_task, stop_periodic_task
1314
from servicelib.logging_utils import log_catch, log_context
1415
from servicelib.utils import limited_as_completed
1516
from tenacity import retry
@@ -189,26 +190,26 @@ async def registry_request(
189190
app: FastAPI,
190191
*,
191192
path: str,
192-
method: str = "GET",
193-
no_cache: bool = False,
193+
method: str,
194+
use_cache: bool,
194195
**session_kwargs,
195196
) -> tuple[dict, Mapping]:
196197
cache: SimpleMemoryCache = app.state.registry_cache_memory
197198
cache_key = f"{method}_{path}"
198-
if not no_cache and (cached_response := await cache.get(cache_key)):
199+
if use_cache and (cached_response := await cache.get(cache_key)):
199200
assert isinstance(cached_response, tuple) # nosec
200201
return cast(tuple[dict, Mapping], cached_response)
201202

202203
app_settings = get_application_settings(app)
203204
try:
204205
response, response_headers = await _retried_request(
205-
app, path, method, **session_kwargs
206+
app, path, method.upper(), **session_kwargs
206207
)
207208
except httpx.RequestError as exc:
208209
msg = f"Unknown error while accessing registry: {exc!s} via {exc.request}"
209210
raise DirectorRuntimeError(msg=msg) from exc
210211

211-
if not no_cache and app_settings.DIRECTOR_REGISTRY_CACHING and method == "GET":
212+
if app_settings.DIRECTOR_REGISTRY_CACHING and method.upper() == "GET":
212213
await cache.set(
213214
cache_key,
214215
(response, response_headers),
@@ -218,10 +219,6 @@ async def registry_request(
218219
return response, response_headers
219220

220221

221-
async def _is_registry_responsive(app: FastAPI) -> None:
222-
await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0)
223-
224-
225222
async def _setup_registry(app: FastAPI) -> None:
226223
@retry(
227224
wait=wait_fixed(1),
@@ -230,22 +227,36 @@ async def _setup_registry(app: FastAPI) -> None:
230227
reraise=True,
231228
)
232229
async def _wait_until_registry_responsive(app: FastAPI) -> None:
233-
await _is_registry_responsive(app)
230+
await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0)
234231

235232
with log_context(_logger, logging.INFO, msg="Connecting to docker registry"):
236233
await _wait_until_registry_responsive(app)
237234

238235

236+
async def _list_all_services_task(*, app: FastAPI) -> None:
237+
with log_context(_logger, logging.INFO, msg="Updating cache with services"):
238+
await list_services(app, ServiceType.ALL, update_cache=True)
239+
240+
239241
def setup(app: FastAPI) -> None:
240242
async def on_startup() -> None:
241243
cache = Cache(Cache.MEMORY)
242244
assert isinstance(cache, SimpleMemoryCache) # nosec
243245
app.state.registry_cache_memory = cache
244246
await _setup_registry(app)
247+
app_settings = get_application_settings(app)
248+
app.state.auto_cache_task = None
249+
if app_settings.DIRECTOR_REGISTRY_CACHING:
250+
app.state.auto_cache_task = start_periodic_task(
251+
_list_all_services_task,
252+
interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2,
253+
task_name="director-auto-cache-task",
254+
app=app,
255+
)
245256

246257
async def on_shutdown() -> None:
247-
# nothing to do here
248-
...
258+
if app.state.auto_cache_task:
259+
await stop_periodic_task(app.state.auto_cache_task)
249260

250261
app.add_event_handler("startup", on_startup)
251262
app.add_event_handler("shutdown", on_shutdown)
@@ -266,19 +277,23 @@ def _get_prefix(service_type: ServiceType) -> str:
266277

267278

268279
async def _list_repositories_gen(
269-
app: FastAPI, service_type: ServiceType
280+
app: FastAPI, service_type: ServiceType, *, update_cache: bool
270281
) -> AsyncGenerator[list[str], None]:
271282
with log_context(_logger, logging.DEBUG, msg="listing repositories"):
272283
path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
273-
result, headers = await registry_request(app, path=path) # initial call
284+
result, headers = await registry_request(
285+
app, path=path, method="GET", use_cache=not update_cache
286+
) # initial call
274287

275288
while True:
276289
if "Link" in headers:
277290
next_path = (
278291
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
279292
)
280293
prefetch_task = asyncio.create_task(
281-
registry_request(app, path=next_path)
294+
registry_request(
295+
app, path=next_path, method="GET", use_cache=not update_cache
296+
)
282297
)
283298
else:
284299
prefetch_task = None
@@ -296,27 +311,36 @@ async def _list_repositories_gen(
296311

297312

298313
async def list_image_tags_gen(
299-
app: FastAPI, image_key: str
314+
app: FastAPI, image_key: str, *, update_cache=False
300315
) -> AsyncGenerator[list[str], None]:
301316
with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"):
302317
path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
303-
tags, headers = await registry_request(app, path=path) # initial call
318+
tags, headers = await registry_request(
319+
app, path=path, method="GET", use_cache=not update_cache
320+
) # initial call
321+
assert "tags" in tags # nosec
304322
while True:
305323
if "Link" in headers:
306324
next_path = (
307325
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
308326
)
309327
prefetch_task = asyncio.create_task(
310-
registry_request(app, path=next_path)
328+
registry_request(
329+
app, path=next_path, method="GET", use_cache=not update_cache
330+
)
311331
)
312332
else:
313333
prefetch_task = None
314334

315-
yield list(
316-
filter(
317-
VERSION_REG.match,
318-
tags["tags"],
335+
yield (
336+
list(
337+
filter(
338+
VERSION_REG.match,
339+
tags["tags"],
340+
)
319341
)
342+
if tags["tags"] is not None
343+
else []
320344
)
321345
if prefetch_task:
322346
tags, headers = await prefetch_task
@@ -342,20 +366,22 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None:
342366
SEE https://distribution.github.io/distribution/spec/api/#digest-header
343367
"""
344368
path = f"{image}/manifests/{tag}"
345-
_, headers = await registry_request(app, path=path)
369+
_, headers = await registry_request(app, path=path, method="GET", use_cache=True)
346370

347371
headers = headers or {}
348372
return headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None)
349373

350374

351375
async def get_image_labels(
352-
app: FastAPI, image: str, tag: str
376+
app: FastAPI, image: str, tag: str, *, update_cache=False
353377
) -> tuple[dict[str, str], str | None]:
354378
"""Returns image labels and the image manifest digest"""
355379

356380
_logger.debug("getting image labels of %s:%s", image, tag)
357381
path = f"{image}/manifests/{tag}"
358-
request_result, headers = await registry_request(app, path=path)
382+
request_result, headers = await registry_request(
383+
app, path=path, method="GET", use_cache=not update_cache
384+
)
359385
v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"])
360386
container_config: dict[str, Any] = v1_compatibility_key.get(
361387
"container_config", v1_compatibility_key["config"]
@@ -371,10 +397,12 @@ async def get_image_labels(
371397

372398

373399
async def get_image_details(
374-
app: FastAPI, image_key: str, image_tag: str
400+
app: FastAPI, image_key: str, image_tag: str, *, update_cache=False
375401
) -> dict[str, Any]:
376402
image_details: dict = {}
377-
labels, image_manifest_digest = await get_image_labels(app, image_key, image_tag)
403+
labels, image_manifest_digest = await get_image_labels(
404+
app, image_key, image_tag, update_cache=update_cache
405+
)
378406

379407
if image_manifest_digest:
380408
# Adds manifest as extra key in the response similar to org.opencontainers.image.base.digest
@@ -402,11 +430,18 @@ async def get_image_details(
402430
return image_details
403431

404432

405-
async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]:
433+
async def get_repo_details(
434+
app: FastAPI, image_key: str, *, update_cache=False
435+
) -> list[dict[str, Any]]:
406436
repo_details = []
407-
async for image_tags in list_image_tags_gen(app, image_key):
437+
async for image_tags in list_image_tags_gen(
438+
app, image_key, update_cache=update_cache
439+
):
408440
async for image_details_future in limited_as_completed(
409-
(get_image_details(app, image_key, tag) for tag in image_tags),
441+
(
442+
get_image_details(app, image_key, tag, update_cache=update_cache)
443+
for tag in image_tags
444+
),
410445
limit=get_application_settings(
411446
app
412447
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS,
@@ -417,16 +452,23 @@ async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]
417452
return repo_details
418453

419454

420-
async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]:
455+
async def list_services(
456+
app: FastAPI, service_type: ServiceType, *, update_cache=False
457+
) -> list[dict]:
421458
with log_context(_logger, logging.DEBUG, msg="listing services"):
422459
services = []
423460
concurrency_limit = get_application_settings(
424461
app
425462
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS
426-
async for repos in _list_repositories_gen(app, service_type):
463+
async for repos in _list_repositories_gen(
464+
app, service_type, update_cache=update_cache
465+
):
427466
# only list as service if it actually contains the necessary labels
428467
async for repo_details_future in limited_as_completed(
429-
(get_repo_details(app, repo) for repo in repos),
468+
(
469+
get_repo_details(app, repo, update_cache=update_cache)
470+
for repo in repos
471+
),
430472
limit=concurrency_limit,
431473
):
432474
with log_catch(_logger, reraise=False):

services/director/tests/unit/test_registry_proxy.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import asyncio
55
import json
66
import time
7+
from unittest import mock
78

89
import pytest
910
from fastapi import FastAPI
1011
from pytest_benchmark.plugin import BenchmarkFixture
12+
from pytest_mock.plugin import MockerFixture
1113
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
1214
from pytest_simcore.helpers.typing_env import EnvVarsDict
1315
from settings_library.docker_registry import RegistrySettings
@@ -228,9 +230,17 @@ def configure_registry_caching(
228230
)
229231

230232

233+
@pytest.fixture
234+
def with_disabled_auto_caching(mocker: MockerFixture) -> mock.Mock:
235+
return mocker.patch(
236+
"simcore_service_director.registry_proxy._list_all_services_task", autospec=True
237+
)
238+
239+
231240
async def test_registry_caching(
232241
configure_registry_access: EnvVarsDict,
233242
configure_registry_caching: EnvVarsDict,
243+
with_disabled_auto_caching: mock.Mock,
234244
app_settings: ApplicationSettings,
235245
app: FastAPI,
236246
push_services,

0 commit comments

Comments
 (0)