2424
2525
2626class SchedulingAPI (AbstractSchedulingAPI ):
27- def __init__ (
28- self , session_id : str , address : str , manager_ref = None , queueing_ref = None
29- ):
27+ def __init__ (self , session_id : str , address : str , manager_ref = None ):
3028 self ._session_id = session_id
3129 self ._address = address
3230
3331 self ._manager_ref = manager_ref
34- self ._queueing_ref = queueing_ref
3532
3633 @classmethod
3734 @alru_cache
@@ -41,20 +38,30 @@ async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
4138 manager_ref = await mo .actor_ref (
4239 SubtaskManagerActor .gen_uid (session_id ), address = address
4340 )
44- from ..supervisor .queueing import SubtaskQueueingActor
45-
46- queueing_ref = await mo .actor_ref (
47- SubtaskQueueingActor .gen_uid (session_id ), address = address
48- )
4941
50- scheduling_api = SchedulingAPI (session_id , address , manager_ref , queueing_ref )
42+ scheduling_api = SchedulingAPI (session_id , address , manager_ref )
5143 return scheduling_api
5244
5345 async def get_subtask_schedule_summaries (
5446 self , task_id : Optional [str ] = None
5547 ) -> List [SubtaskScheduleSummary ]:
5648 return await self ._manager_ref .get_schedule_summaries (task_id )
5749
50+ async def cache_subtasks (
51+ self , subtasks : List [Subtask ], priorities : Optional [List [Tuple ]] = None
52+ ):
53+ """
54+ Add subtask graph to cache for fast forwarding
55+
56+ Parameters
57+ ----------
58+ subtasks
59+ list of subtasks to be submitted to service
60+ priorities
61+ list of priorities of subtasks
62+ """
63+ await self ._manager_ref .cache_subtasks (subtasks , priorities )
64+
5865 async def add_subtasks (
5966 self , subtasks : List [Subtask ], priorities : Optional [List [Tuple ]] = None
6067 ):
@@ -88,12 +95,12 @@ async def update_subtask_priority(self, subtask_id: str, priority: Tuple):
8895
8996 @update_subtask_priority .batch
9097 async def update_subtask_priority (self , args_list , kwargs_list ):
91- await self . _queueing_ref . update_subtask_priority . batch (
92- * (
93- self ._queueing_ref . update_subtask_priority .delay (* args , ** kwargs )
94- for args , kwargs in zip ( args_list , kwargs_list )
95- )
96- )
98+ subtask_ids , priorities = [], []
99+ for args , kwargs in zip ( args_list , kwargs_list ):
100+ subtask_id , priority = self .update_subtask_priority .bind (* args , ** kwargs )
101+ subtask_ids . append ( subtask_id )
102+ priorities . append ( priority )
103+ await self . _manager_ref . update_subtask_priorities ( subtask_ids , priorities )
97104
98105 async def cancel_subtasks (
99106 self , subtask_ids : List [str ], kill_timeout : Union [float , int ] = 5
@@ -128,33 +135,39 @@ async def finish_subtasks(self, subtask_ids: List[str], schedule_next: bool = Tr
128135class MockSchedulingAPI (SchedulingAPI ):
129136 @classmethod
130137 async def create (cls : Type [APIType ], session_id : str , address : str ) -> APIType :
131- from ..supervisor import GlobalSlotManagerActor , AutoscalerActor
132-
133- await mo .create_actor (
134- GlobalSlotManagerActor ,
135- uid = GlobalSlotManagerActor .default_uid (),
136- address = address ,
137- )
138- await mo .create_actor (
139- AutoscalerActor , {}, uid = AutoscalerActor .default_uid (), address = address
140- )
138+ # from ..supervisor import AutoscalerActor
139+ # await mo.create_actor(
140+ # AutoscalerActor, {}, uid=AutoscalerActor.default_uid(), address=address
141+ # )
141142
142143 from .... import resource as mars_resource
143144 from ..worker import (
144145 SubtaskExecutionActor ,
145- WorkerSlotManagerActor ,
146+ SubtaskPrepareQueueActor ,
147+ SubtaskExecutionQueueActor ,
146148 WorkerQuotaManagerActor ,
149+ SlotManagerActor ,
147150 )
148151
149152 await mo .create_actor (
150- SubtaskExecutionActor ,
151- subtask_max_retries = 0 ,
152- uid = SubtaskExecutionActor .default_uid (),
153+ SlotManagerActor ,
154+ uid = SlotManagerActor .default_uid (),
153155 address = address ,
154156 )
155157 await mo .create_actor (
156- WorkerSlotManagerActor ,
157- uid = WorkerSlotManagerActor .default_uid (),
158+ SubtaskPrepareQueueActor ,
159+ uid = SubtaskPrepareQueueActor .default_uid (),
160+ address = address ,
161+ )
162+ await mo .create_actor (
163+ SubtaskExecutionQueueActor ,
164+ uid = SubtaskExecutionQueueActor .default_uid (),
165+ address = address ,
166+ )
167+ await mo .create_actor (
168+ SubtaskExecutionActor ,
169+ subtask_max_retries = 0 ,
170+ uid = SubtaskExecutionActor .default_uid (),
158171 address = address ,
159172 )
160173 await mo .create_actor (
0 commit comments