99from settings_library .redis import RedisDatabase
1010from simcore_service_storage .modules .celery .tasks import archive
1111
12- from ...core .settings import ApplicationSettings , get_application_settings
12+ from ...core .settings import get_application_settings
1313
1414_log = logging .getLogger (__name__ )
1515
1616
1717class CeleryTaskQueue :
18- def __init__ (self , app_settings : ApplicationSettings ):
19- assert app_settings .STORAGE_REDIS
20- redis_dsn = app_settings .STORAGE_REDIS .build_redis_dsn (
21- RedisDatabase .CELERY_TASKS ,
22- )
23-
24- self ._celery_app = Celery (
25- broker = redis_dsn ,
26- backend = redis_dsn ,
27- )
28-
29- @property
30- def celery_app (self ):
31- return self ._celery_app
18+ def __init__ (self , celery_app : Celery ):
19+ self ._celery_app = celery_app
3220
3321 def create_task (self , task_fn : Callable ):
3422 self ._celery_app .task ()(task_fn )
@@ -40,13 +28,23 @@ def cancel_task(self, task_id: str):
4028 self ._celery_app .control .revoke (task_id )
4129
4230
43- # TODO: use new FastAPI lifespan
31+ # TODO: move and use new FastAPI lifespan
4432def setup_celery (app : FastAPI ) -> None :
4533 async def on_startup () -> None :
4634 settings = get_application_settings (app )
4735 assert settings .STORAGE_REDIS
4836
49- task_queue = CeleryTaskQueue (settings )
37+ assert settings .STORAGE_REDIS
38+ redis_dsn = settings .STORAGE_REDIS .build_redis_dsn (
39+ RedisDatabase .CELERY_TASKS ,
40+ )
41+
42+ celery_app = Celery (
43+ broker = redis_dsn ,
44+ backend = redis_dsn ,
45+ )
46+
47+ task_queue = CeleryTaskQueue (celery_app )
5048 task_queue .create_task (archive )
5149
5250 app .state .task_queue = task_queue
0 commit comments