88from models_library .users import UserID
99from servicelib .background_task import start_periodic_task , stop_periodic_task
1010from servicelib .logging_utils import log_context
11- from servicelib .redis import RedisClientSDK
1211from servicelib .redis_utils import exclusive
1312from servicelib .utils import limited_gather
14- from settings_library .redis import RedisDatabase
1513
1614from ...models .comp_runs import RunMetadataDict
1715from ...utils .rabbitmq import publish_project_log
1816from ..db import get_db_engine
1917from ..db .repositories .comp_pipelines import CompPipelinesRepository
2018from ..db .repositories .comp_runs import CompRunsRepository
2119from ..rabbitmq import get_rabbitmq_client
22- from ..redis import get_redis_client_manager
2320from ._constants import (
2421 MAX_CONCURRENT_PIPELINE_SCHEDULING ,
25- MODULE_NAME ,
22+ MODULE_NAME_SCHEDULER ,
2623 SCHEDULER_INTERVAL ,
2724)
2825from ._publisher import request_pipeline_scheduling
29- from ._utils import SCHEDULED_STATES
26+ from ._utils import SCHEDULED_STATES , get_redis_client_from_app , get_redis_lock_key
3027
3128_logger = logging .getLogger (__name__ )
3229
@@ -99,28 +96,13 @@ async def stop_pipeline(
9996 if updated_comp_run :
10097 # ensure the scheduler starts right away
10198 rabbitmq_client = get_rabbitmq_client (app )
102- await request_pipeline_scheduling (updated_comp_run , rabbitmq_client , db_engine )
103-
104-
105- def _get_app_from_args (* args , ** kwargs ) -> FastAPI :
106- assert kwargs is not None # nosec
107- if args :
108- app = args [0 ]
109- else :
110- assert "app" in kwargs # nosec
111- app = kwargs ["app" ]
112- assert isinstance (app , FastAPI ) # nosec
113- return app
114-
115-
116- def _redis_client_getter (* args , ** kwargs ) -> RedisClientSDK :
117- app = _get_app_from_args (* args , ** kwargs )
118- return get_redis_client_manager (app ).client (RedisDatabase .LOCKS )
119-
120-
121- def _redis_lock_key_builder (* args , ** kwargs ) -> str :
122- app = _get_app_from_args (* args , ** kwargs )
123- return f"{ app .title } _{ MODULE_NAME } "
99+ await request_pipeline_scheduling (
100+ rabbitmq_client ,
101+ db_engine ,
102+ user_id = updated_comp_run .user_id ,
103+ project_id = updated_comp_run .project_uuid ,
104+ iteration = updated_comp_run .iteration ,
105+ )
124106
125107
126108async def _get_pipeline_dag (project_id : ProjectID , db_engine : Engine ) -> nx .DiGraph :
@@ -129,7 +111,12 @@ async def _get_pipeline_dag(project_id: ProjectID, db_engine: Engine) -> nx.DiGr
129111 return pipeline_at_db .get_graph ()
130112
131113
132- @exclusive (_redis_client_getter , lock_key = _redis_lock_key_builder )
114+ @exclusive (
115+ get_redis_client_from_app ,
116+ lock_key = get_redis_lock_key (
117+ MODULE_NAME_SCHEDULER , unique_lock_key_builder = lambda : ""
118+ ),
119+ )
133120async def schedule_pipelines (app : FastAPI ) -> None :
134121 with log_context (_logger , logging .DEBUG , msg = "scheduling pipelines" ):
135122 db_engine = get_db_engine (app )
@@ -159,7 +146,7 @@ async def setup_manager(app: FastAPI) -> None:
159146 app .state .scheduler_manager = start_periodic_task (
160147 schedule_pipelines ,
161148 interval = SCHEDULER_INTERVAL ,
162- task_name = MODULE_NAME ,
149+ task_name = MODULE_NAME_SCHEDULER ,
163150 app = app ,
164151 )
165152
0 commit comments