1212import asyncio
1313import logging
1414from collections .abc import AsyncIterator
15- from contextlib import suppress
16- from pprint import pformat
15+ from datetime import timedelta
1716from typing import Final
1817
18+ from common_library .json_serialization import json_dumps , representation_encoder
1919from fastapi import FastAPI , HTTPException
2020from fastapi_lifespan_manager import State
2121from models_library .services import ServiceMetaDataPublished
2222from models_library .services_types import ServiceKey , ServiceVersion
2323from packaging .version import Version
2424from pydantic import ValidationError
25+ from servicelib .async_utils import cancel_wait_task
26+ from servicelib .background_task_utils import exclusive_periodic
2527from sqlalchemy .exc import SQLAlchemyError
2628from sqlalchemy .ext .asyncio import AsyncEngine
2729
3537_logger = logging .getLogger (__name__ )
3638
3739
38- async def _list_services_in_database (
39- db_engine : AsyncEngine ,
40- ):
41- services_repo = ServicesRepository (db_engine = db_engine )
40+ async def _list_services_in_database (app : FastAPI ):
41+ services_repo = ServicesRepository (app .state .engine )
4242 return {
4343 (service .key , service .version )
4444 for service in await services_repo .list_services ()
@@ -117,15 +117,17 @@ async def _ensure_registry_and_database_are_synced(app: FastAPI) -> None:
117117 services_in_manifest_map = await manifest .get_services_map (director_api )
118118
119119 services_in_db : set [tuple [ServiceKey , ServiceVersion ]] = (
120- await _list_services_in_database (app . state . engine )
120+ await _list_services_in_database (app )
121121 )
122122
123123 # check that the db has all the services at least once
124124 missing_services_in_db = set (services_in_manifest_map .keys ()) - services_in_db
125125 if missing_services_in_db :
126126 _logger .debug (
127127 "Missing services in db: %s" ,
128- pformat (missing_services_in_db ),
128+ json_dumps (
129+ missing_services_in_db , default = representation_encoder , indent = 1
130+ ),
129131 )
130132
131133 # update db
@@ -175,7 +177,7 @@ async def _ensure_published_templates_accessible(
175177 await services_repo .upsert_service_access_rights (missing_services_access_rights )
176178
177179
178- async def _run_sync_services (app : FastAPI ):
180+ async def _sync_services_in_registry (app : FastAPI ):
179181 default_product : Final [str ] = app .state .default_product_name
180182 engine : AsyncEngine = app .state .engine
181183
@@ -187,57 +189,32 @@ async def _run_sync_services(app: FastAPI):
187189 await _ensure_published_templates_accessible (engine , default_product )
188190
189191
190- async def _sync_services_task (app : FastAPI ) -> None :
191- while app .state .registry_syncer_running :
192- try :
193- _logger .debug ("Syncing services between registry and database..." )
194-
195- await _run_sync_services (app )
196-
197- await asyncio .sleep (app .state .settings .CATALOG_BACKGROUND_TASK_REST_TIME )
192+ _TASK_NAME_PERIODIC_SYNC_SERVICES = f"{ __name__ } .{ _sync_services_in_registry .__name__ } "
198193
199- except asyncio .CancelledError : # noqa: PERF203
200- # task is stopped
201- _logger .info ("registry syncing task cancelled" )
202- raise
203-
204- except Exception : # pylint: disable=broad-except
205- if not app .state .registry_syncer_running :
206- _logger .warning ("registry syncing task forced to stop" )
207- break
208- _logger .exception (
209- "Unexpected error while syncing registry entries, restarting now..."
210- )
211- # wait a bit before retrying, so it does not block everything until the director is up
212- await asyncio .sleep (
213- app .state .settings .CATALOG_BACKGROUND_TASK_WAIT_AFTER_FAILURE
214- )
215-
216-
217- async def start_registry_sync_task (app : FastAPI ) -> None :
218- # FIXME: added this variable to overcome the state in which the
219- # task cancelation is ignored and the exceptions enter in a loop
220- # that never stops the background task. This flag is an additional
221- # mechanism to enforce stopping the background task
222- app .state .registry_syncer_running = True
223- task = asyncio .create_task (_sync_services_task (app ))
224- app .state .registry_sync_task = task
225- _logger .info ("registry syncing task started" )
226194
195+ async def background_task_lifespan (app : FastAPI ) -> AsyncIterator [State ]:
196+ assert app .state .settings # nosec
197+ assert app .state .redis_client # nosec
198+ assert app .state .engine # nosec
199+ assert app .state .default_product_name # nosec
200+
201+ settings = app .state .settings
202+
203+ @exclusive_periodic (
204+ app .state .redis_client ,
205+ task_interval = timedelta (seconds = settings .CATALOG_BACKGROUND_TASK_REST_TIME ),
206+ retry_after = timedelta (
207+ seconds = settings .CATALOG_BACKGROUND_TASK_WAIT_AFTER_FAILURE
208+ ),
209+ )
210+ async def _sync_services_task () -> None :
211+ await _sync_services_in_registry (app )
227212
228- async def stop_registry_sync_task (app : FastAPI ) -> None :
229- if task := app .state .registry_sync_task :
230- with suppress (asyncio .CancelledError ):
231- app .state .registry_syncer_running = False
232- task .cancel ()
233- await task
234- app .state .registry_sync_task = None
235- _logger .info ("registry syncing task stopped" )
213+ app .state .sync_services_task = asyncio .create_task (
214+ _sync_services_task (), name = _TASK_NAME_PERIODIC_SYNC_SERVICES
215+ )
236216
217+ yield {}
237218
238- async def background_task_lifespan (app : FastAPI ) -> AsyncIterator [State ]:
239- await start_registry_sync_task (app )
240- try :
241- yield {}
242- finally :
243- await stop_registry_sync_task (app )
219+ assert isinstance (app .state .sync_services_task , asyncio .Task ) # nosec
220+ await cancel_wait_task (app .state .sync_services_task )
0 commit comments