66import asyncio
77import logging
88from collections .abc import AsyncIterator , Callable
9+ from datetime import timedelta
910
1011from aiohttp import web
1112from models_library .users import UserID
13+ from servicelib .async_utils import cancel_wait_task
14+ from servicelib .background_task_utils import exclusive_periodic
1215from servicelib .logging_utils import get_log_record_extra , log_context
13- from tenacity import retry
14- from tenacity .before_sleep import before_sleep_log
15- from tenacity .wait import wait_exponential
16+ from simcore_service_webserver .redis import get_redis_lock_manager_client_sdk
1617
1718from ..login import login_service
1819from ..security import security_service
2324CleanupContextFunc = Callable [[web .Application ], AsyncIterator [None ]]
2425
2526
26- _PERIODIC_TASK_NAME = f"{ __name__ } .update_expired_users_periodically"
27- _APP_TASK_KEY = f"{ _PERIODIC_TASK_NAME } .task"
28-
29-
3027async def notify_user_logout_all_sessions (
3128 app : web .Application , user_id : UserID
3229) -> None :
@@ -49,15 +46,7 @@ async def notify_user_logout_all_sessions(
4946 )
5047
5148
52- @retry (
53- wait = wait_exponential (min = 5 , max = 20 ),
54- before_sleep = before_sleep_log (_logger , logging .WARNING ),
55- # NOTE: this function does suppresses all exceptions and retry indefinitly
56- )
5749async def _update_expired_users (app : web .Application ):
58- """
59- It is resilient, i.e. if update goes wrong, it waits a bit and retries
60- """
6150
6251 if updated := await update_expired_users (app ):
6352 # expired users might be cached in the auth. If so, any request
@@ -81,36 +70,36 @@ async def _update_expired_users(app: web.Application):
8170 _logger .info ("No users expired" )
8271
8372
84- async def _update_expired_users_periodically (
85- app : web .Application , wait_interval_s : float
86- ):
87- """Periodically checks expiration dates and updates user status"""
73+ def create_background_task_for_trial_accounts (wait_s : float ) -> CleanupContextFunc :
8874
89- while True :
90- await _update_expired_users (app )
91- await asyncio .sleep (wait_interval_s )
75+ async def _cleanup_ctx_fun (app : web .Application ) -> AsyncIterator [None ]:
9276
77+ @exclusive_periodic (
78+ # Function-exclusiveness is required to avoid multiple tasks like thisone running concurrently
79+ get_redis_lock_manager_client_sdk (app ),
80+ task_interval = timedelta (seconds = wait_s ),
81+ retry_after = timedelta (minutes = 5 ),
82+ )
83+ async def _update_expired_users_periodically () -> None :
84+ with log_context (_logger , logging .INFO , "Updating expired users" ):
85+ await _update_expired_users (app )
9386
94- def create_background_task_for_trial_accounts (
95- wait_s : float , task_name : str = _PERIODIC_TASK_NAME
96- ) -> CleanupContextFunc :
97- async def _cleanup_ctx_fun (
98- app : web .Application ,
99- ) -> AsyncIterator [None ]:
10087 # setup
88+ task_name = _update_expired_users_periodically .__name__
89+
10190 task = asyncio .create_task (
102- _update_expired_users_periodically (app , wait_s ),
91+ _update_expired_users_periodically (),
10392 name = task_name ,
10493 )
105- app [_APP_TASK_KEY ] = task
94+
95+ # prevents premature garbage collection of the task
96+ app_task_key = f"tasks.{ task_name } "
97+ app [app_task_key ] = task
10698
10799 yield
108100
109101 # tear-down
110- task .cancel ()
111- try :
112- await task
113- except asyncio .CancelledError :
114- assert task .cancelled () # nosec
102+ await cancel_wait_task (task )
103+ app .pop (app_task_key , None )
115104
116105 return _cleanup_ctx_fun
0 commit comments